任务处理方法及装置
本发明专利申请是申请日为2011年12月27日、申请号为201110444845.6、名称为“任务处理方法及装置”的中国发明专利申请的分案申请。
技术领域
本申请涉及网络技术领域,特别是涉及一种任务处理方法及装置。
背景技术
多道流水线处理技术,是一种基于网络消息通信的工作调度系统。在大规模分布式环境中,常使用多道流水线处理技术对数据处理任务进行高效地发布、分配和处理。
目前,多道流水线处理技术一般使用分布式消息队列实现。分布式消息队列中,通常包含三个基本组件,即,消息发布组件,消息中转组件和消息处理组件。其中,消息发布组件,为系统的使用者提供了固定的API(Application Programming Interface,应用程序接口),方便使用者提交自己的任务消息,消息被提交后,消息发布组件,将通过TCP(Transmission Control Protocol,传输控制协议)协议或UDP(User Datagram Protocol,用户数据报协议)协议,将任务消息发送至消息中转组件;消息中转组件的作用是临时存储消息,并将任务消息的发布信息通信给消息处理组件;当消息处理组件接收到来自消息中转组件的通知,将通过TCP协议或UDP协议接收任务消息,之后将该任务消息提交给处理单元进行处理,此时,消息中转组件将负责将消息置为不可见状态,防止多个任务处理单元接收到同样的任务消息;处理单元完成处理后,将根据处理结果发布消息至消息中转组件,当处理成功时,消息处理组件将通知消息中转组件删除该消息;当处理失败时,消息处理组件将通知消息中转组件把该消息重新置为可见状态,这样可以使其他处理单元有机会再次处理该任务。
一种分布式消息队列系统是如图1所示的Gearman系统。Gearman是目前基于C/C++实现的一个分布式消息队列系统,主要包含三个组件:Gearman Client,Gearman Server与Gearman Worker,分别对应于上述的消息发布组件、消息中转组件和消息处理组件。Gearman系统对于消息处理的流程,如图1中所示,包括:Gearman Client向Gearman Server发布任务;Gearman Server接收任务后,通过任务调度,将任务交给Gearman Worker进行处理;Gearman Worker对任务进行处理,并将处理结果返回给Gearman Server;GearmanServer根据返回的处理结果对其保存的任务进行相应地处理,如删除或重置为可见状态等,至次,一次任务处理完成。
可见,使用分布式消息队列这一实现方式,使得任务发布组件无需关心任务处理单元的位置,只需发送任务消息至消息中转组件,实现了任务发布与任务处理的解耦合。另一方面,任务消息由中转组件进行统一管理,降低了任务处理失败后的重新调试难度。
然而,目前的各类分布式消息队列系统,都缺乏对于网络连接的有效管理,对于已经建立的网络连接,通常不对其连接状态进行监控与管理,而仅仅依赖于系统的网络功能进行控制。这样,虽然可以在理想的网络环境中取得比较好的性能测试结果,但在实际使用中,特别是大规模分布式集群中,常常因为单个节点的故障,而使得所有消息发送与接收环节都要多次尝试连接一个失败的节点,这样对于性能的影响非常巨大。
发明内容
本申请所要解决的技术问题是提供一种任务处理方法及装置,以解决目前的分布式消息队列系统中,因节点失效造成集群处理能力下降的问题。
为了解决上述问题,本申请公开了一种任务处理方法,包括:设置一个或多个用于存储所述分布式消息队列中各个组件之间的网络连接信息的连接池;在分布式消息队列中的至少一个组件接收任务消息;根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接;使用选择的所述有效的网络连接发送所述任务消息,进行所述任务消息所请求的任务处理。
优选地,所述分布式消息队列的每个组件中均设置有连接池,每个所述组件的连接池用于存储本组件与其它组件之间的网络连接的信息;所述根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接的步骤包括:根据所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
优选地,任务处理方法还包括:若使用选择的所述有效的网络连接发送所述任务消息失败,则重新选择一个有效的网络连接发送所述任务消息,并中断失败的网络连接并释放其资源,将该网络连接设置为失效的网络连接;记录该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息。
优选地,任务处理方法还包括:当经过所述设定的时间间隔后,接收到新的任务消息,则尝试恢复所述中断的网络连接。
优选地,任务处理方法还包括:若尝试恢复所述中断的网络连接失败,则重新设置该网络连接中断的时间点,在所述设定的时间间隔内,不再使用该网络连接发送任何任务消息;若尝试恢复所述中断的网络连接成功,则将该网络连接设置为有效的网络连接。
优选地,在所述根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接的步骤之前,还包括:根据所述网络连接的信息,判断所述连接池中是否存在有效的网络连接;若存在,则执行所述根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接的步骤;若不存在,将所述连接池中所有的网络连接的中断时间点设置为当前时间点,并尝试恢复所述连接池中所有的网络连接。
优选地,任务处理方法还包括:当使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回给所述连接池,并判断该网络连接的当前连接状态,根据判断结果,将该网络连接设置为有效的网络连接或者无效的网络连接。
优选地,在所述多道流水线处理系统的分布式消息队列中的至少一个组件接收到任务消息的步骤之前,还包括:在所述分布式消息队列的各个组件中设置负载均衡算法。
优选地,所述根据所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接的步骤包括:根据所述网络连接的信息,按照所述负载均衡算法从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
优选地,所述分布式消息队列包括消息发布组件、消息中转组件和消息处理组件;在所述多道流水线处理系统的分布式消息队列中的至少一个组件接收到任务消息的步骤之前,还包括:将所述分布式消息队列的各个组件中的连接池分别封装为网络连接循环队列,并且在所述消息中转组件中设置消息优先级队列。
优选地,所述根据所述负载均衡算法和所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接的步骤包括:根据所述网络连接的信息,从接收到所述任务消息的组件的所述网络连接循环队列的头部开始,选择一个有效的网络连接;所述方法还包括:当使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回所述网络连接循环队列的尾部。
优选地,所述消息中转组件中的所述消息优先级队列为先进先出方式的消息优先级队列;所述多道流水线处理系统的分布式消息队列中的至少一个组件接收到任务消息的步骤包括:所述消息中转组件从所述消息优先级队列的头部开始,向后查找第一个可见状态的任务消息作为当前待处理的任务消息。
为了解决上述问题,本申请还公开了一种任务处理装置,包括:连接池,用于存储所述分布式消息队列中各个组件之间的网络连接的信息;接收模块,用于在分布式消息队列中的至少一个组件中接收任务消息;选择模块,用于根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接;连接模块,用于使用选择的所述有效的网络连接发送所述任务消息,进行所述任务消息所请求的任务处理。
优选地,所述分布式消息队列的每个组件中均设置有连接池,每个所述组件的连接池用于存储本组件与其它组件之间的网络连接的信息;所述选择模块,用于根据所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
优选地,任务处理装置还包括:失败模块,用于若所述连接模块使用选择的所述有效的网络连接发送所述任务消息失败,则重新选择一个有效的网络连接发送所述任务消息,并中断失败的网络连接并释放其资源,将该网络连接设置为失效的网络连接;记录该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息。
优选地,任务处理装置还包括:重连模块,用于当经过所述设定的时间间隔后,所述接收模块接收到新的任务消息,尝试恢复所述中断的网络连接。
优选地,任务处理装置还包括:重连设置模块,用于若所述重连模块尝试恢复所述中断的网络连接失败,则重新设置该网络连接中断的时间点,在所述设定的时间间隔内,不再使用该网络连接发送任何任务消息;若尝试恢复所述中断的网络连接成功,则将该网络连接设置为有效的网络连接。
优选地,任务处理装置还包括:判断模块,用于在所述选择模块根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接之前,根据所述网络连接的信息,判断所述连接池中是否存在有效的网络连接;若存在,则执行所述选择模块;若不存在,将所述连接池中所有的网络连接的中断时间点设置为当前时间点,并尝试恢复所述连接池中所有的网络连接。
优选地,任务处理装置还包括:成功模块,用于当所述连接模块使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回给所述连接池,并判断该网络连接的当前连接状态,根据判断结果,将该网络连接设置为有效的网络连接或者无效的网络连接。
优选地,任务处理装置还包括:第一设置模块,用于在所述接收模块接收任务消息之前,在所述分布式消息队列的各个组件中设置负载均衡算法。
优选地,所述选择模块,用于根据所述网络连接的信息,按照所述负载均衡算法从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
优选地,所述分布式消息队列包括消息发布组件、消息中转组件和消息处理组件;所述装置还包括:第二设置模块,用于在所述接收模块接收任务消息之前,将所述分布式消息队列的各个组件中的连接池分别封装为网络连接循环队列,并且在所述消息中转组件中设置消息优先级队列。
优选地,所述选择模块,用于根据所述网络连接的信息,从接收到所述任务消息的组件的所述网络连接循环队列的头部开始,选择一个有效的网络连接;所述装置还包括:回归模块,用于当所述连接模块使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回所述网络连接循环队列的尾部。
优选地,所述消息中转组件中的所述消息优先级队列为先进先出方式的消息优先级队列;所述接收模块,用于所述消息中转组件从所述消息优先级队列的头部开始,向后查找第一个可见状态的任务消息作为当前待处理的任务消息。
与现有技术相比,本申请具有以下优点:
本申请在多道流水线处理系统中设置连接池,对分布式消息队列中各个组件之间的网络连接情况进行管理,当有任务消息到达时,多道流水线处理系统会根据连接池中的网络连接情况为该消息分配有效的网络连接。与现有技术中,当网络连接出现故障时,如网络中的某个节点失效,多道流水线处理系统却不能获知,而仍然多次尝试该网络连接相比,本申请的多道流水线处理系统通过对连接池的管理即可及时获知网络连接情况,在网络中出现某个或某些节点失效时,放弃使用这些失效节点的网络连接,而选用正常节点进行有效的网络连接,这一方面使得系统可以及时侦测到已经失效的网络连接,另一方面保证了失效的节点对于任务消息的传递的影响到最低,从而有效避免了目前的分布式消息队列系统中,因节点失效造成集群处理能力下降的问题,有效保证了多道流水线处理系统的集群处理能力。
附图说明
图1是根据现有技术的一种Gearman系统进行任务处理的通信时序图;
图2是根据本申请实施例一的一种任务处理方法的步骤流程图;
图3是根据本申请实施例二的一种任务处理方法的步骤流程图;
图4是根据本申请实施例三的一种任务处理方法的步骤流程图;
图5是图4所示实施例的任务处理方法中的一种网络连接循环队列初始化状态时的示意图;
图6是图4所示实施例的任务处理方法中的一种网络连接循环队列申请连接后状态的示意图;
图7是根据本申请实施例四的一种任务处理装置的结构框图;
图8是根据本申请实施例五的一种任务处理装置的结构框图;
图9是图8所示实施例的任务处理装置中的网络连接管理模块管理网络连接的通信时序图;
图10是图8所示实施例的任务处理装置中的网络连接管理模块管理网络连接的流程图。
具体实施方式
为使本申请的上述目的、特征和优点能够更加明显易懂,下面结合附图和具体实施方式对本申请作进一步详细的说明。
实施例一
参照图2,示出了根据本申请实施例一的一种任务处理方法的步骤流程图。
本实施例的任务处理方法包括以下步骤:
步骤S100:设置一个或多个用于存储所述分布式消息队列中各个组件之间的网络连接的信息的连接池;
步骤S102:在多道流水线处理系统的分布式消息队列中的至少一个组件接收到任务消息。
任务消息是用于进行相应任务处理的消息,可以是包括具体任务的消息,也可以是包括任务处理结果的消息。
多道流水线处理系统的分布式消息队列中可以包括多个组件,如消息发布组件、消息中转组件和消息处理组件。在接收到任务消息进行任务处理时,需要依赖于组件之间的网络连接,如消息发布组件通过与消息中转组件之间的网络连接,将任务发布到消息中转组件;消息中转组件接收任务后,经过调度,将任务发送到与其有网络连接的消息处理组件上;消息处理组件对任务进行处理后,再通过与消息中转组件之间的网络连接,将任务处理结果返回给消息中转组件,由消息中转组件进行后续处理。
连接池可以设置于分布式消息队列的各个组件中,以存储本组件和与本组件与直接连接关系的其它组件之间的网络连接信息,也可以存储本组件与分布式消息队列中的其它组件之间的直接或间接的网络连接信息;连接池也可以设置于独立于分布式消息队列的各个组件之外的适当位置,以存储有直接连接关系的组件之间的网络连接信息,或者存储各个组件之间的直接或间接的网络连接的信息。
步骤S104:根据网络连接的信息,从连接池中选择一个有效的网络连接。
当组件接收到任务消息后,根据该任务消息所要处理的任务,从连接池中选择一个有效的网络连接。
比如,消息发布组件接收到一个任务消息,确定该消息需要向消息中转组件发布,则从连接池中选择一个消息发布组件和消息中转组件之间的有产的网络连接,然后,通过该网络连接向消息中转组件发布该任务消息。或者,消息发布组件从连接池中选择一个从消息发布组件到消息中转组件再到消息处理组件之间的有效网络连接,通过该网络连接,先将任务消息发布到消息中转组件,再由消息中转组件通过该连接,将任务消息交给任务处理组件处理。
步骤S106:使用选择的有效的网络连接发送任务消息,进行任务消息所请求的任务处理。
本实施例通过在多道流水线处理系统中设置连接池,对分布式消息队列中各个组件之间的网络连接情况进行管理,当有任务消息到达时,多道流水线处理系统会根据连接池中的网络连接情况为该消息分配有效的网络连接。与现有技术中,当网络连接出现故障时,如网络中的某个节点失效,多道流水线处理系统却不能获知,而仍然多次尝试该网络连接相比,本申请的多道流水线处理系统通过对连接池的管理即可及时获知网络连接情况,在网络中出现某个或某些节点失效时,放弃使用这些失效节点的网络连接,而选用正常节点进行有效的网络连接,这一方面使得系统可以及时侦测到已经失效的网络连接,另一方面保证了失效的节点对于任务消息的传递的影响到最低,从而有效避免了目前的分布式消息队列系统中,因节点失效造成集群处理能力下降的问题,有效保证了多道流水线处理系统的集群处理能力。
实施例二
参照图3,示出了根据本申请实施例二的一种任务处理方法的步骤流程图。
本实施例中,连接池设置于分布式消息队列的各个组件中,每个组件的连接池用于存储本组件与其它组件之间的网络连接的信息。
本实施例中,分布式消息队列包括消息发布组件、消息中转组件和消息处理组件,设定消息发布组件接收到一个任务消息,选择网络连接进行任务处理。需要说明的是,分布式消息队列中的多个组件接收到任务消息时,各个组件对任务消息的处理,均可参照本实施例进行。
本实施例的任务处理方法包括以下步骤:
步骤S202:多道流水线处理系统的分布式消息队列中的消息发布组件接收到一个任务消息。
步骤S204:消息发布组件根据其连接池中的网络连接信息,判断其连接池中是否存在有效的网络连接,若存在,则执行步骤S206;若不存在,则执行步骤S220。
网络连接信息是描述本组件与其它组件网络连接状况的信息,可以包括流水线名字、目的地址端口号、本地端口号,以及网络连接的有效或无效状态等。本实施例中,消息发布组件的连接池中包括了所有消息发布组件与消息中转组件之间的网络连接的信息。
与此类似,消息中转组件的连接池中则包括了所有消息中转组件与消息发布组件之间的、以及消息中转组件与消息处理组件之间的网络连接的信息;而消息处理组件的连接池中则包括了所有消息处理组件与消息中转组件之间的网络连接的信息。
通过判断连接池中是否存在有效的网络连接,可以使组件快速了解其与其它组件的当前连接状况,以保障网络连接的正常,为任务消息的正常处理提供有效保障。
步骤S206:消息发布组件从所有有效的网络连接中选择一个网络连接。
步骤S208:消息发布组件使用选择的网络连接将任务消息发送到消息中转组件。
消息中转组件接收到该任务消息后,进行适当的调度,交给消息处理组件进行相应的任务处理。
步骤S210:消息发布组件判断使用选择的网络连接发送任务消息是否成功,若成功,则执行步骤S212;若失败,则执行步骤S214。
步骤S212:消息发布组件将发送任务消息的网络连接返回给其连接池,并判断该网络连接的当前连接状态,根据判断结果,将该网络连接设置为有效的网络连接或者无效的网络连接,结束本次任务消息发送过程。
当任务消息发送成功后,及时将网络连接资源返回给连接池,能够使其它任务消息有足够的网络连接资源可用;而通过判断返回的网络连接的当前连接状态,确定该网络连接的有效或失效,则向组件提供了网络连接的及时的、可靠的信息,以保障任务的高效处理。
步骤S214:消息发布组件重新选择一个有效的网络连接发送任务消息,并中断之前选择的网络连接,释放该网络连接的资源,将该网络连接设置为失效的网络连接;并且,记录该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息。
时间间隔由本领域技术人员依据网络状况,如网络带宽或者该网络连接经过的交换机的数量等,灵活设置,本申请对此不作限制。
步骤S216:当经过设定的时间间隔后,消息发布组件接收到新的任务消息,尝试恢复中断的网络连接。
步骤S218:判断尝试恢复中断的网络连接是否成功,若成功,则将该网络连接设置为有效的网络连接,返回步骤S204;若失败,则重新设置该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息,返回步骤S216。
重连时间间隔采用固定的时间间隔,以方便操作和实现。因此,本步骤中的设定的时间间隔与步骤S214中的时间间隔一致。
在大规模分布式集群中,连接失效的原因有很多,如网络的暂时故障。如何有效的恢复这些失效连接,成为了管理的重要的问题。本申请中,引入了重连时间的设定,即设定一个时间间隔。当一个连接失效后,会按照失效时间点向后,选择一个重连时间,在重连时间之前的请求,将不会尝试对连接进行恢复,以减少连接重试的开销。当有请求在重连时间之后到来,将尝试恢复该网络连接,若恢复失败,则再次设置重连时间;若成功,则将该连接返回提供给消息组件进行通信。
步骤S220:将消息发布组件的连接池中所有的网络连接的中断时间点设置为当前时间点,并尝试恢复该连接池中所有的网络连接;然后,返回步骤S204。
当连接池中所有连接都已经失效时,将取消所有连接的重连时间,因为这时消息发布组件已经无法提供任何连接供通信使用,消息发布组件已经无法正常完成流水线的处理,此时应该尽快恢复网络连接,保证任务的正常处理。优选地,将连接池中所有的网络连接的中断时间点设置为当前时间点,方便中断时间点设置,及后续间隔时间计算,便于实现。
通过本实施例,有效保证了任务消息的发送,进而保证了任务的处理,避免了因节点失效造成分布式消息队列系统的集群处理能力下降的问题,保证了多道流水线处理系统的集群处理能力。
需要说明的是,本实施例仅以消息发布组件为例,对任务消息的处理作以说明,但本领域技术人员应当明了,分布式消息队列的其它组件均可参照本实施例对任务消息进行了相应地处理。
实施例三
参照图4,示出了根据本申请实施例三的一种任务处理方法的步骤流程图。
本实施例在图3所示实施例的基础上,在各个组件上设置负载均衡算法,增加负载均衡功能,以对网络连接或任务消息进行负载均衡。并且,将分布式消息队列的各个组件中的连接池分别封装为网络连接循环队列,同时在消息中转组件中设置消息优先级队列。也就是说,消息中转组件中设置有网络连接循环队列和消息优先级队列,而消息发布组件和消息处理组件因不会临时存储任务消息,所以这两个组件中仅设置有网络连接循环队列,而没有消息优先级队列。
本实施例以消息中转组件为例,对其根据负载均衡算法进行消息处理及网络连接选择进行说明。需要说明的是,因消息发布组件和消息处理组件仅设置有网络连接循环队列,因此,可以仅参照本实施例的消息中转组件的网络连接选择方法进行负载均衡。
本实施例的任务处理方法包括以下步骤:
步骤S302:消息发布组件将任务消息传递至消息中转组件。
其中,消息发布组件根据设定的负载均衡算法,从其自身的网络连接循环队列中选择一个有效的网络连接,将任务消息传递给消息中转组件。
本实施例中,设定各个组件均使用LRU(Least Recently Used)算法进行网络连接的负载均衡。在选择有效网络连接时,将从网络连接循环队列的头部提取一个有效的网络连接,返回给组件进行通信。当通信完成后,该连接将会被放入网络连接循环队列的尾部。网络连接循环队列初始化状态时的示意图和申请连接后状态的示意图分别如图5和图6所示。
通过这种方式,实现了一个完整的LRU算法,并使得所有的组件都可以使用该算法进行连接的管理,以实现全局的负载均衡。使用LRU算法,实现简单,且查找有效网络连接快速高效。当然,不限于此,本领域技术人员在实际时,可以采用任意适当的负载均衡算法,本申请对此不作限制。
步骤S304:消息中转组件依次将任务消息插入消息优先级队列的尾部,并将该任务消息设置为可见状态。
步骤S306:消息中转组件根据设定的负载均衡算法,从消息优先级队列中选择一个可见状态的任务消息。
本实施例中,设定消息优先级队列中先进先出FIFO方式的队列。消息中转组件从消息优先级队列头部开始,向后查找第一个可见状态的任务消息,作为当前待处理的任务消息。
消息优先级队列是为了保证任务消息可以可靠、公平的进行处理而设计的。该队列使用先进先出(FIFO)的方式,对任务消息进行管理,这保证了消息在时序上的公平性,即先提交的任务,可以得到优先的处理,同时也不影响后续提交的任务的正确调度。
步骤S308:消息中转组件将任务消息发送至消息处理组件。
本步骤中,消息中转组件根据LRU算法,从自身的网络连接循环队列中选择一个有效的网络连接,将任务消息发送给消息处理组件。
步骤S310:消息中转组件将发送出去的任务消息设置为不可见状态,并等待消息处理组件的回复。
步骤S312:消息中转组件根据消息处理处理的回复,对该任务消息进行处理;然后,返回步骤S306。
如果任务处理组件处理该任务消息成功,消息中转组件将删除该任务消息;如果任务处理组件处理该任务消息不成功,消息中转组件将重新将该任务消息设置为可见状态。
本实施例重点对分布式消息队列中各个组件的负载均衡进行了说明,而对网络连接的选择和处理部分描述的较为简单,本领域技术人员在具体实施时,可以参照实施例二中的相关方法进行有效网络连接的选择和后续相关处理,在此不再赘述。
在大规模分布式环境下,单个节点的能力是有限的,通常只能够满足某一类任务性能要求的几分之一,甚至几十分之一,这就要求整个的多道流水线处理系统,能够充分利用目前所有已知节点进行任务的处理。目前的各类分布式消息队列系统,对于负载均衡的问题,都没有给出合理的解决方案,多数只是依赖于系统的调度策略进行调节。这样将造成部分节点负载过高,但系统仍然没有充分利用所有处理节点的处理能力,大量节点的能力被浪费。通过本实施例,不但解决了分布式消息队列中,因节点失效造成集群处理能力下降的问题,而且解决了分布式消息队列的负载均衡问题,使得整个系统中的全部处理任务能够均匀地分配到每个处理节点中。
实施例四
参照图7,示出了根据本申请实施例四的一种任务处理装置的结构框图。
本实施例的任务处理装置包括:连接池,用于存储所述分布式消息队列中各个组件之间的网络连接的信息;接收模块402,用于在多道流水线处理系统的分布式消息队列中的至少一个组件中接收任务消息;选择模块404,用于根据网络连接的信息,从连接池中选择一个有效的网络连接;连接模块406,用于使用选择的有效的网络连接发送任务消息,进行任务消息所请求的任务处理。
优选地,分布式消息队列的每个组件中均设置有连接池,每个组件的连接池用于存储本组件与其它组件之间的网络连接的信息;选择模块404,用于根据网络连接的信息,从接收到任务消息的组件的连接池中选择一个有效的网络连接。
优选地,本实施例的任务处理装置还包括:失败模块408,用于若连接模块406使用选择的有效的网络连接发送任务消息失败,则重新选择一个有效的网络连接发送任务消息,并中断失败的网络连接并释放其资源,将该网络连接设置为失效的网络连接;记录该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息。
优选地,本实施例的任务处理装置还包括:重连模块410,用于当经过设定的时间间隔后,接收模块402接收到新的任务消息,尝试恢复中断的网络连接。
优选地,本实施例的任务处理装置还包括:重连设置模块412,用于若重连模块410尝试恢复中断的网络连接失败,则重新设置该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息;若尝试恢复中断的网络连接成功,则将该网络连接设置为有效的网络连接。
优选地,本实施例的任务处理装置还包括:判断模块414,用于在选择模块404根据网络连接的信息,从连接池中选择一个有效的网络连接之前,根据网络连接的信息,判断连接池中是否存在有效的网络连接;若存在,则执行选择模块404;若不存在,将连接池中所有的网络连接的中断时间点设置为当前时间点,并尝试恢复连接池中所有的网络连接。
优选地,本实施例的任务处理装置还包括:成功模块416,用于当连接模块406使用选择的有效的网络连接发送任务消息成功完成后,将该网络连接返回给连接池,并判断该网络连接的当前连接状态,根据判断结果,将该网络连接设置为有效的网络连接或者无效的网络连接。
优选地,本实施例的任务处理装置还包括:第一设置模块418,用于在接收模块402接收任务消息之前,在分布式消息队列的各个组件中设置负载均衡算法。
优选地,选择模块404,用于根据网络连接的信息,按照负载均衡算法从接收到任务消息的组件的连接池中选择一个有效的网络连接。
优选地,分布式消息队列包括消息发布组件、消息中转组件和消息处理组件;本实施例的任务处理装置还包括:第二设置模块420,用于在接收模块402接收任务消息之前,将分布式消息队列的各个组件中的连接池分别封装为网络连接循环队列,并且在消息中转组件中设置消息优先级队列。
优选地,选择模块404,用于根据网络连接的信息,从接收到任务消息的组件的网络连接循环队列的头部开始,选择一个有效的网络连接;本实施例的任务处理装置还包括:回归模块(图中未示出),用于当连接模块406使用选择的有效的网络连接发送任务消息成功完成后,将该网络连接返回网络连接循环队列的尾部。
优选地,消息中转组件中的消息优先级队列为先进先出方式的消息优先级队列;接收模块402,用于消息中转组件从消息优先级队列的头部开始,向后查找第一个可见状态的任务消息作为当前待处理的任务消息。
本实施例的任务处理装置用于实现前述多个方法实施例中相应的任务处理方法,并具有相应的任务处理方法的有益效果,在此不再赘述。
实施例五
参照图8,示出了根据本申请实施例五的一种任务处理装置的结构框图。
本实施例中,分布式消息队列的各个组件中均设置有连接池,每个组件的连接池用于存储本组件与其它组件之间的网络连接的信息;并且,在各个组件上设置有负载均衡算法,以对网络连接或任务消息进行负载均衡。
本实施例的任务处理装置主要包括:网络连接管理模块502和负载均衡模块504。
其中:
网络连接管理模块502负责所有组件中的网络连接管理,可以设置在各个组件中,也可以设置在各组件之外,主要功能在于监控网络连接的状态,以及对于失效连接的管理(其功能包括了实施例四中的选择模块、连接模块、失败模块、成功模块、重连模块和重连设置模块等模块的功能)。其监控与管理都是通过各组件中建立的连接池,按顺序将当前组件中的所有网络连接都放在连接池中。
对于网络连接管理模块502的网络连接管理功能,当有网络通信需要进行时,组件通过相应接口请求一个网络连接以供通信使用,网络连接管理模块502将在组件的连接池中选择一个可用连接(即有效的网络连接)并返回。当通信成功完成时,组件将把该连接返回给网络连接管理模块502管理的连接池;若通过该连接进行通信失败,则系统会通知网络连接管理模块502。之后,网络连接管理模块502会分析失败原因,并将该失效连接中断以释放资源,同时还会记录下该次中断的时间点,在一定时间间隔内,网络连接管理模块502将不再使用该网络连接进行通信,防止单点失效问题,引起的性能下降。网络连接管理模块502对网络连接进行管理的时序图如图9所示。可见,当分布式消息队列的组件,如消息发布组件、消息中转组件或消息处理组件,在需要网络连接时,向网络连接管理模块502发送请求连接的请求;网络连接管理模块502在接收到请求后,从连接池中选择一个有效的网络连接,返回给请求的组件;组件在使用网络连接管理模块502返回的有效网络连接进行通信后,将该连接再次返回给网络连接管理模块502;此后,网络连接管理模块502判断返回的该连接的当前连接状态,以便更有效地管理组件间的网络连接。
网络连接管理模块502的第二个主要功能是管理失效连接,在大规模分布式集群中,连接失效的原因有很多,如网络的暂时故障。如何有效的恢复这些失效连接,成为了管理的重要的问题。在这里,本申请引入了重连时间的设定,当一个连接失效后,会按照失效时间点向后,选择一个重连时间,在重连时间之前的请求,网络连接管理模块502将不会尝试对连接进行恢复,以减少连接重试的开销。当有请求在重连时间之后到来,网络连接管理模块502将尝试恢复该网络连接,若恢复失败,则再次设置重连时间;若成功,则将该连接返回提供给消息组件进行通信。
一个网络连接管理模块502设置重连时间的接口的示例如下所示:
消息组件可以通过该接口设定连接时的超时等待时间与重连的时间间隔。具体设定时,不同的业务可以根据自身的需求和网络状态进行设置。例如,在同一个网络区段内,可以将timeout时间缩短,以减小网络重连失败时的开销;当网络波动比较频繁时,可以将reconnect_interval时间缩短,以尽快恢复网络连接,避免当所有连接失效时,重连所有连接造成的大量开销。
此外,在接口设计中,本申请将恢复的流程集成到了网络连接管理模块502的接口内部,这样可以保证,在组件不进行通信时,网络连接管理模块502不会进行不必要的尝试连接,进一步降低了恢复的开销。需要特别注意的是,当连接池中所有连接都已经失效时,将取消所有连接的重连时间,因为这时网络连接管理模块502已经无法提供任何连接供消息组件进行通信,消息组件已经无法正常完成流水线的处理,此时应该尽快恢复网络连接,保证任务的正常处理。
本实施例中,网络连接管理模块502实现网络管理的流程如图10所示,包括:
步骤S602:网络连接管理模块502接收到请求网络连接的请求。
步骤S604:网络连接管理模块502检查相应组件的连接池中的下一个连接。
步骤S606:网络连接管理模块502检查该连接是否有效,若是,则执行步骤S608;若否,则执行步骤S610。
步骤S608:网络连接管理模块502向组件返回该连接,本次连接选择过程结束。
步骤S610:网络连接管理模块502判断该连接的重连时间是否到达,若是,则执行步骤S612;若否,则执行步骤S618。
步骤S612:网络连接管理模块502尝试恢复该连接。
步骤S614:网络连接管理模块502判断尝试连接是否成功,若是,则返回步骤S608;若否,则执行步骤S616。
步骤S616:网络连接管理模块502重新为该连接设置重连时间,然后,返回步骤S604。
步骤S618:网络连接管理模块502判断是否已检查连接池中的所有连接,若是,执行步骤S620;若否,则返回步骤S604。
步骤S620:网络连接管理模块502重新设置连接池中所有连接的重连时间。
通过上述过程,网络连接管理模块502实现了组件传递任务消息的有效网络连接,解决了现有分布式消息队列中节点失效带来的处理能力下降的问题。
然而,在大规模集群系统中,即使能解决节点失效问题,但如果大量的任务集中在同一个组件中进行,则该组件即使使用连接池和网络连接管理模块502对网络连接进行管理,也无法保证连接的充分可用和有效。为此,本实施例的任务处理装置中还设置了负载均衡模块504。
通过图1分布式消息队列的通信时序图可以知道,分布式消息队列的组件之间的网络通信共有3组传输方向:消息发布组件→消息中转组件;消息中转组件→消息处理组件;消息处理组件→消息中转组件。任何一组通信的负载不均衡,都将对系统的处理能力造成严重的影响。
例如:消息发布组件→消息中转组件的负载不均衡时,将造成大量消息发送至某些消息中转组件,使得这些中转组件过于繁忙,无法及时提供响应;空闲的中转组件则没有消息进行调度。这时,消息中转组件将限制系统总体的吞吐能力。消息中转组件→消息处理组件的负载不均衡时,会造成中转组件将大量的消息发送给某些消息处理组件,使得这些处理组件过于繁忙;而另一部分处理组件无法获取到中转组件中的消息进行处理,一直处理等待状态。这一现象会从整体上降低处理集群的处理能力,即使进行水平扩展,增加处理单元的数量,但由于新加入的节点也无法获取到中转组件上的消息,而造成水平扩展的失效。消息处理组件→消息中转组件的负载不均衡,与消息发布组件→消息中转组件的现象相似。消息处理组件只优先处理某些消息中转组件上的任务消息,当这些中转组件上出现任何消息时,都有大量的处理组件可以调度;而另一部分中转组件上的任务消息,则无法得到及时响应,造成了任务处理的延时。
针对以上问题,本实施例在所有消息组件中,都设置了负载均衡模块504(其功能包括实施例四中第一、第二设置模块和回归模块等模块的功能),加入了负载均衡算法,从而保证了所有消息发布组件发出的任务消息,都能够公平地被调度到所有的消息处理组件上。
为此,本实施例设计了2个队列系统,一个是消息优先级队列,一个是网络连接循环队列。消息发布组件与消息处理组件,因为不会临时存储任务消息,所以没有消息优先级队列,仅有网络连接的循环队列;消息中转组件则同时包含了消息优先级队列和网络连接循环队列。
在实现中,本实施例将上文中已经实现的网络连接池,利用循环队列进行了封装,形成网络连接循环队列。当有任何组件需要进行网络通信时,负载均衡模块504将从网络连接循环队列的头部提取一个连接,返回给组件进行通信。当通信完成后,该连接将会被放入网络连接循环队列的尾部。通过这种方式,本实施例实现了一个完整的LRU算法,并使得所有的组件都可以使用该算法进行连接的管理,以实现全局的负载均衡。网络连接循环队列初始化状态的示意图和申请连接后状态的示意图如图5和图6所示。
消息优先级队列是为了保证消息可以可靠、公平的进行处理而设计的。该队列使用先进先出(FIFO)的方式,对消息进行管理,这保证了消息在时序上的公平性,即先提交的任务,可以得到优先的处理,同时也不影响后续提交的任务的正确调度。
在使用上述消息优先级队列对任务消息进行调度时,先由消息发布组件将任务消息传递至消息中转组件;消息中转组件依次将该消息插入优先级队列的尾部,并将该消息设置为可见状态;消息中转组件的负载均衡模块504从优先级队列头部开始,向后查找第一个可见状态的消息,之后发送给相应的消息处理组件;消息中转组件将发送出去的消息设置为不可见状态,并等待处理组件的回复;如果处理成功,消息中转组件将删除该消息;如果处理不成功,消息中转组件将重新将该消息设置为可见状态。至此,一个任务消息在消息中转组件中的调度处理完成。
本实施例针对分布式环境下的特点,提供了解决大规模集群情况下,进行均衡任务负载,以及在机器出现故障时,单点失效造成集群处理能力下降的问题的方案。通过本实施例,在消息组件中对于网络连接进行了管理,一方面使得系统可以及时侦测到已经失效的网络连接,另一方面保证失效的节点对于任务消息的传递的影响到最低;通过可配置的接口,让使用者可以按照需求来控制对于失效节点进行恢复的策略;在消息发布组件、消息中转组件与消息处理组件中,分别实现负载均衡算法,使得整个系统中的全部处理任务能够均匀地分配到每个处理节点中。
本申请的任务处理方案可以在大规模分布式集群中,实现任务的多道流水线式的处理,并针对实际使用中的单点失效问题、负载均衡问题进行了详细地分析与设计,保证了集群处理能力的充分利用。
本申请的任务处理方案支持集群并发处理模式,能够动态扩展系统到1000+以上的服务;平台无I/O需求,仅依赖CPU、MEM、NETWORK,数据吞吐量仅局限于节点的数量和网络I/O能力;基于开源软件gearmand改进而来,解决了原版本的一些BUG和性能瓶颈;处理模块支持“热插拔模式”,支持不同业务流程快速搭建需求。
本申请的技术方案解决了海量实时数据处理的问题,解决了样本数据上传后,需要快速经过存储、鉴定、分拣、计算RANK等环节处理的问题。能够支持每日2~4T bytes/day的样本数据,达到平均延迟不大于30s的处理能力。可广泛地应用在360的云计算平台上,为木马云查杀引擎、主动防御系统、网盾安全系统、用户推荐系统等产品,提供了高效、可靠的数据分析服务。
本说明书中的各个实施例均采用递进的方式描述,每个实施例重点说明的都是与其他实施例的不同之处,各个实施例之间相同相似的部分互相参见即可。对于装置实施例而言,由于其与方法实施例基本相似,所以描述的比较简单,相关之处参见方法实施例的部分说明即可。
以上对本申请所提供的一种任务处理方法和装置进行了详细介绍,本文中应用了具体个例对本申请的原理及实施方式进行了阐述,以上实施例的说明只是用于帮助理解本申请的方法及其核心思想;同时,对于本领域的一般技术人员,依据本申请的思想,在具体实施方式及应用范围上均会有改变之处,综上所述,本说明书内容不应理解为对本申请的限制。
本发明实施例公开了A1、一种任务处理方法,包括:
设置一个或多个用于存储所述分布式消息队列中各个组件之间的网络连接的信息的连接池;
在分布式消息队列中的至少一个组件接收任务消息;根据所述网络连接的信息,从所述连接池中选择一个有效网络连接;
使用选择的所述有效网络连接发送所述任务消息,进行所述任务消息所请求的任务处理。
A2、根据权利要求A1所述的方法,所述分布式消息队列的每个组件中均设置有连接池,每个所述组件的连接池用于存储本组件与其它组件之间的网络连接的信息;
所述根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接的步骤包括:根据所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
A3、根据权利要求A1或A2所述的方法,还包括:
若使用选择的所述有效的网络连接发送所述任务消息失败,则重新选择一个有效的网络连接发送所述任务消息,并中断失败的网络连接并释放其资源,将该网络连接设置为失效的网络连接;记录该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息。
A4、根据权利要求A3所述的方法,还包括:
当经过所述设定的时间间隔后,接收到新的任务消息,则尝试恢复所述中断的网络连接。
A5、根据权利要求A4所述的方法,还包括:
若尝试恢复所述中断的网络连接失败,则重新设置该网络连接中断的时间点,在所述设定的时间间隔内,不再使用该网络连接发送任何任务消息;若尝试恢复所述中断的网络连接成功,则将该网络连接设置为有效的网络连接。
A6、根据权利要求A1所述的方法,在所述根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接的步骤之前,还包括:
根据所述网络连接的信息,判断所述连接池中是否存在有效的网络连接;
若存在,则执行所述根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接的步骤;
若不存在,将所述连接池中所有的网络连接的中断时间点设置为当前时间点,并尝试恢复所述连接池中所有的网络连接。
A7、根据权利要求A1所述的方法,还包括:
当使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回给所述连接池,并判断该网络连接的当前连接状态,根据判断结果,将该网络连接设置为有效的网络连接或者无效的网络连接。
A8、根据权利要求A2所述的方法,在所述多道流水线处理系统的分布式消息队列中的至少一个组件接收到任务消息的步骤之前,还包括:
在所述分布式消息队列的各个组件中设置负载均衡算法。
A9、根据权利要求A8所述的方法,所述根据所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接的步骤包括:
根据所述网络连接的信息,按照所述负载均衡算法从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
A10、根据权利要求A9所述的方法,所述分布式消息队列包括消息发布组件、消息中转组件和消息处理组件;
在所述多道流水线处理系统的分布式消息队列中的至少一个组件接收到任务消息的步骤之前,还包括:将所述分布式消息队列的各个组件中的连接池分别封装为网络连接循环队列,并且在所述消息中转组件中设置消息优先级队列。
A11、根据权利要求A10所述的方法,
所述根据所述负载均衡算法和所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接的步骤包括:根据所述网络连接的信息,从接收到所述任务消息的组件的所述网络连接循环队列的头部开始,选择一个有效的网络连接;
所述方法还包括:当使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回所述网络连接循环队列的尾部。
A12、根据权利要求A10所述的方法,所述消息中转组件中的所述消息优先级队列为先进先出方式的消息优先级队列;
所述多道流水线处理系统的分布式消息队列中的至少一个组件接收到任务消息的步骤包括:所述消息中转组件从所述消息优先级队列的头部开始,向后查找第一个可见状态的任务消息作为当前待处理的任务消息。
B13、一种任务处理装置,包括:
连接池,用于存储所述分布式消息队列中各个组件之间的网络连接的信息;
接收模块,用于在分布式消息队列中的至少一个组件中接收任务消息;
选择模块,用于根据所述网络连接的信息,从所述连接池中选择一个有效网络连接;
连接模块,用于使用选择的所述有效网络连接发送所述任务消息,进行所述任务消息所请求的任务处理。
B14、根据权利要求B 13所述的装置,所述分布式消息队列的每个组件中均设置有连接池,每个所述组件的连接池用于存储本组件与其它组件之间的网络连接的信息;
所述选择模块,用于根据所述网络连接的信息,从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
B15、根据权利要求B 13或B 14所述的装置,还包括:
失败模块,用于若所述连接模块使用选择的所述有效的网络连接发送所述任务消息失败,则重新选择一个有效的网络连接发送所述任务消息,并中断失败的网络连接并释放其资源,将该网络连接设置为失效的网络连接;记录该网络连接中断的时间点,在设定的时间间隔内,不再使用该网络连接发送任何任务消息。
B16、根据权利要求B 15所述的装置,还包括:
重连模块,用于当经过所述设定的时间间隔后,所述接收模块接收到新的任务消息,尝试恢复所述中断的网络连接。
B17、根据权利要求B 16所述的装置,还包括:
重连设置模块,用于若所述重连模块尝试恢复所述中断的网络连接失败,则重新设置该网络连接中断的时间点,在所述设定的时间间隔内,不再使用该网络连接发送任何任务消息;若尝试恢复所述中断的网络连接成功,则将该网络连接设置为有效的网络连接。
B18、根据权利要求B 13所述的装置,还包括:
判断模块,用于在所述选择模块根据所述网络连接的信息,从所述连接池中选择一个有效的网络连接之前,根据所述网络连接的信息,判断所述连接池中是否存在有效的网络连接;若存在,则执行所述选择模块;若不存在,将所述连接池中所有的网络连接的中断时间点设置为当前时间点,并尝试恢复所述连接池中所有的网络连接。
B19、根据权利要求B 13所述的装置,还包括:
成功模块,用于当所述连接模块使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回给所述连接池,并判断该网络连接的当前连接状态,根据判断结果,将该网络连接设置为有效的网络连接或者无效的网络连接。
B20、根据权利要求B 14所述的装置,还包括:
第一设置模块,用于在所述接收模块接收任务消息之前,在所述分布式消息队列的各个组件中设置负载均衡算法。
B21、根据权利要求B 20所述的装置,所述选择模块,用于根据所述网络连接的信息,按照所述负载均衡算法从接收到所述任务消息的组件的所述连接池中选择一个有效的网络连接。
B22、根据权利要求B 21所述的装置,所述分布式消息队列包括消息发布组件、消息中转组件和消息处理组件;
所述装置还包括:第二设置模块,用于在所述接收模块接收任务消息之前,将所述分布式消息队列的各个组件中的连接池分别封装为网络连接循环队列,并且在所述消息中转组件中设置消息优先级队列。
B23、根据权利要求B 22所述的装置,
所述选择模块,用于根据所述网络连接的信息,从接收到所述任务消息的组件的所述网络连接循环队列的头部开始,选择一个有效的网络连接;
所述装置还包括:回归模块,用于当所述连接模块使用选择的所述有效的网络连接发送所述任务消息成功完成后,将该网络连接返回所述网络连接循环队列的尾部。
B24、根据权利要求B 22所述的装置,所述消息中转组件中的所述消息优先级队列为先进先出方式的消息优先级队列;
所述接收模块,用于所述消息中转组件从所述消息优先级队列的头部开始,向后查找第一个可见状态的任务消息作为当前待处理的任务消息。