CN105808334A - MapReduce short job optimization system and method based on resource reuse - Google Patents
MapReduce short job optimization system and method based on resource reuse Download PDFInfo
- Publication number
- CN105808334A CN105808334A CN201610124760.2A CN201610124760A CN105808334A CN 105808334 A CN105808334 A CN 105808334A CN 201610124760 A CN201610124760 A CN 201610124760A CN 105808334 A CN105808334 A CN 105808334A
- Authority
- CN
- China
- Prior art keywords
- task
- node
- tasks
- running
- short
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Granted
Links
- 238000000034 method Methods 0.000 title claims abstract description 121
- 238000005457 optimization Methods 0.000 title claims abstract description 27
- 230000008569 process Effects 0.000 claims description 105
- 238000012545 processing Methods 0.000 claims description 21
- 238000012544 monitoring process Methods 0.000 claims description 10
- 238000004364 calculation method Methods 0.000 claims description 8
- 238000012163 sequencing technique Methods 0.000 claims 1
- 238000004064 recycling Methods 0.000 abstract description 5
- 238000013468 resource allocation Methods 0.000 abstract description 3
- 238000004422 calculation algorithm Methods 0.000 description 20
- 230000007246 mechanism Effects 0.000 description 8
- 239000002699 waste material Substances 0.000 description 7
- 230000000694 effects Effects 0.000 description 6
- 238000012360 testing method Methods 0.000 description 6
- 238000002474 experimental method Methods 0.000 description 5
- 230000003862 health status Effects 0.000 description 5
- 230000008901 benefit Effects 0.000 description 4
- 238000013461 design Methods 0.000 description 4
- 241000287127 Passeridae Species 0.000 description 3
- 230000008859 change Effects 0.000 description 3
- 235000003642 hunger Nutrition 0.000 description 3
- 238000012546 transfer Methods 0.000 description 3
- 241000252506 Characiformes Species 0.000 description 2
- 238000004458 analytical method Methods 0.000 description 2
- 238000006243 chemical reaction Methods 0.000 description 2
- 238000010586 diagram Methods 0.000 description 2
- 238000005516 engineering process Methods 0.000 description 2
- 230000036541 health Effects 0.000 description 2
- 230000002452 interceptive effect Effects 0.000 description 2
- 238000004519 manufacturing process Methods 0.000 description 2
- 238000011084 recovery Methods 0.000 description 2
- 238000011160 research Methods 0.000 description 2
- 230000037351 starvation Effects 0.000 description 2
- 102100030386 Granzyme A Human genes 0.000 description 1
- 101001009599 Homo sapiens Granzyme A Proteins 0.000 description 1
- 230000002159 abnormal effect Effects 0.000 description 1
- 230000009286 beneficial effect Effects 0.000 description 1
- 230000005540 biological transmission Effects 0.000 description 1
- 238000004891 communication Methods 0.000 description 1
- 238000007405 data analysis Methods 0.000 description 1
- 230000007423 decrease Effects 0.000 description 1
- 230000005611 electricity Effects 0.000 description 1
- 210000000609 ganglia Anatomy 0.000 description 1
- 238000012986 modification Methods 0.000 description 1
- 230000004048 modification Effects 0.000 description 1
- 230000002035 prolonged effect Effects 0.000 description 1
- 230000004044 response Effects 0.000 description 1
- 238000003860 storage Methods 0.000 description 1
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/48—Program initiating; Program switching, e.g. by interrupt
- G06F9/4806—Task transfer initiation or dispatching
- G06F9/4843—Task transfer initiation or dispatching by program, e.g. task dispatcher, supervisor, operating system
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F9/00—Arrangements for program control, e.g. control units
- G06F9/06—Arrangements for program control, e.g. control units using stored programs, i.e. using an internal store of processing equipment to receive or retain programs
- G06F9/46—Multiprogramming arrangements
- G06F9/50—Allocation of resources, e.g. of the central processing unit [CPU]
- G06F9/5005—Allocation of resources, e.g. of the central processing unit [CPU] to service a request
- G06F9/5027—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals
- G06F9/5038—Allocation of resources, e.g. of the central processing unit [CPU] to service a request the resource being a machine, e.g. CPUs, Servers, Terminals considering the execution order of a plurality of tasks, e.g. taking priority or time dependency constraints into consideration
Landscapes
- Engineering & Computer Science (AREA)
- Software Systems (AREA)
- Theoretical Computer Science (AREA)
- Physics & Mathematics (AREA)
- General Engineering & Computer Science (AREA)
- General Physics & Mathematics (AREA)
- Debugging And Monitoring (AREA)
- Management, Administration, Business Operations System, And Electronic Commerce (AREA)
Abstract
本发明公开了一种基于资源重用的MapReduce短作业优化系统及方法;系统包括:主节点、一级从节点和若干个二级从节点,其中主节点与一级从节点连接,一级从节点与若干个二级从节点连接;所述主节点上部署资源管理器和一级任务调度器;所述一级从节点上部署应用管理器、任务性能评估器和二级任务调度器,其中二级任务调度器与任务性能评估器连接,二级任务调度器还与主节点连接;每个所述二级从节点上均部署节点管理器。它从提高资源有效利用方面优化短作业运行性能,减少资源分配和回收的频率,将资源分配和回收的时间用来运行短作业,通过减少作业等待资源时间,提高执行短作业的性能。
The invention discloses a MapReduce short job optimization system and method based on resource reuse; the system includes: a master node, a first-level slave node and several second-level slave nodes, wherein the master node is connected to the first-level slave node, and the first-level slave node Connect with several second-level slave nodes; deploy resource manager and first-level task scheduler on the master node; deploy application manager, task performance evaluator and second-level task scheduler on the first-level slave node, wherein two The first-level task scheduler is connected to the task performance evaluator, and the second-level task scheduler is also connected to the master node; a node manager is deployed on each of the second-level slave nodes. It optimizes the running performance of short jobs from the aspect of improving the effective utilization of resources, reduces the frequency of resource allocation and recycling, uses the time of resource allocation and recycling to run short jobs, and improves the performance of executing short jobs by reducing the time for jobs to wait for resources.
Description
技术领域technical field
本发明涉及一种基于资源重用的MapReduce短作业优化系统及方法。The invention relates to a resource reuse-based MapReduce short job optimization system and method.
背景技术Background technique
互联网、金融及媒体等行业正在面临着处理大规模数据集的挑战,但常规的数据处理工具和计算模型不能满足其要求。Google提出的MapReduce模型为其提供了一个有效的解决方案,Hadoop是MapReduce的开源实现。Hadoop将提交的作业分解为粒度更小的Map任务和Reduce任务,这些任务在集群中的多个节点上并行运行,因此极大的缩短了作业的运行时间。Hadoop隐藏了并行计算的细节——向计算节点分发数据,重新运行失败的任务等,让用户能够专注于具体的业务逻辑处理。而且,Hadoop提供良好的线性扩展、数据冗余以及计算的高容错性,这些优点使得Hadoop成为运行数据密集型和计算密集型应用的主流计算框架。Hadoop在工业界的成功促使学术界开始密切关注Hadoop,并对其设计和实现的不足提出改善建议。Industries such as the Internet, finance, and media are facing the challenge of processing large-scale data sets, but conventional data processing tools and computing models cannot meet their requirements. The MapReduce model proposed by Google provides an effective solution for it, and Hadoop is an open source implementation of MapReduce. Hadoop decomposes the submitted job into smaller-grained Map tasks and Reduce tasks. These tasks run in parallel on multiple nodes in the cluster, thus greatly reducing the running time of the job. Hadoop hides the details of parallel computing—distributing data to computing nodes, rerunning failed tasks, etc., allowing users to focus on specific business logic processing. Moreover, Hadoop provides good linear expansion, data redundancy, and high fault tolerance of computing, which make Hadoop a mainstream computing framework for running data-intensive and computing-intensive applications. The success of Hadoop in the industry has prompted the academic community to pay close attention to Hadoop and make suggestions for improving its design and implementation.
Hadoop的初始设计目标是在大量计算节点中并行处理规模较大的作业,然而实际生产中,Hadoop却经常被用来处理规模较小的短作业。短作业是指完成时间小于设定阈值的作业,阈值一般由用户自行设置。规模小的短作业与规模大的作业在多个方面存在差异,比如输入数据集的大小、作业分解的任务数、任务需要的资源量、任务的完成时间以及用户对作业完成时间的期望等。由于Hadoop并未考虑短作业的特点,导致短作业在Hadoop中运行的性能比较低效。Hadoop's initial design goal is to process large-scale jobs in parallel in a large number of computing nodes. However, in actual production, Hadoop is often used to process smaller-scale short jobs. Short jobs refer to jobs whose completion time is less than the set threshold, which is generally set by the user. Small-scale short jobs differ from large-scale jobs in many aspects, such as the size of the input data set, the number of tasks that the job decomposes, the amount of resources required by the task, the completion time of the task, and the user's expectation on the completion time of the job, etc. Since Hadoop does not consider the characteristics of short jobs, the performance of short jobs running in Hadoop is relatively inefficient.
影响作业运行性能因素有多个方面,其中集群节点的配置、作业的调度算法和集群的负载是比较关键的三个因素。Hadoop在调度任务时假定构成集群的节点是同构的,即每个节点的CPU、内存、磁盘等硬件的配置相同。然而,随着公司业务的拓展集群的规模逐渐扩大,新增节点的配置明显高于前期节点的配置。因此,相同的任务在不同节点上执行,任务的完成时间相差较大。在集群高负载的情况下,作业分解的所有任务不能立即申请到运行任务的资源,部分任务进入等待资源的队列。获得资源的任务执行完成后释放占用的资源,Hadoop根据调度算法从等待队列中选择合适的任务将可用的资源分配给选择的任务。因此,若作业分解的任务比较多,同一个作业分解的任务可能执行运行多轮才能完成。例如在TaoBao的Hadoop集群中,Map任务运行两轮以上的作业占30%以上。因此,集群的负载情况对作业的响应时间和完成时间具有决定性的影响。There are many factors that affect job running performance, among which the configuration of cluster nodes, job scheduling algorithm and cluster load are the three key factors. When Hadoop schedules tasks, it assumes that the nodes that make up the cluster are isomorphic, that is, the hardware configurations of each node, such as CPU, memory, and disk, are the same. However, as the company's business expands and the scale of the cluster gradually expands, the configuration of new nodes is significantly higher than that of previous nodes. Therefore, the same task is executed on different nodes, and the completion time of the task varies greatly. In the case of high cluster load, all tasks decomposed by the job cannot immediately apply for the resources to run the tasks, and some tasks enter the queue waiting for resources. After the resource-acquired task is executed, the occupied resource is released, and Hadoop selects the appropriate task from the waiting queue according to the scheduling algorithm and allocates the available resources to the selected task. Therefore, if there are many tasks decomposed by the job, the tasks decomposed by the same job may be executed in multiple rounds to complete. For example, in TaoBao's Hadoop cluster, more than 30% of the Map tasks run for more than two rounds. Therefore, the load situation of the cluster has a decisive influence on the response time and completion time of the job.
最近几年,优化MapReduce作业的执行性能逐渐成为研究热点,大量研究工作从多个方面提升作业的执行效率,比如Hadoop的框架、作业的调度算法、作业的运行过程以及硬件加速器等。In recent years, optimizing the execution performance of MapReduce jobs has gradually become a research hotspot. A lot of research work has improved job execution efficiency from multiple aspects, such as Hadoop framework, job scheduling algorithm, job running process, and hardware accelerators.
Piranha基于作业历史数据总结出小作业的特点——任务数少和失败率低。现有技术中有基于任务数少的特点优化小作业的执行流程,比如Map任务和Reduce任务同时启动、Map任务的中间结果直接写到Reduce任务端。由于小作业的失败率低于1%,采用了简易的容错机制——若作业运行失败,Piranha重新执行整个作业而不是失败的任务。现有技术中有将批处理作业分解为大量的微小任务,每个任务只处理8M的数量,任务可以在数秒内完成计算。由于任务输入的数据量小、运行时间短,任务不存在数据倾斜和落后任务的问题,同时解决了交互式作业长时间等待资源的不足。Tenzing为减少MapReduce作业的周转时间,提供一组作业进程池。新作业提交后,Tenzing的调度器从作业进程池中选择一个空闲的进程运行提交的作业。使用进程池降低了启动新作业的成本,但Tenzing存在两个缺点:一个是预留的进程数超过实际需要,浪费了集群的资源;另一个是限制了Tenzing的调度器只能使用某些具体的进程,损害了任务本地计算的特性。Based on the job history data, Piranha summarizes the characteristics of small jobs—few tasks and low failure rate. In the existing technology, the execution process of small jobs is optimized based on the characteristics of a small number of tasks. For example, the Map task and the Reduce task are started at the same time, and the intermediate results of the Map task are directly written to the Reduce task end. Since the failure rate of small jobs is less than 1%, a simple fault tolerance mechanism is adopted - if a job fails to run, Piranha re-executes the entire job instead of the failed task. In the prior art, the batch processing job is decomposed into a large number of tiny tasks, each task only processes 8M, and the task can complete the calculation within a few seconds. Due to the small amount of data input by the task and the short running time, the task does not have the problems of data skew and lagging tasks, and at the same time solves the problem of interactive jobs waiting for resources for a long time. Tenzing provides a set of job process pools to reduce the turnaround time of MapReduce jobs. After a new job is submitted, Tenzing's scheduler selects an idle process from the job process pool to run the submitted job. Using the process pool reduces the cost of starting new jobs, but Tenzing has two disadvantages: one is that the number of reserved processes exceeds actual needs, which wastes cluster resources; the other is that the Tenzing scheduler can only use certain specific process, compromising the nature of task-local computing.
现有技术中有分析了Hadoop计算框架运行短作业存在的不足,提出从Hadoop的框架方面改善短作业的运行效率。论文从三个方面进行优化:1)将setup任务和cleanup任务改为在主节点运行,避免通过心跳消息改变作业的状态,作业的完成时间直接缩短了一个心跳周期;2)将任务分配从“拉”模式改为“推”模式,减少任务分配的延迟;3)将主节点和从节点之间的控制消息从当前的心跳消息机制中独立出来,采用即时传递机制。In the prior art, the shortcomings of running short jobs in the Hadoop computing framework are analyzed, and it is proposed to improve the running efficiency of short jobs from the Hadoop framework. The paper optimizes from three aspects: 1) Change the setup task and cleanup task to run on the master node to avoid changing the job status through the heartbeat message, and the job completion time is directly shortened by one heartbeat cycle; 2) The task allocation is changed from " The "pull" mode is changed to the "push" mode to reduce the delay of task assignment; 3) The control message between the master node and the slave node is independent from the current heartbeat message mechanism, and the instant delivery mechanism is adopted.
Spark也是处理大规模数据集的计算框架,具有运行DAG图的引擎。Spark与Hadoop的差异是Spark基于内存计算,spark将作业的中间结果存储在内存中而不是本地磁盘或者HDFS中,DAG图中的后继作业直接从内存中读取输入数据进行计算。SpongeFiles的目标是缓解数据倾斜的问题,但在减少作业运行时间方面效果非常显著。现有技术中有引入分布式内存,Map任务的输出结果优先写到分布式内存中,减少数据写入本地磁盘消耗的时间和shuffle阶段读取Map任务中间结果的时间。该方法对Map任务输出数据较大的作业优化效果明显,但对只有Map任务和Map任务输出数据量小的作业没有明显的效果。现有技术中都是通过减少磁盘IO来提高作业的执行效率。Spark is also a computing framework for processing large-scale data sets, with an engine for running DAG graphs. The difference between Spark and Hadoop is that Spark is based on memory computing. Spark stores the intermediate results of jobs in memory instead of local disk or HDFS. Subsequent jobs in the DAG graph directly read input data from memory for calculation. The goal of SpongeFiles is to alleviate the problem of data skew, but it is very effective in reducing job runtime. In the existing technology, distributed memory is introduced, and the output results of the Map task are written to the distributed memory first, reducing the time consumed for writing data to the local disk and the time for reading the intermediate results of the Map task in the shuffle stage. This method has an obvious effect on the optimization of the job with large output data of Map task, but has no obvious effect on the job with only Map task and the job with small output data of Map task. In the prior art, job execution efficiency is improved by reducing disk IO.
Sparrow的设计目标是提供低延迟的任务调度。Sparrow在每个节点中长期运行部分任务进程,由长期运行的任务进程执行新任务,降低任务进程频繁启动的成本。长期运行的任务进程数由用户静态的设定或是由资源管理器根据集群的负载自动调整。Quincy是任务级别的调度器,与Sparrow类似。Quincy为了计算最优的调度顺序将调度问题映射为一张图,调度时综合考虑数据的位置,公平性和饥饿问题。与Sparrow相比,Quincy计算调度顺序时间更长。Sparrow is designed to provide low-latency task scheduling. Sparrow runs some task processes in each node for a long time, and the long-running task processes execute new tasks, reducing the cost of frequent startup of task processes. The number of long-running task processes is statically set by the user or automatically adjusted by the resource manager according to the load of the cluster. Quincy is a task-level scheduler, similar to Sparrow. In order to calculate the optimal scheduling sequence, Quincy maps the scheduling problem into a graph, and considers the location of data, fairness, and hunger when scheduling. Compared to Sparrow, Quincy takes longer to calculate the scheduling sequence.
Hadoop的作业调度算法对作业的运行时间有重要的影响。FIFO调度算法根据作业的提交时间顺序调度作业。由于FIFO没有考虑作业之间的差异,导致执行小作业和交互式作业的性能比较低效。FAIR调度算法保证用户提交的作业公平的共享集群资源,确保短作业在合理的时间内完成,避免作业出现饥饿问题。但FAIR调度算法没有考虑集群的异构性和作业具有时间约束的情况。HFSP在作业运行时评估作业的大小,将小作业设置为高优先级的作业,保证小作业在最短的时间内完成。作业的优先级随着时间动态的调整,防止作业出现饥饿现象。Hadoop's job scheduling algorithm has a significant impact on the running time of jobs. The FIFO scheduling algorithm schedules jobs in order of their submission time. Because FIFO does not take into account the differences between jobs, the performance of small and interactive jobs is relatively inefficient. The FAIR scheduling algorithm ensures that jobs submitted by users share cluster resources fairly, ensures that short jobs are completed within a reasonable time, and avoids job starvation problems. However, the FAIR scheduling algorithm does not consider the heterogeneity of the cluster and the time constraints of the job. HFSP evaluates the size of the job when the job is running, and sets the small job as a high-priority job to ensure that the small job is completed in the shortest time. The priority of jobs is dynamically adjusted over time to prevent job starvation.
发明内容Contents of the invention
本发明的目的就是为了解决上述问题,提供一种基于资源重用的MapReduce短作业优化系统及方法,它从提高资源有效利用方面优化短作业运行性能,减少资源分配和回收的频率,将资源分配和回收的时间用来运行短作业,通过减少作业等待资源时间,提高执行短作业的性能。The object of the present invention is to solve the above-mentioned problems, provide a kind of MapReduce short job optimization system and method based on resource reuse, it optimizes the running performance of short jobs from the aspect of improving the effective utilization of resources, reduces the frequency of resource allocation and recovery, and allocates resources and The reclaimed time is used to run short jobs, which improves the performance of short jobs by reducing the time for jobs to wait for resources.
为了实现上述目的,本发明采用如下技术方案:In order to achieve the above object, the present invention adopts the following technical solutions:
一种基于资源重用的MapReduce短作业优化系统,包括:主节点、一级从节点和若干个二级从节点,其中主节点与一级从节点连接,一级从节点与若干个二级从节点连接;A MapReduce short job optimization system based on resource reuse, including: a master node, a first-level slave node and several second-level slave nodes, wherein the master node is connected to the first-level slave node, and the first-level slave node is connected to several second-level slave nodes connect;
所述主节点上部署资源管理器和一级任务调度器;A resource manager and a first-level task scheduler are deployed on the master node;
所述一级从节点上部署应用管理器、任务性能评估器和二级任务调度器,其中二级任务调度器与任务性能评估器连接,二级任务调度器还与主节点连接;An application manager, a task performance evaluator, and a secondary task scheduler are deployed on the first-level slave node, wherein the secondary task scheduler is connected to the task performance evaluator, and the secondary task scheduler is also connected to the master node;
每个所述二级从节点上均部署节点管理器。A node manager is deployed on each of the secondary slave nodes.
所述资源管理器负责全局资源的的分配和监控,以及应用管理器的启动和监控。The resource manager is responsible for the allocation and monitoring of global resources, and the starting and monitoring of the application manager.
所述一级任务调度器用于按照任务的优先级高低、任务所需资源、任务提交时间顺序等进行作业队列的调度。The first-level task scheduler is used to schedule the job queue according to the priority of the tasks, the resources required by the tasks, and the time sequence of task submission.
所述应用管理器把作业分解为Map任务和Reduce任务,同时还为Map任务和Reduce任务申请资源,与节点管理器配合运行,还用于对任务进行监控。应用管理器是作业运行的控制单元,每个作业对应一个应用管理器。The application manager decomposes the job into a Map task and a Reduce task, and at the same time applies for resources for the Map task and the Reduce task, cooperates with the node manager to run, and is also used to monitor the tasks. The application manager is the control unit for job running, and each job corresponds to an application manager.
所述任务性能评估器,基于任务性能模型预测正在运行的任务和未调度的任务的完成时间。The task performance evaluator predicts the completion time of running tasks and unscheduled tasks based on the task performance model.
所述二级任务调度器,根据任务性能评估器的预测结果判断正在执行的任务是否属于短任务以及从未调度的任务队列中选择任务。The secondary task scheduler judges whether the task being executed is a short task according to the prediction result of the task performance evaluator, and selects a task from the unscheduled task queue.
若正在执行的任务是短任务,二级任务调度器从未调度的任务队列中选择新的短任务,新的短任务重新利用当前正在执行的短任务即将释放的资源;若正在执行的任务不是短任务,正在执行的任务执行完成后直接释放所占用的资源。If the task being executed is a short task, the secondary task scheduler selects a new short task from the unscheduled task queue, and the new short task reuses the resources that are about to be released by the currently executing short task; if the executing task is not For short tasks, the resources occupied are released directly after the execution of the task being executed is completed.
二级任务调度器在选择未调度任务时,需要考虑任务的本地性、任务的运行时间、资源的公平性和集群的异构性。When the secondary task scheduler selects unscheduled tasks, it needs to consider the locality of tasks, the running time of tasks, the fairness of resources and the heterogeneity of clusters.
节点管理器负责监控任务使用的资源量,防止任务运行过程中使用的资源量超过任务申请的资源量。The node manager is responsible for monitoring the amount of resources used by the task to prevent the amount of resources used during the running of the task from exceeding the amount of resources requested by the task.
一种基于资源重用的MapReduce短作业优化方法,包括如下步骤:A method for optimizing short MapReduce jobs based on resource reuse, comprising the following steps:
步骤(1):应用管理器通过心跳消息向资源管理器申请资源;Step (1): The application manager applies for resources from the resource manager through a heartbeat message;
步骤(2):资源管理器将闲置的资源分配给申请资源的应用管理器,应用管理器获取申请到的资源;Step (2): The resource manager allocates idle resources to the application manager that applies for resources, and the application manager obtains the applied resources;
步骤(3):应用管理器把申请到的资源分配给未调度的任务,然后应用管理器通知对应的节点管理器启动任务进程;Step (3): The application manager allocates the requested resources to unscheduled tasks, and then the application manager notifies the corresponding node manager to start the task process;
步骤(4):节点管理器启动任务进程运行任务,任务在运行的过程中每间隔设定的心跳周期向所属的应用管理器发送心跳消息,所述心跳消息包括任务的进度、任务统计数据和任务健康状态;任务统计数据是指已处理的数据量、已输出的数量、已消耗的时间、溢写IO速率;任务健康状态是指执行任务的进程是否异常;Step (4): The node manager starts the task process to run the task, and the task sends a heartbeat message to the application manager at every set heartbeat period during the running process, and the heartbeat message includes the progress of the task, task statistics and Task health status; task statistics refer to the amount of processed data, output quantity, consumed time, overflow write IO rate; task health status refers to whether the process of executing the task is abnormal;
步骤(5):应用管理器收到心跳消息,基于任务性能评估器中的任务完成时间预测模型来预测任务的运行时间,若任务的运行时间小于等于用户设定的任务完成时间,则当前任务是短任务;否则当前任务是长任务;Step (5): The application manager receives the heartbeat message, and predicts the running time of the task based on the task completion time prediction model in the task performance evaluator. If the task running time is less than or equal to the task completion time set by the user, the current task is a short task; otherwise the current task is a long task;
若当前任务是短任务,二级任务调度器从未调度的任务队列中选择新任务;若当前任务是长任务,就忽略心跳消息;If the current task is a short task, the secondary task scheduler selects a new task from the unscheduled task queue; if the current task is a long task, the heartbeat message is ignored;
步骤(6):应用管理器将选择的新任务通知任务进程,任务进程执行完正在运行的任务后继续运行新任务;Step (6): The application manager notifies the task process of the selected new task, and the task process continues to run the new task after executing the running task;
步骤(7):若任务进程执行完正在运行的任务后未接收到新任务,任务进程退出并释放占用的资源,节点管理器通过心跳消息将释放的资源通知资源管理器。Step (7): If the task process does not receive a new task after executing the running task, the task process exits and releases the occupied resources, and the node manager notifies the resource manager of the released resources through a heartbeat message.
所述步骤(4)的任务在运行的过程中每间隔设定的心跳周期向所属的应用管理器发送心跳消息的步骤为:The step of sending a heartbeat message to the associated application manager at every set heartbeat period during the operation of the task in step (4) is as follows:
步骤(401):判断任务进度是否超过设定阈值,若任务进度超过设定阈值,计算当前任务的统计数据,发送统计心跳消息,转步骤(403);否则转步骤(402);所述当前任务的统计数据包括任务已处理的数量、已运行时间和输出数据量;Step (401): judge whether the task progress exceeds the set threshold, if the task progress exceeds the set threshold, calculate the statistical data of the current task, send a statistical heartbeat message, and turn to step (403); otherwise turn to step (402); Task statistics include the number of tasks processed, the elapsed running time and the amount of output data;
步骤(402):任务的进度未超过设定阈值,发送任务健康心跳消息;进入步骤(404);Step (402): if the progress of the task does not exceed the set threshold, send a task health heartbeat message; enter step (404);
步骤(403):任务收到应用管理器的心跳消息;进入步骤(404);Step (403): the task receives the heartbeat message from the application manager; enter step (404);
步骤(404):判断任务进程进行是否收到新任务,若任务进程收到新任务,转步骤(405);否则转(406);Step (404): Judging whether the task process has received a new task, if the task process has received a new task, go to step (405); otherwise go to (406);
步骤(405):将新任务的输入数据读取到当前节点;在当前任务执行完成后,运行新接收到的任务;Step (405): read the input data of the new task to the current node; after the execution of the current task is completed, run the newly received task;
步骤(406):若任务进程没有收到新任务,当前任务执行完成后,任务进程释放任务占用的资源。Step (406): If the task process does not receive a new task, after the execution of the current task is completed, the task process releases the resource occupied by the task.
所述步骤(5)的基于任务性能评估器中的任务完成时间预测模型来预测任务的运行时间:Described step (5) predicts the running time of task based on the task completion time prediction model in the task performance evaluator:
假如任务的运行过程可划分为多个子阶段,任务的完成时间与任务在子阶段消耗的时间密切相关,因此根据任务在子阶段消耗的时间建立任务完成时间预测模型;If the running process of the task can be divided into multiple sub-phases, the completion time of the task is closely related to the time consumed by the task in the sub-phase, so the task completion time prediction model is established according to the time consumed by the task in the sub-phase;
任务的运行过程划分为多个子阶段,若子阶段之间不存在重叠,任务的完成时间为各个子阶段的时间之和;The running process of the task is divided into multiple sub-phases. If there is no overlap between the sub-phases, the completion time of the task is the sum of the time of each sub-phase;
若子阶段之间存在重叠,要去除重叠的部分;If there is overlap between the sub-stages, the overlapping part should be removed;
假如任务i的运行过程分解为n个子阶段,每个子阶段处理的数据量用向量s表示,s=[s1,s1,…,sn],每个子阶段处理数据的速率用向量r表示,r=[r1,r1,…,rn]。If the running process of task i is decomposed into n sub-stages, the amount of data processed by each sub-stage is represented by vector s, s=[s 1 , s 1 ,...,s n ], and the rate of data processed by each sub-stage is represented by vector r , r=[r 1 , r 1 , . . . , r n ].
任务i的完成时间Ti为:The completion time T i of task i is:
其中,α为启动任务的时间,在固定的范围内变化,是一个常量。Among them, α is the time to start the task, which changes within a fixed range and is a constant.
Map任务的运行过程分解为四个子阶段,分别为读取输入数据阶段,运行map操作阶段,输出数据溢写阶段,中间结果文件合并阶段,各阶段分别用read,map,spill,combine表示。The running process of the Map task is decomposed into four sub-phases, namely, the phase of reading input data, the phase of running map operations, the phase of overflowing output data, and the phase of merging intermediate result files. Each phase is represented by read, map, spill, and combine.
根据公式(1)计算Map任务i的完成时间Ti:Calculate the completion time T i of Map task i according to formula (1):
对于短任务而言,中间结果文件合并阶段运行时间非常小,看做常量。For short tasks, the running time of the intermediate result file merging stage is very small, which is regarded as a constant.
读取输入数据阶段和map操作阶段处理的数量相同,为任务的输入数据量stask。The number of processes processed in the phase of reading input data and the phase of map operation is the same, which is the amount of input data s task of the task .
由于map操作阶段与输出数据溢写阶段存在部分重叠,溢写阶段的数据量取map操作之后输出的数量,即最后一次溢写的数量。Since there is a partial overlap between the map operation phase and the output data overflow phase, the amount of data in the overflow phase is the output after the map operation, that is, the amount of the last overflow.
设已处理的输入数据量,为已输出的数据量,sbuffer配置项设定的缓存大小,为最后一次溢写的数据量,公式(2)演变为:Assume the amount of input data processed, For the amount of output data, the cache size set by the s buffer configuration item, is the amount of data written in the last overflow, formula (2) evolves into:
其中,stask为任务的输入数据量大小,β为常量,rread,rmap,rspill从任务端发送的统计心跳消息中获取。 为i阶段已处理的数据量,为在i阶段已消耗的时间。Among them, s task is the input data size of the task, β is a constant, r read , r map , and r spill are obtained from the statistical heartbeat message sent by the task end. is the amount of data processed in stage i, is the elapsed time in stage i.
当应用管理器收到任务端发送的统计心跳消息时,根据公式(3)计算当前任务的完成时间。When the application manager receives the statistical heartbeat message sent by the task end, it calculates the completion time of the current task according to formula (3).
假如任务k在节点w上运行,预测未调度的任务m在节点w运行的完成时间时,任务m各个子阶段的处理速率为If task k is running on node w, when predicting the completion time of unscheduled task m running on node w, the processing rate of each sub-stage of task m is
所述步骤(5)的二级任务调度器从未调度的任务队列中选择新任务的步骤包括:The step that the secondary task scheduler of described step (5) selects new task from unscheduled task queue comprises:
步骤(51):Step (51):
按任务的本地性将未调度的任务分为节点本地任务,机架本地任务,其他机架任务;According to the locality of tasks, unscheduled tasks are divided into node local tasks, rack local tasks, and other rack tasks;
如果任务的输入数据在处理任务的节点上,称该任务为节点本地任务;If the input data of the task is on the node that processes the task, the task is called a node-local task;
如果任务的输入数据与处理任务的节点在同一个机架内,称为该任务为机架本地任务;If the input data of the task is in the same rack as the node processing the task, the task is called a rack-local task;
如果任务的输入数据与处理任务的节点不在同一个机架内,称为该任务为其他机架任务;If the input data of the task is not in the same rack as the node processing the task, the task is called another rack task;
再按照节点本地任务、机架本地任务、其他机架任务的顺序选择任务,优先选择每个优先级中运行时间最小的任务;Then select tasks in the order of node local tasks, rack local tasks, and other rack tasks, and give priority to the task with the smallest running time in each priority;
步骤(52):根据异构性原则选择最优任务。Step (52): Select the optimal task according to the principle of heterogeneity.
所述步骤(51)的步骤为:The step of described step (51) is:
步骤(511):应用管理器收到任务进程发送的统计心跳消息,转步骤(512);Step (511): the application manager receives the statistical heartbeat message sent by the task process, and turns to step (512);
步骤(512):任务性能评估器预测发送心跳任务的完成时间,转步骤(513);Step (512): the task performance evaluator predicts the completion time of sending the heartbeat task, and turns to step (513);
步骤(513):若发送心跳任务的完成时间小于设定的时间,则计算未调度任务在运行当前作业的全部节点上运行时间,转步骤(514);Step (513): If the completion time of sending the heartbeat task is less than the set time, then calculate the running time of unscheduled tasks on all nodes running the current job, and turn to step (514);
步骤(514):将未调度任务按照本地性分别加入到节点本地任务队列、机架本地任务队列和其他机架任务队列,转步骤(515);本地性是指任务的输入数据与处理节点是否相同,是否在一个机架内;Step (514): Add unscheduled tasks to node local task queues, rack local task queues and other rack task queues respectively according to locality, and go to step (515); locality refers to whether the input data of the task is related to the processing node Same, whether in a rack or not;
步骤(515):按照任务的运行时间对节点本地任务队列、机架本地任务队列和其他机架任务队列排序;转步骤(516);Step (515): sort the node local task queue, the rack local task queue and other rack task queues according to the running time of the task; go to step (516);
步骤(516):根据任务的本地性优先级,判断未调度任务是否属于最优任务,若是最优任务则将当前选择的未调度任务从未调度列表中删除,返回该任务;否则判断下一个未调度的任务,直到检查完全部未调度任务。Step (516): According to the local priority of the task, determine whether the unscheduled task belongs to the optimal task, if it is the optimal task, delete the currently selected unscheduled task from the unscheduled list, and return to the task; otherwise, determine the next Unscheduled tasks until all unscheduled tasks are checked.
所述步骤(52)的步骤为:The step of described step (52) is:
步骤(521):计算输入任务在运行当前作业的全节点上的运行时间,转步骤(522);Step (521): Calculate the running time of the input task on the full node running the current job, go to step (522);
步骤(522):获取计算输入任务运行时间最短的节点,转步骤(523);Step (522): Obtain the node with the shortest running time of the calculation input task, go to step (523);
步骤(523):判断获取的节点与输入的节点是否相同,如果相同,返回任务是最优任务;否则转步骤(524);Step (523): Judging whether the obtained node is the same as the input node, if they are the same, the returned task is the optimal task; otherwise, go to step (524);
步骤(524):计算任务从输入节点转移到运行任务时间最短的节点上运行的收益,若收益超过设定的阈值,返回输入任务是最优任务;否则返回输入任务不是最优任务。Step (524): Calculate the income from transferring the task from the input node to the node with the shortest running time. If the income exceeds the set threshold, the returned input task is the optimal task; otherwise, the returned input task is not the optimal task.
所述任务的本地性是指:任务的输入数据与处理节点在同一个节点称为本地任务;若在同一个机架称为机架本地任务;不在一个机架内称为其他机架任务。就是任务的输入数据与处理节点是何种关系。The locality of the task means that: if the input data of the task is on the same node as the processing node, it is called a local task; if it is in the same rack, it is called a rack local task; if it is not in a rack, it is called another rack task. It is the relationship between the input data of the task and the processing node.
所述任务转移运行收益是指:是指若任务在其他节点运行,运行时间比在原节点运行时间更短。缩短的这部分时间称为收益。The task transfer operation benefit refers to: if the task runs on other nodes, the running time is shorter than that on the original node. This shortened portion of time is called the benefit.
所述最优任务是指:是这在当前节点运行时间最短的任务。The optimal task refers to the task with the shortest running time on the current node.
本发明的有益效果:对Hadoop进行优化,提高短作业的运行效率。首先,本发明通过分析Hadoop中作业的执行过程,对短作业处理过程中存在的问题进行描述。然后,根据高负载情况下任务运行多轮的特点,提出一种基于资源重用的短作业优化机制——通过对已执行任务所释放资源的重用,减少资源在分配和回收过程中的浪费。在集群高负载的情况下,运行多轮的任务中Map任务占的比例远高于Reduce任务,因此本发明只优化Map任务。实验结果表明,本发明提出的基于资源重用的短作业优化方法能够有效的减少短作业的运行时间,并显著提高了集群资源的利用率。The beneficial effect of the present invention is to optimize Hadoop and improve the operating efficiency of short jobs. First, the present invention describes the problems existing in the processing of short jobs by analyzing the execution process of jobs in Hadoop. Then, according to the characteristics of tasks running for multiple rounds under high load conditions, a short job optimization mechanism based on resource reuse is proposed to reduce the waste of resources in the process of allocation and recycling by reusing the resources released by the executed tasks. In the case of a high cluster load, the Map task accounts for much more than the Reduce task in the tasks running multiple rounds, so the present invention only optimizes the Map task. Experimental results show that the short job optimization method based on resource reuse proposed by the present invention can effectively reduce the running time of short jobs and significantly improve the utilization rate of cluster resources.
附图说明Description of drawings
图1为长任务运行时序图;Figure 1 is a long task running timing diagram;
图2为短作业计算框架;Figure 2 is the short job calculation framework;
图3为短任务执行流程时序图;FIG. 3 is a sequence diagram of short task execution flow;
图4(a)为任务的进程取不同的值时,任务性能模型预测正在运行的任务的误差变化情况;Figure 4(a) shows the error variation of the running task predicted by the task performance model when the process of the task takes different values;
图4(b)为任务运行时间与误差之间的关系;Figure 4(b) shows the relationship between task running time and error;
图5(a)为词频统计作业运行时间;Figure 5(a) is the running time of the word frequency statistics job;
图5(b)为HiveSQL转化的作业运行时间;Figure 5(b) shows the job running time transformed by HiveSQL;
图5(c)为短作业优化的作业运行时间;Figure 5(c) shows the optimized job running time for short jobs;
图6为计算节点CPU利用率;Figure 6 shows the CPU utilization of computing nodes;
图7为计算节点内存利用率。Figure 7 shows the memory utilization of computing nodes.
具体实施方式detailed description
下面结合附图与实施例对本发明作进一步说明。The present invention will be further described below in conjunction with the accompanying drawings and embodiments.
Hadoop是一种用于处理大规模数据集的并行计算平台,具备良好的扩展性、高容错性、易编程等优点。虽然其初始设计目标是在大量计算节点中并行处理规模较大的作业,然而在实际生产中,Hadoop却经常被用来处理规模较小的短作业。由于Hadoop并未考虑短作业的特点,导致短作业在Hadoop中执行比较低效。针对上述挑战,本发明首先通过分析Hadoop中作业的执行过程,对短作业处理过程中存在的问题进行描述。然后,根据高负载情况下任务运行多轮的特点,提出一种基于资源重用的短作业优化机制——通过对已执行任务所释放资源的重用,减少资源在分配和回收过程中的浪费。实验结果表明,本发明提出的基于资源重用的短作业优化方法能够有效的减少短作业的运行时间,并显著提高了集群资源的利用率。Hadoop is a parallel computing platform for processing large-scale data sets, which has the advantages of good scalability, high fault tolerance, and easy programming. Although its original design goal is to process large-scale jobs in parallel in a large number of computing nodes, in actual production, Hadoop is often used to process small-scale short jobs. Since Hadoop does not consider the characteristics of short jobs, the execution of short jobs in Hadoop is relatively inefficient. In view of the above challenges, the present invention first describes the problems existing in the short job processing process by analyzing the job execution process in Hadoop. Then, according to the characteristics of tasks running for multiple rounds under high load conditions, a short job optimization mechanism based on resource reuse is proposed to reduce the waste of resources in the process of allocation and recycling by reusing the resources released by the executed tasks. Experimental results show that the short job optimization method based on resource reuse proposed by the present invention can effectively reduce the running time of short jobs and significantly improve the utilization rate of cluster resources.
本发明基于Hadoop2.x版本对短作业进行优化。本节首先分析Hadoop的计算框架和任务执行的过程,然后描述Hadoop执行短作业存在的问题。The present invention optimizes short jobs based on Hadoop2.x version. This section first analyzes Hadoop's computing framework and task execution process, and then describes the problems existing in Hadoop's execution of short jobs.
Hadoop采用主从结构,由一个主节点和多个从节点组成,参考图2。资源管理器(ResourceManager)运行在主节点上,负责全局资源的分配和监控,以及应用管理器的启动和监控。应用管理器(ApplicationMaster)运行在从节点上,其职责是把作业分解为Map任务和Reduce任务、为Map任务和Reduce任务申请资源、与节点管理器配合运行和监控任务。应用管理器是作业运行的控制单元,每个作业对应一个应用管理器。节点管理器(NodeManager)也运行在从节点上,负责监控任务使用的资源量(如内存,CPU等),防止任务运行过程中使用的资源量超过任务申请的资源量。Hadoop adopts a master-slave structure, which consists of a master node and multiple slave nodes, as shown in Figure 2. The resource manager (ResourceManager) runs on the master node and is responsible for the allocation and monitoring of global resources, as well as the startup and monitoring of the application manager. The application manager (ApplicationMaster) runs on the slave nodes, and its responsibilities are to decompose the job into Map tasks and Reduce tasks, apply for resources for the Map tasks and Reduce tasks, and cooperate with the node manager to run and monitor tasks. The application manager is the control unit for job running, and each job corresponds to an application manager. The node manager (NodeManager) also runs on the slave node and is responsible for monitoring the amount of resources (such as memory, CPU, etc.) used by the task to prevent the amount of resources used during the running of the task from exceeding the amount of resources requested by the task.
图1描述了应用管理器为任务申请资源和任务运行过程。①应用管理器将作业划分为Map任务和Reduce任务,通过心跳消息向资源管理器申请资源。②资源管理器按照某种调度算法从申请资源的任务队列中选择合适的任务,并将闲置的资源分配给选择的任务,应用管理器在下一次心跳消息时获取申请到的资源。③应用管理器把申请的资源分配给适合的任务,然后通知资源所在节点的节点管理器启动任务。④节点管理器启动任务进程运行任务,任务在运行的过程中间隔心跳周期向所属的应用管理器汇报任务的进度和健康状态。⑤任务完成后释放任务占用的资源,由节点管理器通过心跳消息将释放的资源归还给资源管理器。Figure 1 describes the process of the application manager applying for resources for a task and running the task. ① The application manager divides the job into Map tasks and Reduce tasks, and applies for resources from the resource manager through heartbeat messages. ②The resource manager selects the appropriate task from the task queue applying for resources according to a certain scheduling algorithm, and allocates the idle resources to the selected task, and the application manager obtains the applied resource in the next heartbeat message. ③ The application manager allocates the requested resources to appropriate tasks, and then notifies the node manager of the node where the resources are located to start the task. ④The node manager starts the task process to run the task, and the task reports the progress and health status of the task to the application manager to which it belongs at intervals of heartbeat cycles during the running process. ⑤ Release the resources occupied by the task after the task is completed, and the node manager will return the released resources to the resource manager through the heartbeat message.
从上述任务的执行过程得知,运行时间短的任务和运行时间长的任务都遵从上述过程,但运行时间短的任务按照上述过程执行存在如下问题:From the execution process of the above tasks, it is known that the tasks with short running time and tasks with long running time follow the above process, but the execution of tasks with short running time according to the above process has the following problems:
1)任务启动成本高。应用管理器为任务申请资源至少需要一个心跳周期(默认为3秒),从应用管理器通知节点管理器运行任务到任务初始化完成需要1~2秒。在Taobao的集群中运行时间低于10秒的Map任务占50%以上,在Yahoo!的即席查询和数据分析集群中大量Map任务的平均完成时间是20左右秒,因此任务的启动时间占总时间的20%(5/25)以上。在任务申请资源时,若资源管理器没有可以用的资源,任务需要排队等待可用资源,这将导致任务启动成本进一步增加。而任务排队等待可用资源的现象在集群中经常发生。1) The task startup cost is high. It takes at least one heartbeat period (3 seconds by default) for the application manager to apply for resources for the task, and it takes 1 to 2 seconds from the time the application manager notifies the node manager to run the task to the completion of the task initialization. In Taobao's cluster, Map tasks with a running time of less than 10 seconds account for more than 50%, and in Yahoo! The average completion time of a large number of Map tasks in the ad hoc query and data analysis cluster is about 20 seconds, so the task startup time accounts for more than 20% (5/25) of the total time. When a task applies for resources, if the resource manager has no available resources, the task needs to queue up for available resources, which will further increase the task startup cost. The phenomenon of tasks queuing and waiting for available resources often occurs in the cluster.
2)资源浪费严重。已执行完的任务释放所占用的资源,由节点管理器间隔一个心跳周期将释放的资源归还给资源管理器。资源管理器把闲置的资源分配给等待资源的任务,任务所属的应用管理器间隔一个心跳周期才能获得申请的资源。从启动任务进程到任务初始化完成需要1~2秒,所以释放的资源再次被重新利用需要7~8秒,其中心跳周期为3秒。因此,集群频繁的执行运行时间短的任务,将导致集群资源的严重浪费。2) Serious waste of resources. The completed tasks release the occupied resources, and the node manager returns the released resources to the resource manager at intervals of one heartbeat cycle. The resource manager allocates idle resources to tasks waiting for resources, and the application manager to which the tasks belong can only obtain the requested resources at intervals of one heartbeat cycle. It takes 1-2 seconds from the start of the task process to the completion of the task initialization, so it takes 7-8 seconds for the released resources to be reused again, and the heartbeat period is 3 seconds. Therefore, the frequent execution of short-running tasks by the cluster will result in a serious waste of cluster resources.
根据上述分析,当前Hadoop执行任务的流程不适合执行运行时间短的任务。为了方便描述问题,我们为运行时间短的任务引入一个新概念——短任务。According to the above analysis, the current process of Hadoop executing tasks is not suitable for executing tasks with short running time. In order to describe the problem conveniently, we introduce a new concept—short task—for short-running tasks.
定义1.设任务i的完成时间为Ti,用户设定任务完成时间为TshortTask,若任务的完成时间满足Ti≤TshortTesk,称任务i为短任务。Definition 1. Let the completion time of task i be T i , and the user set the task completion time as T shortTask . If the task completion time satisfies T i ≤ T shortTesk , task i is called a short task.
短作业是由大量的短任务构成,运行大量的短任务降低了短作业的执行性能。根据高负载情况下任务运行多轮的特点,本发明提出一种基于资源重用的短作业优化机制——通过对已执行任务所释放资源的重用,减少资源在分配和回收过程中的浪费。重用资源的任务得到提前运行,从而减少了作业的运行时间。A short job is composed of a large number of short tasks, and running a large number of short tasks reduces the execution performance of the short job. According to the characteristics of multiple rounds of task running under high load conditions, the present invention proposes a short job optimization mechanism based on resource reuse—reducing the waste of resources in the process of allocation and recovery by reusing the resources released by the executed tasks. Tasks that reuse resources are run ahead of time, reducing job runtime.
1资源重用的短作业计算框架1 Short job computing framework for resource reuse
本节描述资源重用的短作业计算框架和短作业的任务执行过程。任务运行需要一定数量的资源,包括内存、CPU、网络、磁盘存储空间、任务进程等,这些资源都可以被未调度的任务使用。由于不同作业的任务需要的资源数量有差别,本发明只考虑同一个作业的Map任务之间进行资源重用。This section describes the short job computing framework for resource reuse and the task execution process of short jobs. Task operation requires a certain amount of resources, including memory, CPU, network, disk storage space, task process, etc. These resources can be used by unscheduled tasks. Since the resource quantities required by tasks of different jobs are different, the present invention only considers resource reuse between Map tasks of the same job.
1.1短作业计算框架1.1 Short job computing framework
在Hadoop的框架中,应用管理器是一个作业的控制中心,负责为任务申请资源、与节点管理器配合执行和监控任务。本发明在应用管理器中增加两个组件——任务性能评估器(TaskPerformanceEstimator)和二级任务调度器(Sub-scheduler),框架如图2所示。In the framework of Hadoop, the application manager is a job control center, responsible for applying for resources for tasks, and cooperating with node managers to execute and monitor tasks. The present invention adds two components in the application manager——TaskPerformanceEstimator (TaskPerformanceEstimator) and a secondary task scheduler (Sub-scheduler), the frame of which is shown in FIG. 2 .
任务性能评估器基于任务性能模型预测两类任务的完成时间——正在运行的任务和未调度的任务。因为资源重用的机制只适用于短任务,根据短任务的定义需要任务的运行时间,而该时间在任务完成之前是未知的。未调度的任务在设定节点的运行时间是二级任务调度器选择任务的关键依据,其计算结果直接影响任务的调度顺序。由于任务可划分为多个子阶段,任务性能模型是基于任务子阶段消耗的时间建立的。任务执行的过程中,不间断的收集统计数据,计算任务在各个子阶段消耗的时间。The task performance estimator predicts the completion time of two types of tasks—running tasks and unscheduled tasks—based on the task performance model. Because the mechanism of resource reuse is only suitable for short tasks, the definition of short tasks requires the running time of the task, which is unknown before the task is completed. The running time of unscheduled tasks on the set node is the key basis for selecting tasks by the secondary task scheduler, and its calculation results directly affect the scheduling order of tasks. Since a task can be divided into sub-phases, the task performance model is built based on the time consumed by the sub-phases of the task. During the execution of the task, statistical data is collected continuously, and the time consumed by the task in each sub-stage is calculated.
二级任务调度器的职责是根据任务性能评估器的预测结果判断正在执行的任务是否属于短任务,以及从未调度的任务队列中选择任务。若正在执行的任务是短任务,二级任务调度器从未调度的任务队列中选择合适的任务重用短任务即将释放的资源。二级任务调度器在选择未调度任务时,综合考虑任务的本地性、任务的运行时间、资源的公平性和集群的异构性问题。The responsibility of the secondary task scheduler is to judge whether the task being executed is a short task according to the prediction result of the task performance evaluator, and to select a task from the unscheduled task queue. If the task being executed is a short task, the secondary task scheduler selects an appropriate task from the unscheduled task queue to reuse the resources that the short task will release. When selecting unscheduled tasks, the secondary task scheduler comprehensively considers the locality of tasks, the running time of tasks, the fairness of resources and the heterogeneity of clusters.
1.2短作业的任务执行过程1.2 Task execution process of short jobs
图3描述了短作业的任务执行流程。①应用管理器通过心跳消息向资源管理器申请资源。②资源管理器将闲置的资源分配给申请资源的任务,应用管理器在下一次心跳消息时获取申请到的资源。③应用管理器把申请的资源分配给适合的任务,然后通知对应的节点管理器启动任务。④节点管理器启动任务进程运行任务,任务在运行的过程中间隔心跳周期向所属的应用管理器汇报任务的进度、任务统计数据和健康状态。⑤应用管理器收到心跳消息,若当前任务是短任务,二级任务调度器从未调度的任务队列中选择合适的任务。⑥应用管理器将选择的任务通知任务进程,任务进程执行完正在运行的任务后继续运行新接收到的任务。⑦任务进程执行完正在运行的任务后未接收到新任务,任务进程退出并释放占用的资源,节点管理器通过心跳消息将释放的资源通知资源管理器。Figure 3 describes the task execution flow of the short job. ① The application manager applies for resources from the resource manager through the heartbeat message. ②The resource manager allocates idle resources to the task of applying for resources, and the application manager obtains the applied resources at the next heartbeat message. ③The application manager allocates the requested resources to the appropriate tasks, and then notifies the corresponding node manager to start the tasks. ④ The node manager starts the task process to run the task, and the task reports the task progress, task statistics and health status to the application manager to which it belongs at intervals of heartbeat cycles during the running process. ⑤ When the application manager receives the heartbeat message, if the current task is a short task, the secondary task scheduler selects a suitable task from the unscheduled task queue. ⑥The application manager notifies the task process of the selected task, and the task process continues to run the newly received task after executing the running task. ⑦ If the task process does not receive a new task after executing the running task, the task process exits and releases the occupied resources, and the node manager notifies the resource manager of the released resources through the heartbeat message.
短作业计算框架不影响长任务的执行,但长任务仍然遵循图1描述的过程执行,图3描述的过程只适用于短任务。The short job computing framework does not affect the execution of long tasks, but long tasks still follow the process described in Figure 1, and the process described in Figure 3 is only applicable to short tasks.
2资源重用的短作业优化实现2 Short job optimization implementation of resource reuse
根据短作业计算框架的设计,要在应用管理器端和任务进程分别实现。应用管理器端的实现基于任务进程收集的统计数据,本节首先介绍任务进程心跳消息的实现,然后介绍应用管理器端任务性能模型和二级任务调度器的实现。According to the design of the short job computing framework, it should be implemented separately on the application manager side and the task process. The implementation of the application manager is based on the statistical data collected by the task process. This section first introduces the implementation of the heartbeat message of the task process, and then introduces the implementation of the task performance model and the second-level task scheduler on the application manager.
2.1任务进程心跳消息2.1 Task process heartbeat message
任务与应用管理器基于心跳消息通信,心跳消息的内容包括任务的进度、任务的健康状况等信息。任务性能模型预测任务的完成时间基于任务运行过程中的统计数据——已处理的输入数量、已输出的数据量、任务输出率、读取输入数据的速率、map操作的速率和溢写输出数据的速率等信息。因为正常的心跳消息发送频率比较高,为了缓解应用管理器的压力,本发明在任务与应用管理器之间增加一种统计心跳消息,负责将统计数据发送给应用管理器。统计心跳消息只有任务的进度超过设定的阈值时才发送。The communication between the task and the application manager is based on the heartbeat message, and the content of the heartbeat message includes information such as the progress of the task and the health status of the task. The task performance model predicts the completion time of the task based on the statistics of the task running process - the number of processed inputs, the amount of output data, the task output rate, the rate of reading input data, the rate of map operation and the overflow of output data speed and other information. Because the sending frequency of normal heartbeat messages is relatively high, in order to alleviate the pressure of the application manager, the present invention adds a statistical heartbeat message between the task and the application manager, responsible for sending statistical data to the application manager. Statistical heartbeat messages are sent only when the progress of a task exceeds a set threshold.
算法1.sendHeartbeat算法.Algorithm 1. sendHeartbeat algorithm.
输入:发送统计心跳消息的最小任务进度,由用户设定;Input: the minimum task progress for sending statistical heartbeat messages, set by the user;
当前任务的进度;the progress of the current task;
输出:心跳消息Output: heartbeat message
步骤101:若任务进度超过设定的阈值,计算当前任务的统计数据(任务已处理的数量、已运行时间、输出数据量等),发送统计心跳消息,转步骤103;否则转步骤102;Step 101: If the task progress exceeds the set threshold, calculate the statistical data of the current task (the number of tasks processed, the elapsed running time, the amount of output data, etc.), send a statistical heartbeat message, and go to step 103; otherwise, go to step 102;
步骤102:任务的进度未超过设定阈值,发送任务健康心跳消息;Step 102: The progress of the task does not exceed the set threshold, and a task health heartbeat message is sent;
步骤103:任务收到应用管理器的心跳消息;Step 103: the task receives a heartbeat message from the application manager;
步骤104:若任务进程收到新任务,转步骤105;否则转106;Step 104: If the task process receives a new task, go to step 105; otherwise, go to step 106;
步骤105:将新任务的输入数据读取到当前节点。在当前任务执行完成后,运行新接收到的任务;Step 105: Read the input data of the new task to the current node. After the current task is executed, run the newly received task;
步骤106:若任务进程没有收到新任务,当前任务执行完成后任务进程释放任务占用的资源。Step 106: If the task process does not receive a new task, the task process releases the resources occupied by the task after the execution of the current task is completed.
算法1描述了任务进程发送心跳消息的过程,curTask为当前正在运行的任务,newTask为新接收的任务。在任务的进度超过设定的阈值时,计算任务已处理的数量、输出的数据量、读写速率等统计数据,然后向应用管理器发送统计心跳消息。任务进程在收到应用管理器反馈的心跳消息时,检查是否收到新任务。若任务进程收到新任务,将新任务的输入数据提前读取到当前节点。读取新任务输入数据的操作与正在运行任务并行执行,避免执行新任务时再读取数据。正在运行的任务完成后,继续执行新接收到的任务。若任务进程未接收到新任务,正在运行的任务运行完成后释放占用的资源。Algorithm 1 describes the process of the task process sending heartbeat messages, curTask is the currently running task, and newTask is the newly received task. When the progress of the task exceeds the set threshold, calculate the number of tasks processed, the amount of output data, the read and write rate and other statistical data, and then send a statistical heartbeat message to the application manager. When the task process receives the heartbeat message fed back by the application manager, it checks whether it has received a new task. If the task process receives a new task, it reads the input data of the new task to the current node in advance. The operation of reading the input data of the new task is executed in parallel with the running task to avoid reading data when the new task is executed. After the running task completes, continue with the newly received task. If the task process does not receive a new task, the running task will release the occupied resources after running.
2.2任务完成时间预测模型2.2 Task Completion Time Prediction Model
假如任务的运行过程可划分为多个子阶段,任务的完成时间与任务在子阶段消耗的时间密切相关,因此本发明根据任务在子阶段消耗的时间建立任务性能模型。任务的运行过程可以划分为多个子阶段,若子阶段之间不存在重叠,任务的完成时间为各个子阶段的时间之和;若子阶段之间存在重叠,要去除重叠的部分。假如任务i的运行过程分解为n个子阶段,每个子阶段处理的数据量用向量s表示,s=[s1,s1,…,sn],每个子阶段处理数据的速率用向量r表示,r=[r1,r1,…,rn]。任务i的完成时间Ti为:If the running process of the task can be divided into multiple sub-phases, the completion time of the task is closely related to the time consumed by the task in the sub-phases, so the present invention establishes a task performance model according to the time consumed by the task in the sub-phases. The running process of the task can be divided into multiple sub-phases. If there is no overlap between the sub-phases, the completion time of the task is the sum of the time of each sub-phase; if there is overlap between the sub-phases, the overlapping part should be removed. If the running process of task i is decomposed into n sub-stages, the amount of data processed by each sub-stage is represented by vector s, s=[s 1 , s 1 ,...,s n ], and the rate of data processed by each sub-stage is represented by vector r , r=[r 1 , r 1 , . . . , r n ]. The completion time T i of task i is:
其中,α为启动任务的时间,该值在固定的范围内变化,可以看做一个常量。Among them, α is the time to start the task, and this value changes within a fixed range, which can be regarded as a constant.
Map任务的运行过程分解为四个子阶段,分别为读取输入数据阶段,运行map操作阶段,输出数据溢写阶段,中间结果文件合并阶段,各阶段分别用read,map,spill,combine表示。根据公式(1)计算Map任务i的完成时间Ti:The running process of the Map task is decomposed into four sub-phases, namely, the phase of reading input data, the phase of running map operations, the phase of overflowing output data, and the phase of merging intermediate result files. Each phase is represented by read, map, spill, and combine. Calculate the completion time T i of Map task i according to formula (1):
对于短任务而言,中间结果文件合并阶段运行时间非常小,可以看做常量。读取输入数据阶段和map操作阶段处理的数量相同,为任务的输入数据量stask。由于map操作阶段与输出数据溢写阶段存在部分重叠,溢写阶段的数据量取map操作之后输出的数量,即最后一次溢写的数量。设已处理的输入数据量,为已输出的数据量,sbuffer配置项设定的缓存大小,为最后一次溢写的数据量,公式(2)演变为:For short tasks, the running time of the intermediate result file merging stage is very small and can be regarded as a constant. The number of processes processed in the phase of reading input data and the phase of map operation is the same, which is the amount of input data s task of the task . Since there is a partial overlap between the map operation phase and the output data overflow phase, the amount of data in the overflow phase is the amount output after the map operation, that is, the amount of the last overflow. Assume the amount of input data processed, For the amount of output data, the cache size set by the s buffer configuration item, is the amount of data written in the last overflow, formula (2) evolves into:
其中,stask为任务的输入数据量大小,β为常量,rread,rmap,rspill从任务端发送的统计心跳消息中获取。 为i阶段已处理的数据量,为在i阶段已消耗的时间。Among them, s task is the input data size of the task, β is a constant, r read , r map , and r spill are obtained from the statistical heartbeat message sent by the task end. is the amount of data processed in stage i, is the elapsed time in stage i.
当应用管理器收到任务端发送的统计心跳消息时,根据公式(3)可以计算当前任务的完成时间。假如任务k在节点w上运行,预测未调度的任务m在节点w运行的完成时间时,任务m各个子阶段的处理速率为
2.3二级任务调度器2.3 Secondary Task Scheduler
应用管理器收到任务发送的统计心跳消息后,基于任务性能模型预测任务的运行时间。若发送统计心跳消息的任务是短任务,从未调度的任务队列中选择合适的任务。本节描述选择未调度任务的过程,在选择任务时综合考虑任务本地性、集群的异构性和资源的公平性三个方面的问题。After receiving the statistical heartbeat message sent by the task, the application manager predicts the running time of the task based on the task performance model. If the task sending the statistical heartbeat message is a short task, select an appropriate task from the unscheduled task queue. This section describes the process of selecting unscheduled tasks, and comprehensively considers three aspects of task locality, cluster heterogeneity and resource fairness when selecting tasks.
问题1.任务的本地性。任务的运行时间是读取输入数据的时间和计算输入数据的时间之和,因此减少读取输入数据的时间能够缩短任务的运行时间。在集群中,本地磁盘的IO速率比网络的传输速率高,同一个机架内的网络带宽比机架之间的网络带宽大,所以在分配任务时要尽量将任务分配给靠近输入数据的节点。该策略不但能够减少数据拷贝的时间,而且也能缓解网络的负载。因此,二级任务调度器按照节点本地任务、机架本地任务、其他机架任务的顺序选择任务。Problem 1. Locality of tasks. The running time of a task is the sum of the time to read the input data and the time to calculate the input data, so reducing the time to read the input data can shorten the running time of the task. In the cluster, the IO rate of the local disk is higher than the transmission rate of the network, and the network bandwidth within the same rack is larger than the network bandwidth between racks, so when assigning tasks, try to assign tasks to nodes close to the input data . This strategy can not only reduce the time of data copy, but also relieve the load of the network. Therefore, the secondary task scheduler selects tasks in the order of node-local tasks, rack-local tasks, other rack tasks.
定义2.任务转移运行收益。设任务i在节点a的运行时间为在节点b的运行时间为任务i由在节点a运行转移到在节点b运行的收益为若为正值,说明任务i由节点a运行转移到节点b运行时间缩短;为任务转移运行的收益程度,值越大收益越高。若为负值,说明任务运行时间延长。Definition 2. Task transfer operation income. Let the running time of task i on node a be The running time at node b is The benefit of transferring task i from running on node a to running on node b is like is a positive value, indicating that the running time of task i transferred from node a to node b is shortened; The degree of income for task transfer operation, the larger the value, the higher the income. like A negative value indicates that the task running time is prolonged.
定义3.在某个节点运行的最优任务。在节点a运行的任务i满足(6)或(7),则称任务i为在节点a运行的最优任务。Definition 3. The optimal task to run on a certain node. Task i running on node a satisfies (6) or (7), then task i is called the optimal task running on node a.
其中,X为运行当前作业的节点集合,m为运行任务i时间最短的节点。(6)说明任务在a节点的运行时间最小,(7)说明任务i在运行时间最短的节点运行的收益小于设定的收益阈值。in, X is the set of nodes running the current job, and m is the node running task i for the shortest time. (6) Indicates that the running time of the task at node a is the minimum, and (7) indicates that the revenue of task i running on the node with the shortest running time is less than the set revenue threshold.
算法2.assignTask算法Algorithm 2. assignTask Algorithm
输入:未分配的任务集合;Input: set of unassigned tasks;
输出:将要调度的任务;Output: tasks to be scheduled;
步骤201:应用管理器收到任务进程发送的统计心跳消息,转步骤202;Step 201: the application manager receives the statistical heartbeat message sent by the task process, and proceeds to step 202;
步骤202:任务性能评估器预测发送心跳任务的完成时间,转步骤203;Step 202: The task performance evaluator predicts the completion time of the sending heartbeat task, then go to step 203;
步骤203:若发送心跳任务的完成时间小于设定的时间,则计算未调度任务在运行当前作业的全部节点上运行时间,转步骤204;Step 203: If the completion time of sending the heartbeat task is less than the set time, calculate the running time of unscheduled tasks on all nodes running the current job, and go to step 204;
步骤204:将未调度任务按照本地性分别加入到节点本地任务队列、机架任务队列和其他任务队列,转步骤205;Step 204: Add the unscheduled tasks to the node local task queue, the rack task queue and other task queues respectively according to the locality, and go to step 205;
步骤205:按照任务的运行时间对节点本地任务队列、机架任务队列和其他任务队列排序;转步骤206Step 205: sort the node local task queue, rack task queue and other task queues according to the running time of the task; go to step 206
步骤206:根据任务本地性优先级,判断未调度任务是否属于最优任务,若是最优任务则从未调度列表中删除,返回该任务;否则判断下一个未调度的任务,直到检查完全部未调度任务。Step 206: According to the local priority of the task, judge whether the unscheduled task belongs to the optimal task. If it is the optimal task, delete it from the unscheduled list and return the task; otherwise, judge the next unscheduled task until all unscheduled tasks are checked. Scheduling tasks.
问题2.集群的异构性。集群的异构性是指集群节点的硬件配置不一致,硬件主要指CPU,内存,磁盘三项。集群的异构性对任务的执行效率有重要的影响。同一个任务在性能高的节点执行与在性能低的节点执行相比,前者的运行时间与后者的运行时间差异巨大。二级任务调度器在选择未调度任务时,检查选择的任务对于某个节点是否为最优任务。若选择的任务为最优任务就执行该任务;否则跳过该任务,重新选择要执行的任务。Problem 2. Heterogeneity of clusters. The heterogeneity of the cluster refers to the inconsistency of the hardware configuration of the cluster nodes. The hardware mainly refers to CPU, memory, and disk. The heterogeneity of the cluster has an important impact on the execution efficiency of tasks. When the same task is executed on a high-performance node compared with a low-performance node, the running time of the former is significantly different from that of the latter. When selecting unscheduled tasks, the secondary task scheduler checks whether the selected task is the optimal task for a certain node. If the selected task is the optimal task, execute the task; otherwise, skip the task and re-select the task to be executed.
问题3.资源的公平性。资源的公平性是指各个作业公平的共享集群资源。由于集群是异构的,各节点的计算能力差异较大,要避免某个作业长时间占用性能高或性能低的节点资源。为了公平共享资源,由用户设定资源的最大的重用时间,资源的重用时间是相同的。为资源的最大的重用时间,为任务在节点x上的平均运行时间,资源的重用次数为任务在各个节点的平均运行时间不同,所以各节点中资源的重用次数不同,在性能高的节点中的资源重用次数多。Question 3. Fairness of resources. Resource fairness refers to the fair sharing of cluster resources by each job. Since the cluster is heterogeneous, the computing capabilities of each node vary greatly. It is necessary to avoid a job from occupying high-performance or low-performance node resources for a long time. In order to share resources fairly, the maximum resource reuse time is set by the user, and the resource reuse time is the same. is the maximum reuse time of resources, is the average running time of tasks on node x, and the resource reuse times are The average running time of tasks on each node is different, so the times of resource reuse in each node are different, and the times of resource reuse in nodes with high performance are more.
二级任务调度器分两步选择未调度的任务。第一步是按任务的本地性将未调度的任务分为节点本地任务,机架本地任务,其他机架任务。再按照本地性优先级顺序选择任务,优先选择每个优先级中运行时间最小的任务,见算法2。第二步是根据异构性原则选择最优任务,见算法3。The secondary task scheduler selects unscheduled tasks in two steps. The first step is to divide unscheduled tasks into node-local tasks, rack-local tasks, and other-rack tasks by task locality. Then select tasks according to the order of local priority, and select the task with the smallest running time in each priority priority, see Algorithm 2. The second step is to select the optimal task according to the principle of heterogeneity, see Algorithm 3.
算法2中,k为运行任务i的节点,为任务i使用资源的重用次数,dasknode为节点本地任务集合,Taskrack为机架本地任务集合,TaskoffRack为其他机架任务集合,为资源的最大重用时间,由用户设定。算法2描述了应用管理器收到心跳消息选择未调度任务的过程。首先判断发送心跳消息的任务是否为短任务,以及资源的重用次数是否超过最大值。然后将未调度的任务分为节点本地任务,机架本地任务,其他机架任务三组,最后按照本地性的优先级选择任务。In Algorithm 2, k is the node running task i, The resource reuse times for task i, dask node is the node local task collection, Task rack is the rack local task collection, Task offRack is the other rack task collection, It is the maximum resource reuse time, set by the user. Algorithm 2 describes the process that the application manager selects unscheduled tasks after receiving a heartbeat message. First determine whether the task sending the heartbeat message is a short task, and whether the resource reuse times exceed the maximum value. Then unscheduled tasks are divided into three groups: node local tasks, rack local tasks, and other rack tasks, and finally select tasks according to local priority.
算法3:selectOptimalTask算法.Algorithm 3: selectOptimalTask algorithm.
输入:运行当前作业的节点集合;Input: the set of nodes running the current job;
用户设定的最小收益;The minimum profit set by the user;
选择的任务;selected tasks;
运行当前任务的节点;the node running the current task;
输出:选择的任务是否为最优任务.Output: Whether the selected task is the optimal task.
步骤301:计算输入任务在运行当前作业的全节点上的运行时间,转步骤302;Step 301: Calculate the running time of the input task on all nodes running the current job, go to step 302;
步骤302:获取计算输入任务运行时间最短的节点,转步骤303;Step 302: Obtain the node with the shortest running time of the calculation input task, go to step 303;
步骤303:判断获取的节点与输入的节点是否相同,如果相同,返回任务是最优任务;否则转步骤304;Step 303: Determine whether the obtained node is the same as the input node, if they are the same, the returned task is the optimal task; otherwise, go to step 304;
步骤304:计算任务从输入节点转移到运行任务时间最短的节点上运行的收益,若收益超过设定的阈值,返回输入任务是最优任务;否则返回输入任务不是最优任务。Step 304: Calculate the revenue of transferring the task from the input node to the node with the shortest running time. If the revenue exceeds the set threshold, the returned input task is the optimal task; otherwise, the returned input task is not the optimal task.
算法3描述了选择的任务对当前节点而言是否为最优任务。首先,计算选择的任务在运行当前作业的其他节点上的运行时间,判断选择的任务在当前节点上的运行时间是否为最小。若选择的任务在节点m上的运行时间最小,计算任务在节点m上执行的收益。若任务的收益超过设定的阈值,则跳过选择的任务;否则在当前节点执行选择的任务。Algorithm 3 describes whether the selected task is the optimal task for the current node. First, calculate the running time of the selected task on other nodes running the current job, and judge whether the running time of the selected task on the current node is the minimum. If the running time of the selected task on node m is the minimum, calculate the income of the task executed on node m. If the income of the task exceeds the set threshold, the selected task is skipped; otherwise, the selected task is executed on the current node.
3实验与分析3 Experiment and Analysis
本发明基于Hadoop2.2.0实现短作业的优化,优化前的版本(ApacheHadoop)称为AH,优化后的版本称为SJH。我们用SJH与AH对比的方式评估短作业的优化效果。The present invention realizes the optimization of short jobs based on Hadoop2.2.0, the version before optimization (Apache Hadoop) is called AH, and the version after optimization is called SJH. We evaluate the optimization effect of short jobs by comparing SJH with AH.
实验集群由1个主节点和8个计算节点组成,节点的主要配置如表1所示。主节点和四个计算节点的使用配置一,其余四个计算节点使用配置二。节点分布在两个机架中,节点之间使用千兆网连接。HDFS的数据块大小为64M,数据块的副本数设置为3。集群使用Hadoop提供的FAIR调度器,运行Map任务需要1G内存和1个CPU的资源,Reduce任务和应用管理器分别需要1.5G内存和1个CPU的资源。The experimental cluster consists of 1 master node and 8 computing nodes. The main configuration of the nodes is shown in Table 1. Configuration 1 is used for the master node and four computing nodes, and configuration 2 is used for the remaining four computing nodes. The nodes are distributed in two racks, and the nodes are connected by gigabit network. The data block size of HDFS is 64M, and the number of copies of the data block is set to 3. The cluster uses the FAIR scheduler provided by Hadoop. Running the Map task requires 1G memory and 1 CPU resource. The Reduce task and the application manager require 1.5G memory and 1 CPU resource respectively.
表1集群节点配置信息Table 1 Cluster node configuration information
实验使用两个测试数据集,数据集一由Hadoop提供的randomtextwriter生成,数据集二为某省电力终端采集系统采集的用户用电数据。randomtextwriter用来生成设定数据量的数据集,数据集的内容由随机的单词组成。实验使用词频统计(wordcount),Hive的单表条件查询和Terasort作为基准测试程序。Hive是构建在HDFS之上的数据仓库,Hive提供的类SQL被转换为Hadoop作业。Hive的单表条件查询语句转换为Hadoop作业后,只有Map任务。词频统计用于统计每个单词在输入数据集中出现的频率,Terasort对输入数据集按字典顺序排序。The experiment uses two test data sets, the first data set is generated by the randomtextwriter provided by Hadoop, and the second data set is the user's electricity consumption data collected by a provincial electric power terminal collection system. randomtextwriter is used to generate a data set with a set amount of data, and the content of the data set consists of random words. The experiment uses word frequency statistics (wordcount), Hive's single-table conditional query and Terasort as benchmark programs. Hive is a data warehouse built on top of HDFS, and the SQL-like provided by Hive is converted into Hadoop jobs. After Hive's single-table conditional query statement is converted into a Hadoop job, there are only Map tasks. Word frequency statistics are used to count the frequency of each word in the input data set, and Terasort sorts the input data set in lexicographical order.
3.1任务性能模型的准确性3.1 Accuracy of task performance model
本发明首先测试任务性能模型预测正在运行的任务和未运行任务的准确性。使用词频统计程序作为基准测试程序,采用数据集一,数据量大小为2.5G,集群使用40个Map任务处理数据集。实验使用相对误差描述准确性,e为误差大小,为任务的预测完成时间,Ti为任务实际的完成时间。在任务的进度pmin分别为40%,60%,70%,80%时,预测正在运行任务完成时间的误差。The present invention first tests the accuracy of the task performance model in predicting running tasks and non-running tasks. Use the word frequency statistics program as the benchmark test program, use dataset 1, the data size is 2.5G, and the cluster uses 40 Map tasks to process the dataset. Experiments use relative error to describe accuracy, e is the size of the error, is the predicted completion time of the task, and T i is the actual completion time of the task. When the progress p min of the task is 40%, 60%, 70%, 80%, respectively, the error of predicting the completion time of the running task.
图4(a)描述了任务的进程取不同的值时,任务性能模型预测正在运行的任务的误差变化情况。由图4(a)看见,误差随着进度的增加而减小,而且误差趋于稳定。任务的进度超过60%,误差在20%以内;任务的进度超过70%时,误差在10%左右。图4(b)描述任务运行时间与误差之间的关系。误差随着任务运行时间的增加而增加,在任务的完成时间达到80秒时,误差超过20%。而且预测未运行任务的误差要高于预测正在运行任务的误差。任务性能模型使用短时间内的读速率、map操作处理速率和溢写速率计算任务的完成时间,而未考虑速率随时间的变化情况。因此,任务性能模型的预测误差会随着任务时间的增加而增大。从图4(b)中得知,任务完成时间低于30秒时预测误差小于15%,而短任务的平均完成时间为20秒,因此该预测模型满足实际需要。Figure 4(a) depicts how the task performance model predicts the error variation of a running task when the task's progress takes different values. It can be seen from Figure 4(a) that the error decreases with the increase of progress, and the error tends to be stable. When the progress of the task exceeds 60%, the error is within 20%; when the progress of the task exceeds 70%, the error is about 10%. Figure 4(b) depicts the relationship between task running time and error. The error increases with the running time of the task, and when the completion time of the task reaches 80 seconds, the error exceeds 20%. And the error in predicting non-running tasks is higher than the error in predicting running tasks. The task performance model uses the read rate, map operation processing rate, and overflow write rate in a short period of time to calculate the completion time of the task, without considering the change of the rate over time. Therefore, the prediction error of the task performance model will increase with the increase of task time. From Figure 4(b), we know that the prediction error is less than 15% when the task completion time is less than 30 seconds, and the average completion time of short tasks is 20 seconds, so the prediction model meets the actual needs.
3.2作业完成时间3.2 Job Completion Time
我们在短作业和长作业混合运行的情况下测试短作业优化效果,测试数据如表2所示。实验使用shell脚本轮询提交基准测试程序,每秒提交1个作业,所有作业在30秒内提交完成。集群提供的总容器数量为96个,因此每个作业的Map任务需要多轮才能调度完成。We test the short-job optimization effect when short-jobs and long-jobs are mixed, and the test data are shown in Table 2. The experiment uses a shell script to poll and submit the benchmark test program. One job is submitted per second, and all jobs are submitted within 30 seconds. The total number of containers provided by the cluster is 96, so the Map task of each job needs multiple rounds to be scheduled and completed.
图5(a)和图5(b)显示了短作业优化对作业完成时间的影响。词频统计作业运行时间缩短了6%-22%,HiveSQL转化的作业运行时间缩短了6%-27%。这个两种作业是短作业,Hive单表条件查询只有Map任务,其优化效果比词频统计作业更好。从图5(c)中可以看出,短作业优化对长作业没有显著的影响,作业最大的延长时间为5%。短任务优化操作使得短作业的任务抢占了资源,导致长作业完成时间有部分延长。Figure 5(a) and Figure 5(b) show the impact of short job optimization on job completion time. The running time of the word frequency statistics job is shortened by 6%-22%, and the running time of the HiveSQL conversion job is shortened by 6%-27%. These two jobs are short jobs. Hive single-table conditional query only has Map tasks, and its optimization effect is better than word frequency statistics jobs. From Figure 5(c), it can be seen that short job optimization has no significant impact on long jobs, and the maximum lengthening time of jobs is 5%. Short-task optimization makes short-job tasks seize resources, resulting in a partial extension of long-job completion time.
表2作业测试数据Table 2 job test data
3.3资源的利用率3.3 Resource utilization
本节关注计算节点的CPU和内存的利用率,基准测试程序和数据集如表2所示。资源的利用率使用Ganglia收集,最终的结果取在计算过程中各个资源的平均值。图6为计算节点的CPU利用情况,优化后的CPU利用率平均提高了4%-13%。基准测试程序中,词频统计和HiveSQL转换的作业属于计算密集型作业。N1、N2、N6和N7节点的CPU利用率提高的幅度大于N3、N4、N5和N8提高的幅度,因为前者是性能高的节点,后者是性能低的节点。图7描述了计算节点内存的变化,优化后内存的利用率提高了2.6%-6.18%。内存利用率提高的幅度不大,因为基准测试程序中只有terasort的输出数据比较大。实验测试结果表明,本发明提出的优化短作业方法在改善集群资源有效利用方面效果明显。This section focuses on the CPU and memory utilization of computing nodes, and the benchmark programs and data sets are shown in Table 2. The resource utilization is collected using Ganglia, and the final result is the average value of each resource during the calculation process. Figure 6 shows the CPU utilization of computing nodes, and the optimized CPU utilization increases by 4%-13% on average. In the benchmark test program, word frequency statistics and HiveSQL conversion tasks are computationally intensive. The increase in CPU utilization of nodes N1, N2, N6, and N7 is greater than that of N3, N4, N5, and N8, because the former are nodes with high performance, and the latter are nodes with low performance. Figure 7 describes the change of computing node memory, and the utilization rate of optimized memory is increased by 2.6%-6.18%. The increase in memory utilization is not large, because only terasort's output data is relatively large in the benchmark program. The experimental test results show that the optimized short job method proposed by the present invention has a significant effect in improving the effective utilization of cluster resources.
4总结4 Summary
由于Hadoop并未考虑短作业的特点,导致短作业在Hadoop中执行比较低效。针对上述挑战,本发明首先通过分析Hadoop中作业的执行过程,对短作业处理过程中存在的问题进行描述。然后,根据高负载情况下任务运行多轮的特点,提出一种基于资源重用的短作业优化机制——通过对已执行任务所释放资源的重用,减少资源在分配和回收过程中的浪费。本发明通过优化Map任务执行过程,减少短作业的运行时间。未来的工作将从Reduce任务的执行过程和任务调度方面来提高短作业的执行效率。Since Hadoop does not consider the characteristics of short jobs, the execution of short jobs in Hadoop is relatively inefficient. In view of the above challenges, the present invention first describes the problems existing in the short job processing process by analyzing the job execution process in Hadoop. Then, according to the characteristics of tasks running for multiple rounds under high load conditions, a short job optimization mechanism based on resource reuse is proposed to reduce the waste of resources in the process of allocation and recycling by reusing the resources released by the executed tasks. The present invention reduces the running time of short jobs by optimizing the Map task execution process. Future work will improve the execution efficiency of short jobs from the aspects of Reduce task execution process and task scheduling.
上述虽然结合附图对本发明的具体实施方式进行了描述,但并非对本发明保护范围的限制,所属领域技术人员应该明白,在本发明的技术方案的基础上,本领域技术人员不需要付出创造性劳动即可做出的各种修改或变形仍在本发明的保护范围以内。Although the specific implementation of the present invention has been described above in conjunction with the accompanying drawings, it does not limit the protection scope of the present invention. Those skilled in the art should understand that on the basis of the technical solution of the present invention, those skilled in the art do not need to pay creative work Various modifications or variations that can be made are still within the protection scope of the present invention.
Claims (10)
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201610124760.2A CN105808334B (en) | 2016-03-04 | 2016-03-04 | A kind of short optimization of job system and method for MapReduce based on resource reuse |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201610124760.2A CN105808334B (en) | 2016-03-04 | 2016-03-04 | A kind of short optimization of job system and method for MapReduce based on resource reuse |
Publications (2)
Publication Number | Publication Date |
---|---|
CN105808334A true CN105808334A (en) | 2016-07-27 |
CN105808334B CN105808334B (en) | 2016-12-28 |
Family
ID=56466747
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN201610124760.2A Active CN105808334B (en) | 2016-03-04 | 2016-03-04 | A kind of short optimization of job system and method for MapReduce based on resource reuse |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN105808334B (en) |
Cited By (19)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN106506255A (en) * | 2016-09-21 | 2017-03-15 | 微梦创科网络科技(中国)有限公司 | Method, device and system for stress testing |
CN106844027A (en) * | 2017-01-13 | 2017-06-13 | 广西电网有限责任公司电力科学研究院 | A kind of method for scheduling task based on node load |
CN106991070A (en) * | 2016-10-11 | 2017-07-28 | 阿里巴巴集团控股有限公司 | Real-time computing technique and device |
CN107391250A (en) * | 2017-08-11 | 2017-11-24 | 成都优易数据有限公司 | A kind of controller of raising Mapreduce task Shuffle performances |
CN107589985A (en) * | 2017-07-19 | 2018-01-16 | 山东大学 | A kind of two benches job scheduling method and system towards big data platform |
CN108733462A (en) * | 2017-04-18 | 2018-11-02 | 北京京东尚科信息技术有限公司 | The method and apparatus of delay task |
CN108874549A (en) * | 2018-07-19 | 2018-11-23 | 北京百度网讯科技有限公司 | resource multiplexing method, device, terminal and computer readable storage medium |
CN109117285A (en) * | 2018-07-27 | 2019-01-01 | 高新兴科技集团股份有限公司 | Support the distributed memory computing cluster system of high concurrent |
CN109416647A (en) * | 2016-12-07 | 2019-03-01 | 塔塔咨询服务有限公司 | The system and method for scheduler task and management computational resource allocation for closed-loop control system |
CN109697117A (en) * | 2017-10-20 | 2019-04-30 | 中国电信股份有限公司 | Terminal control method, device and computer readable storage medium |
CN110262896A (en) * | 2019-05-31 | 2019-09-20 | 天津大学 | A kind of data processing accelerated method towards Spark system |
CN110737521A (en) * | 2019-10-14 | 2020-01-31 | 中国人民解放军32039部队 | Disaster recovery method and device based on task scheduling center |
CN111274067A (en) * | 2018-12-04 | 2020-06-12 | 北京京东尚科信息技术有限公司 | Method and device for executing calculation task |
CN111475297A (en) * | 2018-06-27 | 2020-07-31 | 国家超级计算天津中心 | Flexible operation configuration method |
CN113391906A (en) * | 2021-06-25 | 2021-09-14 | 北京字节跳动网络技术有限公司 | Job updating method and device, computer equipment and resource management system |
CN113448719A (en) * | 2020-03-27 | 2021-09-28 | 北京沃东天骏信息技术有限公司 | Distributed task processing system |
CN114745606A (en) * | 2022-02-23 | 2022-07-12 | 江苏苏云信息科技有限公司 | Flexible industrial data acquisition system and method based on rule scheduling |
CN115904673A (en) * | 2023-03-09 | 2023-04-04 | 华南师范大学 | Cloud computing resource concurrent scheduling method, device, system, equipment and medium |
CN118295820A (en) * | 2024-06-06 | 2024-07-05 | 中国西安卫星测控中心 | Distributed task calculation scheduling method based on Map Reduce |
Citations (9)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN102945185A (en) * | 2012-10-24 | 2013-02-27 | 深信服网络科技(深圳)有限公司 | Task scheduling method and device |
CN103685492A (en) * | 2013-12-03 | 2014-03-26 | 北京智谷睿拓技术服务有限公司 | Dispatching method, dispatching device and application of Hadoop trunking system |
CN104657204A (en) * | 2013-11-22 | 2015-05-27 | 华为技术有限公司 | Short task processing method, device and operation system |
CN104933110A (en) * | 2015-06-03 | 2015-09-23 | 电子科技大学 | MapReduce-based data pre-fetching method |
CN105005503A (en) * | 2015-07-26 | 2015-10-28 | 孙凌宇 | Cellular automaton based cloud computing load balancing task scheduling method |
CN105117286A (en) * | 2015-09-22 | 2015-12-02 | 北京大学 | Task scheduling and pipelining executing method in MapReduce |
CN105138405A (en) * | 2015-08-06 | 2015-12-09 | 湖南大学 | To-be-released resource list based MapReduce task speculation execution method and apparatus |
CN105224612A (en) * | 2015-09-14 | 2016-01-06 | 成都信息工程大学 | Based on the MapReduce data Localization methodologies of dynamically labeled preferred value |
CN105302647A (en) * | 2015-11-06 | 2016-02-03 | 南京信息工程大学 | Optimization scheme of speculative execution strategy of backup task in MapReduce |
-
2016
- 2016-03-04 CN CN201610124760.2A patent/CN105808334B/en active Active
Patent Citations (9)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN102945185A (en) * | 2012-10-24 | 2013-02-27 | 深信服网络科技(深圳)有限公司 | Task scheduling method and device |
CN104657204A (en) * | 2013-11-22 | 2015-05-27 | 华为技术有限公司 | Short task processing method, device and operation system |
CN103685492A (en) * | 2013-12-03 | 2014-03-26 | 北京智谷睿拓技术服务有限公司 | Dispatching method, dispatching device and application of Hadoop trunking system |
CN104933110A (en) * | 2015-06-03 | 2015-09-23 | 电子科技大学 | MapReduce-based data pre-fetching method |
CN105005503A (en) * | 2015-07-26 | 2015-10-28 | 孙凌宇 | Cellular automaton based cloud computing load balancing task scheduling method |
CN105138405A (en) * | 2015-08-06 | 2015-12-09 | 湖南大学 | To-be-released resource list based MapReduce task speculation execution method and apparatus |
CN105224612A (en) * | 2015-09-14 | 2016-01-06 | 成都信息工程大学 | Based on the MapReduce data Localization methodologies of dynamically labeled preferred value |
CN105117286A (en) * | 2015-09-22 | 2015-12-02 | 北京大学 | Task scheduling and pipelining executing method in MapReduce |
CN105302647A (en) * | 2015-11-06 | 2016-02-03 | 南京信息工程大学 | Optimization scheme of speculative execution strategy of backup task in MapReduce |
Non-Patent Citations (1)
Title |
---|
顾荣 等: "Hadoop MapReduce短作业执行性能优化", 《计算机研究与发展》 * |
Cited By (27)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN106506255B (en) * | 2016-09-21 | 2019-11-05 | 微梦创科网络科技(中国)有限公司 | A kind of method, apparatus and system of pressure test |
CN106506255A (en) * | 2016-09-21 | 2017-03-15 | 微梦创科网络科技(中国)有限公司 | Method, device and system for stress testing |
CN106991070B (en) * | 2016-10-11 | 2021-02-26 | 创新先进技术有限公司 | Real-time computing method and device |
CN106991070A (en) * | 2016-10-11 | 2017-07-28 | 阿里巴巴集团控股有限公司 | Real-time computing technique and device |
CN109416647A (en) * | 2016-12-07 | 2019-03-01 | 塔塔咨询服务有限公司 | The system and method for scheduler task and management computational resource allocation for closed-loop control system |
CN106844027A (en) * | 2017-01-13 | 2017-06-13 | 广西电网有限责任公司电力科学研究院 | A kind of method for scheduling task based on node load |
CN108733462A (en) * | 2017-04-18 | 2018-11-02 | 北京京东尚科信息技术有限公司 | The method and apparatus of delay task |
CN107589985A (en) * | 2017-07-19 | 2018-01-16 | 山东大学 | A kind of two benches job scheduling method and system towards big data platform |
CN107391250A (en) * | 2017-08-11 | 2017-11-24 | 成都优易数据有限公司 | A kind of controller of raising Mapreduce task Shuffle performances |
CN109697117B (en) * | 2017-10-20 | 2021-03-09 | 中国电信股份有限公司 | Terminal control method, terminal control device and computer-readable storage medium |
CN109697117A (en) * | 2017-10-20 | 2019-04-30 | 中国电信股份有限公司 | Terminal control method, device and computer readable storage medium |
CN111475297A (en) * | 2018-06-27 | 2020-07-31 | 国家超级计算天津中心 | Flexible operation configuration method |
CN111475297B (en) * | 2018-06-27 | 2023-04-07 | 国家超级计算天津中心 | Flexible operation configuration method |
CN108874549A (en) * | 2018-07-19 | 2018-11-23 | 北京百度网讯科技有限公司 | resource multiplexing method, device, terminal and computer readable storage medium |
CN109117285B (en) * | 2018-07-27 | 2021-12-28 | 高新兴科技集团股份有限公司 | Distributed memory computing cluster system supporting high concurrency |
CN109117285A (en) * | 2018-07-27 | 2019-01-01 | 高新兴科技集团股份有限公司 | Support the distributed memory computing cluster system of high concurrent |
CN111274067A (en) * | 2018-12-04 | 2020-06-12 | 北京京东尚科信息技术有限公司 | Method and device for executing calculation task |
CN110262896A (en) * | 2019-05-31 | 2019-09-20 | 天津大学 | A kind of data processing accelerated method towards Spark system |
CN110737521A (en) * | 2019-10-14 | 2020-01-31 | 中国人民解放军32039部队 | Disaster recovery method and device based on task scheduling center |
CN110737521B (en) * | 2019-10-14 | 2021-03-05 | 中国人民解放军32039部队 | Disaster recovery method and device based on task scheduling center |
CN113448719A (en) * | 2020-03-27 | 2021-09-28 | 北京沃东天骏信息技术有限公司 | Distributed task processing system |
CN113391906A (en) * | 2021-06-25 | 2021-09-14 | 北京字节跳动网络技术有限公司 | Job updating method and device, computer equipment and resource management system |
CN113391906B (en) * | 2021-06-25 | 2024-03-01 | 北京字节跳动网络技术有限公司 | Job updating method, job updating device, computer equipment and resource management system |
CN114745606A (en) * | 2022-02-23 | 2022-07-12 | 江苏苏云信息科技有限公司 | Flexible industrial data acquisition system and method based on rule scheduling |
CN115904673A (en) * | 2023-03-09 | 2023-04-04 | 华南师范大学 | Cloud computing resource concurrent scheduling method, device, system, equipment and medium |
CN118295820A (en) * | 2024-06-06 | 2024-07-05 | 中国西安卫星测控中心 | Distributed task calculation scheduling method based on Map Reduce |
CN118295820B (en) * | 2024-06-06 | 2024-08-20 | 中国西安卫星测控中心 | Distributed task calculation scheduling method based on Map Reduce |
Also Published As
Publication number | Publication date |
---|---|
CN105808334B (en) | 2016-12-28 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN105808334B (en) | A kind of short optimization of job system and method for MapReduce based on resource reuse | |
CN107038069B (en) | Dynamic label matching DLMS scheduling method under Hadoop platform | |
US9442760B2 (en) | Job scheduling using expected server performance information | |
US8914805B2 (en) | Rescheduling workload in a hybrid computing environment | |
US8739171B2 (en) | High-throughput-computing in a hybrid computing environment | |
CN105117286B (en) | The dispatching method of task and streamlined perform method in MapReduce | |
CN102063336B (en) | A Distributed Computing Multi-Application Function Asynchronous Concurrent Scheduling Method | |
CN104915407B (en) | A kind of resource regulating method based under Hadoop multi-job environment | |
CN103473134B (en) | A kind of dependence task dispatching method of heterogeneous multi-nucleus processor | |
CN104298550B (en) | A kind of dynamic dispatching method towards Hadoop | |
CN111738434A (en) | Methods for executing deep neural networks on heterogeneous processing units | |
CN107291550B (en) | A Spark platform resource dynamic allocation method and system for iterative applications | |
JP2010079622A (en) | Multi-core processor system and task control method thereof | |
CN111104211A (en) | Method, system, device and medium for computing offloading based on task dependency | |
CN117707759A (en) | Multi-tenant GPU cluster elastic quota scheduling method and system | |
Dimopoulos et al. | Big data framework interference in restricted private cloud settings | |
Wolf et al. | On the optimization of schedules for MapReduce workloads in the presence of shared scans | |
US7844968B1 (en) | System for predicting earliest completion time and using static priority having initial priority and static urgency for job scheduling | |
CN116932201A (en) | Multi-resource sharing scheduling method for deep learning training task | |
CN115934362B (en) | Server-less perception computing cluster scheduling method and product for deep learning | |
CN104598311A (en) | Method and device for real-time operation fair scheduling for Hadoop | |
CN106897199B (en) | Batch job execution time prediction method based on big data processing framework | |
CN106201681A (en) | Task scheduling algorithm based on pre-release the Resources list under Hadoop platform | |
Henzinger et al. | Scheduling large jobs by abstraction refinement | |
CN116010064A (en) | Method, system and device for DAG job scheduling and cluster management |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
C06 | Publication | ||
PB01 | Publication | ||
C10 | Entry into substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
C14 | Grant of patent or utility model | ||
GR01 | Patent grant |