JP6940325B2 - Distributed processing system, distributed processing method, and distributed processing program - Google Patents
Distributed processing system, distributed processing method, and distributed processing program Download PDFInfo
- Publication number
- JP6940325B2 JP6940325B2 JP2017155083A JP2017155083A JP6940325B2 JP 6940325 B2 JP6940325 B2 JP 6940325B2 JP 2017155083 A JP2017155083 A JP 2017155083A JP 2017155083 A JP2017155083 A JP 2017155083A JP 6940325 B2 JP6940325 B2 JP 6940325B2
- Authority
- JP
- Japan
- Prior art keywords
- processing
- flow
- virtual machine
- parallel
- data
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
Images
Landscapes
- Debugging And Monitoring (AREA)
Description
本発明は、分散処理システム、分散処理方法、及び分散処理プログラムに関する。 The present invention relates to a distributed processing system, a distributed processing method, and a distributed processing program.
近年、大量のデータを分析することにより新たな知見を得、これを活用していくというビッグデータ技術が注目されている。このような大量のデータを分析する手法としては、統計解析や機械学習等、様々な手法が存在する。さらに、データの整形といった他の手法を組み合わせることも行われている。これらの複数の手法を組み合わせる場合には、例えば、データの処理順序を分析フローに対応づけて予め決定しておき、前提となる処理が完了し次第、その処理の結果得られたデータを、次の処理の入力データとして用いることにより、最終的な結果を得ることができる。その結果を参照しても適当な知見が得られなければ、手法や条件を変更することで新たな分析フローを決定する。このように、大量のデータの分析は試行錯誤の繰り返しが必要となるため、データの処理量が多くなって分析に要する時間が長くなる場合が多い。 In recent years, big data technology, which obtains new knowledge by analyzing a large amount of data and utilizes it, has been attracting attention. As a method for analyzing such a large amount of data, there are various methods such as statistical analysis and machine learning. In addition, other techniques such as data shaping are being combined. When combining these multiple methods, for example, the data processing order is determined in advance in association with the analysis flow, and as soon as the prerequisite processing is completed, the data obtained as a result of the processing is displayed as follows. The final result can be obtained by using it as the input data of the processing of. If appropriate findings cannot be obtained by referring to the results, a new analysis flow is determined by changing the method and conditions. As described above, analysis of a large amount of data requires repeated trial and error, so that the amount of data processed is large and the time required for analysis is often long.
そこで一般的には、処理の時間を短縮するために、単一の計算機でデータを処理するのではなく、複数の計算機が分散して並列処理を行う。例えば、分析対象のデータを、分散ファイルシステム等を用いたデータレイクと呼ばれるシステムに格納しておく。そして、複数の計算機が、このデータレイクにおけるデータを複数の区間に分割して読み出し、読み出したデータを整形し又は分析し、分析したデータをデータレイク等に格納し、その内容をユーザに表示する。さらに、データの分析処理は一時的な処理であることから、構築が容易な仮想マシンシステムを活用する。すなわち、データの分析処理が必要なときに各仮想マシンを起動してこれらに分析処理を分散して実行させることで、当該分析処理に係るリソースの使用量を抑えることができる。 Therefore, in general, in order to shorten the processing time, instead of processing the data with a single computer, a plurality of computers are distributed to perform parallel processing. For example, the data to be analyzed is stored in a system called a data lake using a distributed file system or the like. Then, a plurality of computers divide the data in this data lake into a plurality of sections and read it, format or analyze the read data, store the analyzed data in the data lake or the like, and display the contents to the user. .. Furthermore, since the data analysis process is a temporary process, a virtual machine system that is easy to build will be used. That is, by starting each virtual machine when data analysis processing is required and causing them to perform the analysis processing in a distributed manner, it is possible to reduce the amount of resources used for the analysis processing.
仮想マシンにリソースを割り当てる際には、動作が不安定とならないようにするべく、予め定めた量を超えて各仮想マシンにリソース配分を行わないように配分を管理する手法が知られている。例えば、特許文献1には、計算機管理システムが、仮想マシンのうちで、リソースの平均使用率が予め決められているポリシー値を超えているものがあった場合に、ポリシー値を満たすように、ポリシー値を満たす他の仮想マシンからリソースを確保し、リソースを引き受ける仮想マシンのある物理サーバの余剰リソースより、前記確保したリソース容量が大きい場合、物理サーバのリソース容量を超えないようにリソース配分する変更指示を出すことが記載されている。
When allocating resources to virtual machines, there is known a method of managing allocation so that resource allocation is not performed to each virtual machine in excess of a predetermined amount so that the operation does not become unstable. For example, in
しかしながら、仮想マシン上で複数のデータ分析を実行する場合、データの分析内容が異なれば処理に利用するリソースの配分も異なってくるので、その結果、データ処理を効率よく行えない場合が生じる。特許文献1に記載の技術では、分析処理中、各データ分析の実行順序制約と物理サーバのリソース容量制約とを固定的に保持したままであるので、各データ分析の間でのリソースの配分も固定的となる。その結果、データ処理が不安定となり、処理の遅延が発生する等の問題が生じる可能性がある。
However, when a plurality of data analyzes are executed on a virtual machine, the allocation of resources used for processing differs depending on the data analysis content, and as a result, data processing may not be performed efficiently. In the technique described in
本発明はこのような現状に鑑みてなされたものであり、データを安定的に分散して処理することが可能な分散処理システム、分散処理方法、及び分散処理プログラムを提供することにある。 The present invention has been made in view of such a current situation, and an object of the present invention is to provide a distributed processing system, a distributed processing method, and a distributed processing program capable of stably distributing and processing data.
以上の課題を解決するための本発明の一つは、複数の仮想マシンを含んで構成され、複数の区画からなる所定のデータを前記区画ごとに処理する処理フローを複数備える所定の処理について、前記処理フローを前記複数の仮想マシンのうち少なくともいずれかに割り当てることにより前記処理フローを並列的に実行可能な、プロセッサ及びメモリを備える分散処理システムであって、前記区画のデータに対する前記複数の処理フローの処理順序を記憶するフローテーブル記憶部と、前記処理フローによる前記区画のデータの処理による前記仮想マシンに対する負荷を、前記区画ごとに算出する処理負荷算出部と、各前記処理フローの現在の実行状態、各前記処理フローの処理順序、及び各前記処理フローについて算出した前記負荷に基づき、並列的に実行される前記処理フロー、及び当該処理フローを実行する前記仮想マシンの組み合わせを決定するフロー管理部と、前記決定した組み合わせが示す並列的な処理を各前記仮想マシンに実行させるマシン制御部と、を備え、前記処理負荷算出部は、前記負荷として、前記処理フローの実行に係る予測時間を各前記処理フローについて算出し、前記フロー管理部は、並列的に実行される前記処理フローが複数ある場合、前記算出した予測時間に基づき、前記複数の処理フローのそれぞれに対して割り当てる前記仮想マシン又はその割り当てに関する優先度を決定し、前記並列的に実行される複数の処理フローのうち前記予測時間を算出していない前記処理フローがある場合には、前記複数の処理フローのそれぞれを実行する前記仮想マシンの数を互いに均等とする。
One of the present inventions for solving the above problems is a predetermined process including a plurality of virtual machines and including a plurality of process flows for processing predetermined data composed of a plurality of partitions for each partition. the processing flow of the processing flow can be executed in parallel by a assigned to at least one of the plurality of virtual machines, a distributed processing system having a processor and a memory, said plurality of processing on data of the partition A flow table storage unit that stores the processing order of flows , a processing load calculation unit that calculates the load on the virtual machine due to the processing of data in the partition by the processing flow for each partition, and the current processing load of each processing flow. execution state, the processing flow each of said processing flow of a processing order, and based on the load calculated for each said processing flow are executed in parallel, and flow to determine the combination of the virtual machine running the process flow A management unit and a machine control unit that causes each virtual machine to execute parallel processing indicated by the determined combination are provided, and the processing load calculation unit uses the load as an estimated time for executing the processing flow. Is calculated for each of the processing flows, and when there are a plurality of the processing flows executed in parallel, the flow management unit allocates the virtual to each of the plurality of processing flows based on the calculated predicted time. If there is a processing flow for which the estimated time has not been calculated among the plurality of processing flows executed in parallel by determining the priority regarding the machine or its allocation, each of the plurality of processing flows is executed. Make the number of virtual machines equal to each other .
本発明によれば、データを安定的に分散して処理することができる。 According to the present invention, data can be stably distributed and processed.
<システム構成>
図1は、本実施形態に係る分散処理システム100の構成の一例を示す図である。分散処理システム100は、所定のデータ(以下、対象データという)の変化を分析する処理(以下、分析処理という)を行う情報処理システムである。
<System configuration>
FIG. 1 is a diagram showing an example of the configuration of the
対象データは、例えば、温度、圧力、速度の時間変化のデータであり、大量に存在する
、いわゆるビッグデータ(Big Data)である。この対象データは複数のデータ区画からなり、例えば、所定の時間帯ごとに区切られたデータからなる。
The target data is, for example, data on changes in temperature, pressure, and velocity over time, and is so-called big data that exists in large quantities. This target data is composed of a plurality of data partitions, for example, data delimited by a predetermined time zone.
図1に示すように、分散処理システム100は、対象データを記憶するデータベースであるデータレイク104と、データレイク104から対象データを読み込んで分析処理を実行する少なくとも1台の仮想マシン103(いわゆる仮想サーバ)を備える仮想マシン実行サーバ102と、仮想マシン103のそれぞれに対して分析処理に関する指示を行う仮想マシン管理サーバ101と、分散処理システム100の管理者や使用者等(以下、ユーザという)が使用する、分析処理に関する指示や分析処理に関する情報の表示を行うユーザ操作端末105とを含んで構成されている。
As shown in FIG. 1, the
なお、仮想マシン管理サーバ101、仮想マシン実行サーバ102、データレイク104、及びユーザ操作端末105の各情報処理装置の間は、例えば、LAN(Local Area Network)、WAN(Wide Area Network)、インターネット、専用線等からなるネットワ
ーク108により通信可能に接続されている。
Between the information processing devices of the virtual
分析処理は、対象データを処理する複数の処理部(以下、分析フローともいう)を含んで構成されている。分析フローは、例えば、種々の統計解析の処理や、機械学習に係る処理等、ビッグデータを処理、分析するための様々な処理があり得る。 The analysis process includes a plurality of processing units (hereinafter, also referred to as analysis flows) for processing the target data. The analysis flow may include various processes for processing and analyzing big data, such as various statistical analysis processes and machine learning-related processes.
仮想マシン103は、分散処理システム100において少なくとも2台以上設けられており、分散処理システム100は、分析処理における各分析フロー(より具体的には、各データ区画におけるデータの処理)を複数の仮想マシン103のうち少なくともいずれかに割り当てることにより各分析フローを複数の仮想マシン103により並列的に実行可能である。
At least two or more
なお、図2は、分散処理システム100における各情報処理装置(仮想マシン管理サーバ101、仮想マシン実行サーバ102、データレイク104、及びユーザ操作端末105)が備えるハードウェア構成の一例を示す図である。同図に示すように、各情報処理装置は、CPU(Central Processing Unit)などのプロセッサ51と、RAM(Random Access Memory)、ROM(Read Only Memory)等の主記憶装置52と、HDD(Hard Disk
Drive)、SSD(Solid State Drive)等の補助記憶装置53と、キーボード、マウス
、タッチパネル等からなる入力装置54と、モニタ(ディスプレイ)等からなる出力装置55とを備える。
Note that FIG. 2 is a diagram showing an example of the hardware configuration included in each information processing device (virtual
なお、図1には示していないが、データレイク104に対して、対象データの時間変化を計測する情報処理装置(各種サーバ、センサ等)を接続し、これらの情報処理装置がその測定値(対象データ)をデータレイク104に送信して記憶させるものとしてもよい。
Although not shown in FIG. 1, an information processing device (various servers, sensors, etc.) that measures the time change of the target data is connected to the
次に、各情報処理装置が備える機能について説明する。
<機能>
図3は、各情報処理装置が備える機能の一例を説明する図である。
まず、仮想マシン管理サーバ101は、フローテーブル記憶部211、処理状態管理テーブル記憶部212、処理負荷算出部201、フロー管理部202、マシン管理テーブル記憶部213、及びマシン制御部203を備える。
Next, the functions provided by each information processing device will be described.
<Function>
FIG. 3 is a diagram illustrating an example of a function provided in each information processing device.
First, the virtual
フローテーブル記憶部211は、前記区画のデータに対する前記複数の処理部の処理順序を記憶する。
The flow
すなわち、フローテーブル記憶部211は、分析フローテーブル300を備える。分析
フローテーブル300の詳細は後述する。
That is, the flow
処理状態管理テーブル記憶部212は、各分析フローの現在の実行状態を処理状態管理テーブル400に記憶する。処理状態管理テーブル400の詳細は後述する。
The processing state management
マシン管理テーブル記憶部213は、前記情報処理装置(仮想マシン103)による前記処理部の並列的な実行に関する制約条件を記憶する。
The machine management
具体的には、前記マシン管理テーブル記憶部213は、前記制約条件として、並列的に前記処理部を実行可能な前記情報処理装置の最大数を記憶する。
Specifically, the machine management
すなわち、マシン管理テーブル記憶部213は、VM管理テーブル500を備える。VM管理テーブル500の詳細は後述する。
That is, the machine management
処理負荷算出部201は、前記処理部(分析フロー)が行う前記区画のデータの処理による前記情報処理装置(仮想マシン103)に対する負荷を、前記区画ごとに算出する。
The processing
具体的には、前記処理負荷算出部201は、前記負荷として、前記処理部の処理の実行に係る予測時間を各前記処理部について算出する。
Specifically, the processing
フロー管理部202は、各前記処理部(分析フロー)の現在の実行状態、各前記処理部の処理順序、及び処理負荷算出部201が各前記処理部について算出した前記負荷に基づき、並列的に実行される前記処理部、及び当該処理部を実行する前記情報処理装置の組み合わせを決定する。
The
具体的には、前記フロー管理部202は、前記制約条件を満たす前記情報処理装置を、前記処理部を並列的に実行する前記情報処理装置(仮想マシン103)として決定する。
Specifically, the
また、前記フロー管理部202は、並列的に実行される前記処理部(分析フロー)が複数ある場合、処理負荷算出部201が前記算出した予測時間に基づき、前記複数の処理部のそれぞれに対して割り当てる前記情報処理装置(仮想マシン103)又はその割り当てに関する優先度を決定する。
Further, when there are a plurality of the processing units (analysis flows) executed in parallel, the
また、前記フロー管理部202は、前記並列的に実行される複数の処理部のうち前記予測時間を算出していない前記処理部がある場合には、前記複数の処理部のそれぞれを実行する前記情報処理装置の台数を互いに均等とする。
Further, the
また、前記フロー管理部202は、並列的に実行される前記処理部を決定する際に、当該処理部が処理可能な前記データの区画が複数ある場合には、予め定められた、最初に処理される前記データの区画のみを前記処理部が処理することを決定する。
Further, when the
マシン制御部203は、フロー管理部202が前記決定した組み合わせが示す並列的な処理を各前記情報処理装置に実行させる。
The
次に、仮想マシン実行サーバ102は、仮想マシン103を動作させる。仮想マシン103は、送受信部204及び分析処理部205を備える。
Next, the virtual
送受信部204は、分析処理に関するデータの送受信を行う。分析処理部205は、種々の統計解析機能を備え、例えば、データの抽出、分析、及び記憶を行うことで分析処理における各分析フローを実行する。
The transmission /
データレイク104は、データ保存部206及びデータ読み書き部207を備える。データ保存部206は、対象データを記憶し、また、仮想マシン103からのデータの読み出し要求や書き込み要求に応じて、対象データを含む種々のデータの送受信を行う。データ読み書き部207は、対象データを含む各種データの読み出し及びデータの書き込みを行う。
The
データレイク104が記憶している対象データは、例えば、所定のフォーマットに従った複数のデータの集合である。例えば、対象データが時系列のデータである場合、対象データはその時刻又は時間帯に対応して前記の複数のデータ区画に分割されている。分析フローは、各データ区画のデータに対して所定の分析処理を行う。
The target data stored in the
なお、以下では、分析フローにおいて、あるデータ区画のデータを処理する処理を、区間フローという。 In the following, in the analysis flow, the process of processing the data of a certain data section is referred to as a section flow.
ユーザ操作端末105は、出力部208を備える。出力部208は、各前記情報処理装置が実行した前記処理部の処理の結果、又は前記処理部の処理により発生したデータの入出力量に関する情報を出力する。
The
次に、分散処理システム100が記憶している各テーブル(データベース)について説明する。
Next, each table (database) stored in the distributed
<分析フローテーブル>
図4は、分析フローテーブル300の一例を示す図である。同図に示すように、分析フローテーブル300は、分析フローの識別情報(以下、フロー名という)が格納されるフロー名611、フロー名611が示す分析フローに入力されるデータ(又はその種類)を特定する情報が格納される入力612、フロー名611が示す分析フローが実行する処理の種類を示す情報(例えば、統計解析や機械学習の種類に関する情報)が格納される処理方法613、及び、フロー名611が示す分析フローから出力されるデータ(又はその種類)を特定する情報が格納される出力614の各項目を有する、少なくとも1つ以上のレコードで構成されている。
<Analysis flow table>
FIG. 4 is a diagram showing an example of the analysis flow table 300. As shown in the figure, in the analysis flow table 300, the
なお、以下では、フロー名611が示す分析フローが分析処理において最初に実行される処理である場合に入力612に格納されるデータを、初期入力データという。また、フロー名611が示す分析フローが分析処理において最後に実行される処理である場合に出力614に格納されるデータを、最終出力データという。そして、それ以外の場合の入力612又は出力614に格納されるデータを、中間データという。
In the following, the data stored in the
同図の例では、「分析処理A」は最初からデータレイク104に保存されている対象データを読み込み(受信し)、「分析処理B」は「分析処理A」が出力した「中間データA」を読み込み(受信し)、「分析処理C」は「分析処理B」が出力した「中間データB」を読み込む(受信する)。「分析処理A」に入力されるデータは初期入力データであり、また、「分析処理A」と「分析処理B」が出力するデータは中間データであり、「分析処理C」が出力するデータは最終出力データである。
In the example of the figure, "analysis process A" reads (receives) the target data stored in the
このように、分析フローテーブル300は、各分析フローが送信又は受信するデータを特定する情報を記憶することで、分析処理における各分析フロー間の処理順序について規定している。なお、分析フローテーブル300の内容は、例えば、分析処理の前にユーザによって予め入力される。 As described above, the analysis flow table 300 defines the processing order between the analysis flows in the analysis process by storing the information that identifies the data transmitted or received by each analysis flow. The contents of the analysis flow table 300 are input in advance by the user, for example, before the analysis process.
<処理状態管理テーブル>
図5は、処理状態管理テーブル400の一例を示す図である。処理状態管理テーブル400は、分析フローテーブル300が規定する分析フローの処理順序を前提に、その分析フローにおける区間フローの実行状態について記憶している。
<Processing status management table>
FIG. 5 is a diagram showing an example of the processing state management table 400. The processing state management table 400 stores the execution state of the section flow in the analysis flow on the premise of the processing order of the analysis flow defined by the analysis flow table 300.
すなわち、処理状態管理テーブル400は、フロー名が格納されるフロー名621、フロー名621が示す分析フローにおける区間フローの識別情報(以下、区間名という)が格納される区間名622、区間名622が示す区間フローの現在の実行状態(例えば、実行中であるか(「実行中」)、実行が完了したか(「実行完了」)、実行されていないが実行が可能な状態であるか(「実行可能」)、又は、実行を開始するのに必要なデータが生成されていないため実行が不可能であるか(「実行不可」)等)を示す情報が格納される実行状態623、実行状態623に「実行完了」が格納されている場合に、その実行に要した時間(以下、実行時間という)が格納される処理時間624、及び、区間名622が示す区間フローの実行に要する時間の推定時間(予測時間)が格納される予測時間625の各項目を有する、少なくとも1つ以上のレコードで構成されている。なお、予測時間625には、処理時間624に実行時間が格納されていない場合(すなわち、区間名622が示す区間フローの実行が完了していない場合)に、予測時間が格納される。
That is, in the processing state management table 400, the
予測時間は、後述するように、例えば、過去に実行された他の区間フローの実行時間に基づいて算出される。 The predicted time is calculated based on, for example, the execution time of another section flow executed in the past, as will be described later.
同図の例では、「分析処理A」の区間フローである「区間A」は実行が完了しているため、処理時間624に「12分」が記録されている。また、「分析処理A」の「区間C」は、同じ種類の統計解析が行われた「区間A」の実行が完了していることから、前記と同じ「12分」が予測時間625に記録されている。なお、予測時間625には、各区間フローの間の予測時間の正確な比較のために、単一の仮想マシン103が当該区間フローを実行した場合の予測時間が格納される。
In the example of the figure, since the execution of "section A", which is the section flow of "analysis process A", has been completed, "12 minutes" is recorded in the
処理状態管理テーブル400の内容は、所定のタイミング、又は所定の時間間隔で更新される。 The contents of the processing state management table 400 are updated at a predetermined timing or at a predetermined time interval.
<VM管理テーブル>
図6は、VM管理テーブル500の一例を示す図である。同図に示すように、VM管理テーブル500は、仮想マシン実行サーバ102の識別情報(以下、実行サーバ名という)が格納される実行サーバ名711、実行サーバ名711が示す仮想マシン実行サーバ102における仮想マシン103に対して分析フロー(における区間フロー)を割り当てることが可能な仮想マシン103の最大の台数(以下、最大数という)を示す情報が格納される最大VM割当可能数712、及び、実行サーバ名711が示す仮想マシン実行サーバ102に現在割り当てられている仮想マシン103の台数(以下、現在台数という)を示す情報が格納される割当VM数713の各項目を有する、少なくとも1つ以上のレコードで構成される。
<VM management table>
FIG. 6 is a diagram showing an example of the VM management table 500. As shown in the figure, the VM management table 500 is used in the virtual
なお、VM管理テーブル500の割当VM数713の内容は、所定のタイミング(例えば、所定の時間間隔、仮想マシン103の起動時、又は仮想マシン103の停止時)にて更新される。その他の項目は、例えば、ユーザによって分析処理の実行前に入力される。
The content of the allocated
同図の例では、「実行サーバA」、「実行サーバB」、及び「実行サーバC」の3台の仮想マシン実行サーバ102が登録されており、全ての仮想マシン実行サーバ102の最大数が「4」である。
In the example of the figure, three virtual
以上に説明した各情報処理装置の機能は、各情報処理装置のハードウェアによって、もしくは、各情報処理装置のプロセッサ51が、主記憶装置52や補助記憶装置53に記憶されている各プログラムを読み出して実行することにより実現される。なお、このプログラムは、例えば、二次記憶デバイスや不揮発性半導体メモリ、ハードディスクドライブ、SSDなどの記憶デバイス、又は、ICカード、SDカード、DVDなどの、計算機で読み取り可能な非一時的データ記憶媒体に格納される。
The function of each information processing device described above is that the hardware of each information processing device or the
次に、分散処理システム100で行われる各処理について説明する。
<分析処理>
図7は、分散処理システム100が行う分析処理の一例を示すシーケンス図である。この処理は、例えば、ユーザ操作端末105から仮想マシン管理サーバ101に、実行する分析処理を指定する情報が入力された際に開始される。
Next, each process performed by the distributed
<Analysis processing>
FIG. 7 is a sequence diagram showing an example of analysis processing performed by the distributed
まず、仮想マシン管理サーバ101のフロー管理部202は、指定された分析処理において、仮想マシン103により並列的に実行する分析フロー(以下、対象フローという)及びその区間フロー(以下、対象区間フローという)を特定する処理(以下、フロー実行対象判定処理という)を実行する(S301)。この処理の詳細は後述する。
First, the
次に、フロー管理部202は、S301で特定した各対象フロー及び対象区間フローの情報を付帯させた、各仮想マシン103への処理の割り当ての指示を、マシン制御部203に送信する(S302)。
Next, the
マシン制御部203は、前記の指示を受信すると、各対象フローにおける各対象区間フローを、仮想マシン103のいずれに割り当てて並列処理を各仮想マシン103に実行させるかを決定する処理(以下、仮想マシン割り当て判定処理という)を実行する(S303)。この処理の詳細は後述する。なお、この処理の終了後、マシン制御部203はVM管理テーブル500を更新する。
Upon receiving the above instruction, the
マシン制御部203は、S303により処理を割り当てた各仮想マシン103に対して、当該仮想マシン103の起動の指示を送信する(S304)。そして、フロー管理部202は、S304により起動した各仮想マシン103に対して、当該仮想マシン103に割り当てられた対象区間フローの実行(並列処理の実行)の指示を、各仮想マシン103に送信する(S305)。
The
指示の送信を仮想マシン管理サーバ101から受信した各仮想マシン103の送受信部204は、分析処理部205に、対象区間フローを実行する指示を行う(S306)。
The transmission /
S306により指示を受けた各仮想マシン103の各分析処理部205は、自身に割り当てられた対象区間フローを実行する(S307)。
Each
例えば、ある分析処理部205は、データレイク104のデータ保存部206から対象フローの初期入力データを読み出して対象区間フローを実行する。また、他の分析処理部205は、中間データに基づき対象区間フローを実行する。また、さらに他の分析処理部205は、対象区間フローを実行して最終出力データを出力し、これをデータレイク104のデータ保存部206に送信する。
For example, a certain
なお、複数の仮想マシン103(分析処理部205)が同一の対象フローの同一の対象区間フローを並列して実行する場合、その各分析処理部205は、例えば、初期入力データ又は中間データを分割して当該分析処理部205に割り当て、各分析処理部205は割り当てられたデータに基づき、対象区間フローを並列して実行する。
When a plurality of virtual machines 103 (analysis processing unit 205) execute the same target section flow of the same target flow in parallel, each
各仮想マシン103の各分析処理部205は、自身に割り当てられた対象区間フローの実行を終えると、その旨の通知を送受信部204に送信する(S308)。
When each
各仮想マシン103の送受信部204は、前記の通知を分析処理部205から受信すると、対象区間フローの実行が終了した旨の通知を、仮想マシン管理サーバ101のフロー管理部202に送信する(S309)。
When the transmission /
フロー管理部202は、S305で実行の指示を送信した全ての仮想マシン103から前記の終了の通知を受信すると、マシン制御部203に、S302で指示した割り当てを解放する旨の指示を送信する(S310)。この指示を受信したマシン制御部203は、割り当ての対象となっていた全ての仮想マシン103の動作(実行)を停止させる(S311)。
When the
その後、マシン制御部203は、現在の各仮想マシン103の各分析フロー(区間フロー)の実行状態を各仮想マシン103から取得し、取得した状態に基づきVM管理テーブル500を更新する(S312)。具体的には、例えば、マシン制御部203は、VM管理テーブル500の割当VM数713の値を、現在、分析フロー(区間フロー)を実行している仮想マシン103の数で更新する。
After that, the
次に、フロー管理部202は、分析処理の実行状態に関する情報を生成して更新する(S313)。
Next, the
具体的には、まず、フロー管理部202は、各仮想マシン103が行った対象区間フローの実行時間を算出する。例えば、フロー管理部202は、S304の処理を行った時刻からS309の処理を行った時刻までの時間を実行時間として算出する。
Specifically, first, the
また、フロー管理部202は、処理負荷算出部201に、算出した対象区間フローの実行時間に基づき、未だ実行していない区間フローの予測時間を算出する旨を指示する。具体的には、例えば、処理負荷算出部201は、前記で実行時間を算出した対象区間フローと同じ種類の統計解析を行う、未実行の分析フローの各区間フローの予測時間を前記の実行時間と同じにする。なお、実行時間を算出した対象区間フローが複数の仮想マシン103で実行されていた場合には、これに基づく予測時間を、単一の仮想マシン103が処理を実行した場合の時間に変換する。
Further, the
フロー管理部202は、以上のようにして算出した実行時間及び予測時間を、処理状態管理テーブル400に記憶する。具体的には、処理状態管理テーブル記憶部212は、算出した各対象区間フローの実行時間を処理状態管理テーブル400の各レコードの処理時間624に格納し、各予測時間を処理状態管理テーブル400の各レコードの予測時間625に格納する。
The
以上のS301からS313までの処理が、全ての分析フローが実行を完了するまで繰り返される。以上で分析処理は終了する。 The above processes from S301 to S313 are repeated until all the analysis flows complete the execution. This completes the analysis process.
次に、前記のフロー実行対象判定処理及び仮想マシン割り当て判定処理の詳細を説明する。 Next, the details of the flow execution target determination process and the virtual machine allocation determination process will be described.
<フロー実行対象判定処理>
図8は、フロー実行対象判定処理の詳細を説明するフローチャートである。同図に示すように、フロー管理部202は、まず、現在実行可能な分析フローが1つだけであるか否
かを判断する(S401)。具体的には、例えば、フロー管理部202は、処理状態管理テーブル400の各レコードの実行状態623のうちで、「実行可能」が格納されているレコードの数を確認する。これにより、同時に実行可能なフローの数を、各分析フローの実行順序の制約を崩さずに判定できる。
<Flow execution target judgment processing>
FIG. 8 is a flowchart illustrating the details of the flow execution target determination process. As shown in the figure, the
現在実行可能な分析フローが1つだけである場合(S401:YES)、フロー管理部202は、その分析フローを対象フローとして記憶する(S402)。例えば、フロー管理部202は、その対象フローが記憶されている処理状態管理テーブル400のレコードを記憶する。その後はS410の処理が行われる。
When there is only one analysis flow that can be executed at present (S401: YES), the
他方、現在実行可能な分析フローが複数ある場合(S401:NO)、フロー管理部202は、その複数の全ての分析フローの予測時間が算出されているか否かを判断する(S403)。具体的には、例えば、フロー管理部202は、処理状態管理テーブル400における前記の複数の分析フローのそれぞれのレコードの予測時間625を参照し、予測時間が格納されているか否かを確認する。
On the other hand, when there are a plurality of analysis flows that can be executed at present (S401: NO), the
その複数の全ての分析フローの予測時間が算出されている場合は(S403:YES)、フロー管理部202は、分析フローを実行する仮想マシン103の台数又は優先度に関して適当な重み付けをする旨を、仮想マシン割当判定処理のために記憶すると共に、S401で実行可能とした全ての分析フロー(S403で予測時間が算出されている全ての分析フロー)を対象フローとして記憶し(S404)、その後はS410の処理が行われる。
When the predicted times of all the plurality of analysis flows have been calculated (S403: YES), the
例えば、フロー管理部202は、処理状態管理テーブル400のうち全ての実行状態623が「実行可能」である(「実行中」の区間がない)分析フローのレコードを対象フローとして記憶する。
For example, the
他方、予測時間が予測されていない分析フローがある場合は(S403:NO)、フロー管理部202は、分析フローを実行する仮想マシン103の台数又は優先度を各分析フローの間で均等にする旨を、仮想マシン割当判定処理のために記憶すると共に、S401で実行可能とした全ての分析フローを対象フローとして記憶し(S405)、その後はS410の処理が行われる。
On the other hand, when there is an analysis flow in which the predicted time is not predicted (S403: NO), the
S410において、まずフロー管理部202は、S404又はS405で実行を決定した各対象フローのうち一つを選択し、その対象フローにおいて実行可能な区間フローが複数存在するか否かを判定する(S406)。具体的には、例えば、フロー管理部202は、処理状態管理テーブル400における、前記で選択した対象フローにおける各区画のレコードの実行状態623を参照し、「実行可能」が格納されているレコードの数を確認する。
In S410, the
前記で選択した対象フローに実行可能な区間フローが複数存在する場合には(S406:YES)、フロー管理部202は、そのうち最先の区間フロー(最初に実行される、例えば対象データの時間が最も早い区間フロー)のみを、前記で選択した対象フローにおける対象区間フローとする(S407)。他方、前記で選択した対象フローに実行可能な区間が1つのみ存在する場合には(S406:NO)、フロー管理部202は、その1つの区間を、前記で選択した対象フローにおける対象区間フローとする(S408)。
When there are a plurality of executable section flows in the target flow selected above (S406: YES), the
このような処理を行うことで、一部の対象フローが他の対象フローと比較して実行可能な区間フローが多い場合に、一部の対象フローに区間フローが多く割り当てられてしまい、各対象フローの間で並列処理を平準化できなくなることを防ぐことができる。 By performing such processing, when some target flows have more executable section flows than other target flows, many section flows are assigned to some target flows, and each target flows. It is possible to prevent the parallel processing from being unable to be leveled between the flows.
フロー管理部202は、以上のS406、S407、及びS408の処理を全ての対象フローについて繰り返す。以上でフロー実行対象判定処理は終了する(S409)。
The
<仮想マシン割り当て判定処理>
図9は、仮想マシン割当判定処理の詳細を示すフローチャートである。まず、マシン制御部203は、フロー実行対象判定処理で決定した各対象区間フローを割り当て可能な仮想マシン103が存在するか否かを判定する(S501)。具体的には、例えば、マシン制御部203は、VM管理テーブル500の各レコードの割当VM数713及び最大VM割当可能数712を参照して判断する。
<Virtual machine allocation judgment processing>
FIG. 9 is a flowchart showing details of the virtual machine allocation determination process. First, the
対象区間フローを割り当て可能な仮想マシン103が存在しない場合は(S501:NO)、マシン制御部203は、仮想マシン割り当て判定処理を終了する(S507)。他方、対象区間フローを割り当て可能な仮想マシン103が存在する場合は(S501:YES)、S502の処理が行われる。
If there is no
S502においてマシン制御部203は、フロー実行対象判定処理で決定した対象フローの数(以下、対象フロー数という)が1か、もしくは1より大きい(2以上)か否かを確認する(S502)。対象フロー数が1の場合は(S502:NO)、マシン制御部203は、その対象フローの処理を、S501で特定した割り当て可能な仮想マシン103の全てに割り当てる旨を記憶し(決定し)(S503)、仮想マシン割り当て判定処理は終了する(S507)。
In S502, the
他方、対象フロー数が2以上の場合は(S502:YES)、マシン制御部203はS504の処理を実行する。
On the other hand, when the number of target flows is 2 or more (S502: YES), the
すなわち、S504においてマシン制御部203は、重み付けにより分析フローを実行する旨がフロー実行対象判定処理で記憶されていた場合には(S504:重み付け)、各対象フローにおける対象区間フローの予測時間に基づき、各対象フローにおける対象区間フローの終了時刻が互いに概ね同じになるように、各対象フローにおける対象区間フローを割り当てる仮想マシン103を決定する(S505)。その後、仮想マシン割り当て判定処理は終了する(S507)。
That is, in S504, when the fact that the analysis flow is executed by weighting is stored in the flow execution target determination process (S504: weighting), the
この割り当ては、例えば、マシン制御部203は、予測時間の逆数を各対象フローにおける対象区間フローについて算出し、この各逆数に対応する比率にて、各対象区間フローを割り当てる仮想マシン103の台数を決定するといった方法で実行される。なお、適当な台数が算出できない場合(例えば、自然数としての台数が算出できない場合)は、マシン制御部203は、各仮想マシン103に処理の優先度(例えば、CPUやメモリのリソース配分量)を設定することで、各対象フローにおける対象区間フローを割り当てる。
For this allocation, for example, the
他方、分析フローを均等割り付けにより実行する旨がフロー実行対象判定処理で記憶されていた場合には(S504:均等割り)、マシン制御部203は、各対象フローにおける対象区間フローの処理を実行する仮想マシン103の台数が互いに概ね同数になるように、各対象フロー(対象区間フロー)を割り当てる仮想マシン103を決定する(S506)。その後、仮想マシン割り当て判定処理は終了する(S507)。
On the other hand, when it is stored in the flow execution target determination process that the analysis flow is to be executed evenly (S504: equal allocation), the
以上のような仮想マシン割り当て判定処理により、複数の対象区間フローを同時に並列して実行することができ、かつ、複数の対象区間フローの実行に要する時間を互いに同じ時間に調整することができる。これにより、一部の分析フローのみが長時間引き続き実行されることを防止することができる。 By the virtual machine allocation determination process as described above, a plurality of target section flows can be executed in parallel at the same time, and the time required for executing the plurality of target section flows can be adjusted to the same time. This makes it possible to prevent only a part of the analysis flow from being continuously executed for a long time.
以上のようにして実行される分散処理の経過又は結果は、ユーザ操作端末105等に表示される。
The progress or result of the distributed processing executed as described above is displayed on the
<ユーザ操作端末105による表示例>
図10は、ユーザ操作端末105が表示する、分析処理の経過又は結果を示す画面の一例である。同図に示すように、この表示画面1000には、分析処理の実行状態又は実行結果を示すテーブル1010が表示される。このテーブル1010には、仮想マシン実行サーバ102における各仮想マシン103ごとに、各分析フローにおける各区間フローの現在の処理状況1011(又はその処理結果)が時系列に沿って表示される。そして、この処理状況1011には、その処理により発生したI/O量も表示される。また、分析処理によって発生したI/O量の合計1012が、各時間帯ごとに(各データ区画ごとに)表示される。
<Display example by
FIG. 10 is an example of a screen displayed by the
このように、表示画面1000には、各仮想マシン103による分析フローの結果の情報、又はその分析フローにより発生したデータの入出力量に関する情報が出力されるので、ユーザは、分散処理システム100により分析フローが順次並列処理されていることや、これによりI/Oに係る負荷が分散され、安定して分散処理が実行されていることを確認することができる。
In this way, the information on the result of the analysis flow by each
以上のように、本実施形態の分散処理システム100によれば、対象データの区画データに対する処理部(分析フロー)の処理順序を記憶し、各処理部(分析フロー)が行う対象データの各区画データの処理による情報処理装置(仮想マシン103)に対する負荷を、そのデータ区画ごとに算出し、そして、各処理部の現在の実行状態、各処理部の処理順序、及び各処理部について算出した負荷に基づき、並列的に実行される処理部及びその処理部を実行する情報処理装置の組み合わせを決定し、これらを各情報処理装置に並列的に実行させる。このように、本実施形態の分散処理システム100は、対象データの各データ区画に係る処理の負荷に応じて、分析フローの処理を各仮想マシン103にデータ区画単位で割り当てることができる。これにより、対象データの処理をデータ区画単位で各仮想マシン103に分散させて処理することができ、対象データを安定的に分散して処理することができる。
As described above, according to the distributed
この効果について具体的に説明すると、以下のようになる。 A specific explanation of this effect is as follows.
まず、図11は、従来の分散処理システムにおいて分析処理を実行した場合における、仮想マシン実行サーバ102及びデータレイク104のハードウェアリソースの使用状況の時系列変化の一例を示す図である。
First, FIG. 11 is a diagram showing an example of time-series changes in the usage status of the hardware resources of the virtual
この分析処理は、分析フローである分析処理Aの生成した中間データを用いて分析処理B(分析フロー)を実行し、分析処理Bの生成した中間データを用いて分析処理C(分析フロー)を実行し、これにより最終出力データを出力するものである。すなわち、分析処理Aの実行完了後、分析処理Bが実行され、その実行完了後、分析処理Cが実行される。 In this analysis process, analysis process B (analysis flow) is executed using the intermediate data generated by analysis process A, which is an analysis flow, and analysis process C (analysis flow) is performed using the intermediate data generated by analysis process B. It executes and outputs the final output data. That is, after the execution of the analysis process A is completed, the analysis process B is executed, and after the execution is completed, the analysis process C is executed.
この場合、各仮想マシン103に対する処理の割り当ての時系列変化は以下の通りである。まず、全ての仮想マシン103が分析処理Aの区間Aを実行し(符号801)、その処理の終了後、次の区間(分析処理Aの区間B)の処理が実行される(符号802)。そして、分析処理Aの全区間の処理の実行が終了後、次の分析フローである分析処理Bの区間Aの処理が実行され(符号803)、その後、分析処理Bの分析フローの全区間の処理の実行が完了する。そして、その次の分析フローである分析処理Cの区間Aの処理が実行され(符号804)、その後、分析処理Cの全区間の処理が実行される。
In this case, the time-series changes in the processing allocation to each
このような処理が実行されると、分析処理Aの実行中におけるデータレイク104へのデータの単位時間あたりの読み書き量811と比べ、分析処理Bの実行中におけるデータレイク104へのデータの単位時間あたりの読み書き量812の方が少なくなるため、データレイクのI/O(Input/Output)に係るリソースの使用量は、分析処理Bにおいて余裕があり、I/Oに係るリソースが余剰となる。
When such a process is executed, the unit time of the data to the
他方で、分析処理Cを実行中のデータレイクへの単位時間あたりの読み書き量813は、分析処理Bの実行中におけるデータレイク104へのデータの単位時間あたりの読み書き量812よりも多くなるため、分析処理Cにおけるデータレイク104への読み書きに遅延が発生し、分析処理の実行時間が長くなる可能性がある。すなわち、I/Oに係るリソースが不足する状態となる。
On the other hand, the read /
このように、従来の分散処理システムでは、単に各分析フローの間の実行順序に基づいて処理を行うため、各分析フローのI/O量が処理の種類によって大きく変化する結果、その際に処理上の遅延が発生し、分析処理の効率が低下する可能性がある。特に、各分析フロー間でのI/O量の違いが大きく異なると、実行する分析フローが切り替わる際に予期しない大きなボトルネックが発生して処理が大幅に遅延し、処理の安定性、信頼性を損ねることになる。 In this way, in the conventional distributed processing system, processing is performed simply based on the execution order between each analysis flow, and as a result, the amount of I / O of each analysis flow changes greatly depending on the type of processing, and the processing is performed at that time. The above delay may occur and the efficiency of the analysis process may be reduced. In particular, if the difference in the amount of I / O between each analysis flow is significantly different, an unexpectedly large bottleneck will occur when the analysis flow to be executed is switched, and the processing will be significantly delayed, resulting in processing stability and reliability. Will hurt.
一方、図12は、本実施形態の分散処理システム100において分析処理を実行した場合における仮想マシン実行サーバ102及びデータレイク104のハードウェアリソース使用状況の時系列変化の一例を示す図である。
On the other hand, FIG. 12 is a diagram showing an example of time-series changes in the hardware resource usage status of the virtual
本実施形態の分散処理システム100は、各区間のデータ処理が終わるごとに、対象フロー及び対象区間フローを決定し、これらの処理を、適当な配分割合にて(例えば、各区間での処理が概ね同時に終了するように)仮想マシン103に割り当てる(図7のS301及びS303)。
The distributed
すなわち、最初に行われたフロー実行対象判定処理においては、対象フローが分析処理Aのみと判定され、さらに、先頭のデータ区画に対応する区間フローのみが対象区間フローであると判定される結果(図8のS407)、分析処理Aの区間Aのみが実行される(符号901)。 That is, in the flow execution target determination process performed first, the target flow is determined to be only the analysis process A, and further, only the section flow corresponding to the first data partition is determined to be the target section flow ( In S407) of FIG. 8, only the section A of the analysis process A is executed (reference numeral 901).
そして、次に行われるフロー実行対象判定処理においては、まず、対象フローが分析処理A及び分析処理Bの複数の分析フローと判定され、さらに、分析処理Bの予測時間が算出されていないため均等割付と判定される結果(図8のS405)、分析処理Aの区間Bの処理、及び分析処理Bの区間Aの処理が均等に(同じ台数の)各仮想マシン103に割り当てられ、これらの処理が並列して実行される(符号902、符号903)。
Then, in the flow execution target determination process to be performed next, the target flow is first determined to be a plurality of analysis flows of the analysis process A and the analysis process B, and further, the predicted time of the analysis process B is not calculated, so that the flow is equal. The result determined to be allocation (S405 in FIG. 8), the process of the section B of the analysis process A, and the process of the section A of the analysis process B are evenly assigned to each virtual machine 103 (of the same number), and these processes. Are executed in parallel (
さらに、その次に行われるフロー実行対象判定処理においては、まず、対象フローが、前記の分析処理A(の区間Bの処理)及び分析処理B(の区間Aの処理)に加えて分析処理Cである旨判定され、その分析処理Cの対象区間フローが区間Aと判定される結果、分析処理Aの区間Bの処理、分析処理Bの区間Aの処理、及び分析処理Cの区間Aの処理が並列して実行される(符号904)。 Further, in the flow execution target determination process to be performed next, first, the target flow is analyzed process C in addition to the analysis process A (process of section B) and analysis process B (process of section A). As a result of determining that the target section flow of the analysis process C is the section A, the process of the section B of the analysis process A, the process of the section A of the analysis process B, and the process of the section A of the analysis process C. Are executed in parallel (reference numeral 904).
このように、分析フローにおける各区間の処理が並列して実行することにより、分析フローに係る処理が、分析フロー単位だけなく、さらにその区間単位でも分散されるため、前記した従来の分散処理システムと比較して、分析処理がより安定的に分散され、処理に係る負荷も分散される。すなわち、リソースの分散による安定化のみならず、時間軸方向
の処理の分散による安定化が実現される。
In this way, by executing the processing of each section in the analysis flow in parallel, the processing related to the analysis flow is distributed not only in the analysis flow unit but also in the section unit. Therefore, the conventional distributed processing system described above. The analysis process is more stably distributed, and the load related to the process is also distributed. That is, not only stabilization by distribution of resources but also stabilization by distribution of processing in the time axis direction is realized.
さらに、本実施形態の分散処理システム100では、分析フローは、処理するデータの時系列(データ区画)に応じて複数の区間フローに分割されているので、CPU等の演算処理に係る負荷だけでなく、入出力処理(I/O)に係る負荷も時間軸方向に分散されることになる。
Further, in the distributed
例えば、最初に行われたフロー実行対象判定処理では分析処理Aのみが実行されるため、分析処理Aの実行に係る、データレイク104への単位時間あたりのデータの読み書き量911(以下、単位I/O量という)が発生する(符号911)。
For example, since only the analysis process A is executed in the first flow execution target determination process, the amount of data read / written to the
そして、2回目のフロー実行対象判定処理においては、分析処理Aを単独で実行している場合、及び分析処理Bを単独で実行している場合の間の中間程度の単位I/O量が発生することになる(符号912)。さらに、3回目のフロー実行対象判定処理においては、分析処理Aを単独で実行している場合、分析処理Bを単独で実行している場合、及び分析処理Cを単独で実行している場合の間の三者の中間程度の単位I/O量が発生することになる(符号913)。 Then, in the second flow execution target determination process, an intermediate unit I / O amount is generated between the case where the analysis process A is executed independently and the case where the analysis process B is executed alone. (Code 912). Further, in the third flow execution target determination process, the case where the analysis process A is executed independently, the case where the analysis process B is executed independently, and the case where the analysis process C is executed independently. An intermediate unit I / O amount of the three parties will be generated (reference numeral 913).
このように、本実施形態の分散処理システム100によれば、データレイク104への単位時間あたりのデータの読み書き量が、前記した通常の分散処理システムの場合と比較して、時間軸方向により詳細に平準化される。これにより、例えば、データレイク104へのデータの読み書きが分析処理におけるボトルネックとなることを防ぎ、分析処理を安定して行うことができるようになる。
As described above, according to the distributed
すなわち、本実施形態の分散処理システム100によれば、分析処理におけるリソースとして、単にプロセッサに係る負荷だけでなく、I/Oに係る負荷も平準化させる結果、分散処理システム100における処理負荷全体を安定化させることができる。
That is, according to the distributed
以上の効果に加えて、本実施形態の分散処理システム100は、情報処理装置(仮想マシン103)による処理部(分析フロー)の並列的な実行に関する制約条件を記憶し(VM管理テーブル500)、その制約条件を満たす仮想マシン103を、処理部を並列的に実行する仮想マシン103として決定するので、不可抗力的な仮想マシン103の制約の範囲内で、分散処理及びこれによるデータ処理を安定的に行うことができる。
In addition to the above effects, the distributed
例えば、制約条件として、並列的に分析フローの処理を実行可能な仮想マシン103の最大数を記憶することで、例えば仮想マシン実行サーバ102や仮想マシン103のハードウェアないしソフトウェアの仕様を満たした、安定的な分散処理及びデータ処理を行うことができる。
For example, by storing the maximum number of
また、本実施形態の分散処理システム100は、情報処理装置(仮想マシン実行サーバ102又は仮想マシン103)に対する負荷として、処理部(分析フロー)の処理の実行に係る予測時間を各処理部について算出し、この予測時間に基づき、処理部の部分の処理を並列的に実行させる各情報処理装置の配分(処理の優先度等)を決定するので、仮想マシン実行サーバ102又は仮想マシン103の実際の稼動実績に基づき合理的な処理の分配を行うことができる。これにより、安定的な分散処理及びデータ処理を行うことができる。
Further, the distributed
また、本実施形態の分散処理システム100は、並列的に実行する複数の処理部(分析フロー)のうち予測時間を算出していない処理部がある場合には、複数の処理部のそれぞれを実行する情報処理装置(仮想マシン103)の台数を互いに均等とするので、例えば
ある仮想マシン103の稼動実績が少なく分析フローの処理時間が不明又は不正確である場合であっても、合理的に分散処理を行うことができる。
Further, the distributed
また、本実施形態の分散処理システム100は、並列的に実行する処理部(分析フロー)を決定する際に、当該処理部において処理可能なデータの区画が複数ある場合には、処理順序が最先である区画のデータのみを処理するので、実行可能な区間フローの数が分析フローの間で異なる場合であっても、一部の分析フローのみが偏って実行され、並列処理を平準化できなくなることを防ぐことができる。
Further, in the distributed
以上の実施形態の説明は、本発明の理解を容易にするためのものであり、本発明を限定するものではない。本発明はその趣旨を逸脱することなく、変更、改良され得ると共に本発明にはその等価物が含まれる。 The above description of the embodiment is for facilitating the understanding of the present invention, and does not limit the present invention. The present invention can be modified and improved without departing from the spirit of the present invention, and the present invention includes its equivalents.
例えば、本実施形態では、分析処理を行う主体は複数の仮想マシン103としたが、複数の物理サーバがこれらの分析処理を行うものとしてもよい。本実施形態の分散処理システム100は、いずれのタイプの情報処理装置による分散処理にも対応することができるので、ハードウェアリソース及びソフトウェアリソースを効率的に使用することができる。
For example, in the present embodiment, the main body that performs the analysis process is a plurality of
例えば、本実施形態では、データの記憶形式としてテーブル(データベース)を示したが、データの記憶形式はデータベースに限られず、いかなる方法であってもよい。 For example, in the present embodiment, a table (database) is shown as a data storage format, but the data storage format is not limited to the database and may be any method.
また、本実施形態では、仮想マシン実行サーバ102は複数台あるものとしたが、1台の仮想マシン実行サーバ102のみを設け、この仮想マシン実行サーバ102に複数の仮想マシン103を設けてもよい。
Further, in the present embodiment, it is assumed that there are a plurality of virtual
100 分散処理システム、101 仮想マシン管理サーバ、102 仮想マシン実行サーバ、103 仮想マシン、104 データレイク、211 フローテーブル記憶部、201 処理負荷算出部、202 フロー管理部、203 マシン制御部 100 Distributed processing system, 101 Virtual machine management server, 102 Virtual machine execution server, 103 Virtual machine, 104 Data lake, 211 Flow table storage unit, 201 Processing load calculation unit, 202 Flow management unit, 203 Machine control unit
Claims (10)
前記区画のデータに対する前記複数の処理フローの実行順序を記憶するフローテーブル記憶部と、
前記処理フローによる前記区画のデータの処理による前記仮想マシンに対する負荷を、前記区画ごとに算出する処理負荷算出部と、
各前記処理フローの現在の実行状態、各前記処理フローの実行順序、及び各前記処理フローについて算出した前記負荷に基づき、並列的に実行される前記処理フロー、及び当該処理フローを実行する前記仮想マシンの組み合わせを決定するフロー管理部と、
前記決定した組み合わせが示す並列的な処理を各前記仮想マシンに実行させるマシン制御部と、
を備え、
前記処理負荷算出部は、前記負荷として、前記処理フローの実行に係る予測時間を各前記処理フローについて算出し、
前記フロー管理部は、
並列的に実行される前記処理フローが複数ある場合、前記算出した予測時間に基づき、前記複数の処理フローのそれぞれに対して割り当てる前記仮想マシン又はその割り当てに関する優先度を決定し、
前記並列的に実行される複数の処理フローのうち前記予測時間を算出していない前記処理フローがある場合には、前記複数の処理フローのそれぞれを実行する前記仮想マシンの数を互いに均等とする、
分散処理システム。 Is configured to include a plurality of virtual machines, for a given process in which a plurality comprises a processing flow for processing predetermined data comprising a plurality of compartments in each of the compartments, the processing flow to at least one of the plurality of virtual machines A distributed processing system equipped with a processor and a memory capable of executing the processing flow in parallel by allocating.
A flow table storage unit that stores the execution order of the plurality of processing flows for the data in the partition, and
A processing load calculation unit that calculates the load on the virtual machine by processing the data of the partition according to the processing flow for each partition,
The processing flow executed in parallel based on the current execution state of each processing flow , the execution order of each processing flow , and the load calculated for each processing flow , and the virtual machine that executes the processing flow. The flow management department that decides the combination of machines and
A machine control unit that causes each virtual machine to execute parallel processing indicated by the determined combination.
Equipped with a,
The processing load calculation unit calculates the estimated time for executing the processing flow as the load for each of the processing flows.
The flow management unit
When there are a plurality of the processing flows executed in parallel, the priority regarding the virtual machine or its allocation to be assigned to each of the plurality of processing flows is determined based on the calculated estimated time.
When there is the processing flow for which the predicted time has not been calculated among the plurality of processing flows executed in parallel, the number of the virtual machines executing each of the plurality of processing flows is made equal to each other. ,
Distributed processing system.
前記フロー管理部は、前記制約条件を満たす前記仮想マシンを、前記処理フローを並列的に実行する前記仮想マシンとして決定する、
請求項1に記載の分散処理システム。 A machine management table storage unit that stores constraints related to parallel execution of the processing flow by the virtual machine is provided.
The flow management section, the constraint satisfies the virtual machine is determined as the virtual machine running the process flow in parallel,
The distributed processing system according to claim 1.
前記フロー管理部は、前記制約条件が示す最大数を満たす数の前記仮想マシンを、前記処理フローを並列的に実行する前記仮想マシンとして決定する、
請求項2に記載の分散処理システム。 As the constraint condition, the machine management table storage unit stores the maximum number of virtual machines capable of executing the processing flow in parallel.
The flow management unit determines the number of virtual machines satisfying the maximum number indicated by the constraint condition as the virtual machines that execute the processing flow in parallel.
The distributed processing system according to claim 2.
各前記仮想マシンが実行した前記処理フローの結果、又は前記処理フローにより発生したデータの入出力量に関する情報を出力する出力部とをさらに備え、
前記マシン管理テーブル記憶部は、前記制約条件として、並列的に前記処理フローを実行可能な前記仮想マシンの最大数を記憶し、
前記フロー管理部は、
前記制約条件が示す最大数を満たす数の前記仮想マシンを、前記処理フローを並列的に実行する前記仮想マシンとして決定し、
並列的に実行される前記処理フローを決定する際に、当該処理フローにおいて処理可能な前記データの区画が複数ある場合には、予め定められた、最初に処理される前記データの区画のみが前記処理フローにおいて処理されることを決定する、
請求項1に記載の分散処理システム。 A machine management table storage unit that stores constraints related to parallel execution of the processing flow by the virtual machine, and
Further provided with an output unit that outputs information regarding the result of the processing flow executed by each of the virtual machines or the amount of data input / output generated by the processing flow.
As the constraint condition, the machine management table storage unit stores the maximum number of virtual machines capable of executing the processing flow in parallel.
The flow management unit
The number of the virtual machines satisfying the maximum number indicated by the constraint condition is determined as the virtual machines that execute the processing flow in parallel.
When determining the processing flow to be executed in parallel, if there are a plurality of data partitions that can be processed in the processing flow, only the predetermined data partition that is processed first is said. Determine to be processed in the processing flow,
The distributed processing system according to claim 1.
プロセッサ及びメモリを備えるマシン管理サーバが、
前記区画のデータに対する前記複数の処理フローの実行順序を記憶するフローテーブル記憶処理と、
前記処理フローによる前記区画のデータの処理による前記仮想マシンに対する負荷を、前記区画ごとに算出する処理負荷算出処理と、
各前記処理フローの現在の実行状態、各前記処理フローの実行順序、及び各前記処理フローについて算出した前記負荷に基づき、並列的に実行される前記処理フロー、及び当該処理フローを実行する前記仮想マシンの組み合わせを決定するフロー管理処理と、
前記決定した組み合わせが示す並列的な処理を各前記仮想マシンに実行させるマシン制御処理と、
を実行し、
前記処理負荷算出処理において、前記負荷として、前記処理フローの実行に係る予測時間を各前記処理フローについて算出し、
前記フロー管理処理において、
並列的に実行される前記処理フローが複数ある場合、前記算出した予測時間に基づき、前記複数の処理フローのそれぞれに対して割り当てる前記仮想マシン又はその割り当てに関する優先度を決定し、
前記並列的に実行される複数の処理フローのうち前記予測時間を算出していない前記処理フローがある場合には、前記複数の処理フローのそれぞれを実行する前記仮想マシンの数を互いに均等とする、
分散処理方法。 Is configured to include a plurality of virtual machines, for a given process in which a plurality comprises a processing flow for processing predetermined data comprising a plurality of compartments in each of the compartments, the processing flow to at least one of the plurality of virtual machines It is a distributed processing method in a distributed processing system that can execute the processing flow in parallel by allocating.
A machine management server with a processor and memory
A flow table storage process that stores the execution order of the plurality of processing flows for the data in the partition, and
A processing load calculation process that calculates the load on the virtual machine by processing the data of the partition according to the processing flow for each partition, and
The processing flow executed in parallel based on the current execution state of each processing flow , the execution order of each processing flow , and the load calculated for each processing flow , and the virtual machine that executes the processing flow. Flow management process that determines the combination of machines and
Machine control processing that causes each virtual machine to execute parallel processing indicated by the determined combination, and
The execution,
In the processing load calculation process, as the load, the estimated time for executing the processing flow is calculated for each processing flow.
In the flow management process
When there are a plurality of the processing flows executed in parallel, the priority regarding the virtual machine or its allocation to be assigned to each of the plurality of processing flows is determined based on the calculated estimated time.
When there is the processing flow for which the predicted time has not been calculated among the plurality of processing flows executed in parallel, the number of the virtual machines executing each of the plurality of processing flows is made equal to each other. ,
Distributed processing method.
前記フロー管理処理は、前記制約条件を満たす前記仮想マシンを、前記処理フローを並列的に実行する前記仮想マシンとして決定する、
請求項7に記載の分散処理方法。 The machine management server further executes machine management table storage processing that stores constraints related to parallel execution of the processing flow by the virtual machine.
The flow management process, the constraints satisfying the virtual machine is determined as the virtual machine running the process flow in parallel,
The distributed processing method according to claim 7.
前記区画のデータに対する前記複数の処理フローの実行順序を記憶するフローテーブル記憶処理と、
前記処理フローによる前記区画のデータの処理による前記仮想マシンに対する負荷を、前記区画ごとに算出する処理負荷算出処理と、
各前記処理フローの現在の実行状態、各前記処理フローの実行順序、及び各前記処理フローについて算出した前記負荷に基づき、並列的に実行される前記処理フロー、及び当該処理フローを実行する前記仮想マシンの組み合わせを決定するフロー管理処理と、
前記決定した組み合わせが示す並列的な処理を各前記仮想マシンに実行させるマシン制御処理と、
を実行させ、
前記処理負荷算出処理において、前記負荷として、前記処理フローの実行に係る予測時間を各前記処理フローについて算出し、
前記フロー管理処理において、
並列的に実行される前記処理フローが複数ある場合、前記算出した予測時間に基づき、前記複数の処理フローのそれぞれに対して割り当てる前記仮想マシン又はその割り当てに関する優先度を決定し、
前記並列的に実行される複数の処理フローのうち前記予測時間を算出していない前記処理フローがある場合には、前記複数の処理フローのそれぞれを実行する前記仮想マシンの数を互いに均等とする、
分散処理プログラム。 Is configured to include a plurality of virtual machines, for a given process in which a plurality comprises a processing flow for processing predetermined data comprising a plurality of compartments in each of the compartments, the processing flow to at least one of the plurality of virtual machines To a distributed processing system equipped with a processor and memory that can execute the processing flow in parallel by allocating
A flow table storage process that stores the execution order of the plurality of processing flows for the data in the partition, and
A processing load calculation process that calculates the load on the virtual machine by processing the data of the partition according to the processing flow for each partition, and
The processing flow executed in parallel based on the current execution state of each processing flow , the execution order of each processing flow , and the load calculated for each processing flow , and the virtual machine that executes the processing flow. Flow management process that determines the combination of machines and
Machine control processing that causes each virtual machine to execute parallel processing indicated by the determined combination, and
To run ,
In the processing load calculation process, as the load, the estimated time for executing the processing flow is calculated for each processing flow.
In the flow management process
When there are a plurality of the processing flows executed in parallel, the priority regarding the virtual machine or its allocation to be assigned to each of the plurality of processing flows is determined based on the calculated estimated time.
When there is the processing flow for which the predicted time has not been calculated among the plurality of processing flows executed in parallel, the number of the virtual machines executing each of the plurality of processing flows is made equal to each other. ,
Distributed processing program.
前記フロー管理処理は、前記制約条件を満たす前記仮想マシンを、前記処理フローを並列的に実行する前記仮想マシンとして決定する、
請求項9に記載の分散処理プログラム。 Further execute the machine management table storage process that stores the constraint conditions related to the parallel execution of the process flow by the virtual machine.
The flow management process, the constraints satisfying the virtual machine is determined as the virtual machine running the process flow in parallel,
The distributed processing program according to claim 9.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
JP2017155083A JP6940325B2 (en) | 2017-08-10 | 2017-08-10 | Distributed processing system, distributed processing method, and distributed processing program |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
JP2017155083A JP6940325B2 (en) | 2017-08-10 | 2017-08-10 | Distributed processing system, distributed processing method, and distributed processing program |
Publications (2)
Publication Number | Publication Date |
---|---|
JP2019035996A JP2019035996A (en) | 2019-03-07 |
JP6940325B2 true JP6940325B2 (en) | 2021-09-29 |
Family
ID=65637518
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
JP2017155083A Active JP6940325B2 (en) | 2017-08-10 | 2017-08-10 | Distributed processing system, distributed processing method, and distributed processing program |
Country Status (1)
Country | Link |
---|---|
JP (1) | JP6940325B2 (en) |
Family Cites Families (10)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
JP3063582B2 (en) * | 1995-08-25 | 2000-07-12 | 富士ゼロックス株式会社 | Image processing device |
JP2003208323A (en) * | 2002-01-11 | 2003-07-25 | Hitachi Ltd | Method, system and program for executing batch job |
JP2011053995A (en) * | 2009-09-03 | 2011-03-17 | Hitachi Ltd | Data processing control method and computer system |
US9244737B2 (en) * | 2011-02-04 | 2016-01-26 | Hitachi, Ltd. | Data transfer control method of parallel distributed processing system, parallel distributed processing system, and recording medium |
US9112750B2 (en) * | 2011-05-31 | 2015-08-18 | Hitachi, Ltd. | Job management server and job management method |
US9141430B2 (en) * | 2012-04-30 | 2015-09-22 | Hewlett-Packard Development Company, L.P. | Scheduling mapreduce job sets |
CN105308579B (en) * | 2013-07-01 | 2018-06-08 | 株式会社日立制作所 | Series data parallel parsing infrastructure and its parallel decentralized approach |
JP2015090688A (en) * | 2013-11-07 | 2015-05-11 | 株式会社日立製作所 | Mapreduce job management system and mapreduce job management method |
US20160154684A1 (en) * | 2014-02-19 | 2016-06-02 | Hitachi, Ltd. | Data processing system and data processing method |
JP6565391B2 (en) * | 2015-07-03 | 2019-08-28 | 富士ゼロックス株式会社 | Information processing apparatus and information processing program |
-
2017
- 2017-08-10 JP JP2017155083A patent/JP6940325B2/en active Active
Also Published As
Publication number | Publication date |
---|---|
JP2019035996A (en) | 2019-03-07 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
US10007461B1 (en) | Redistributing data in a distributed storage system based on attributes of the data | |
US20210211492A1 (en) | Pairwise comparison for load balancing | |
US9760294B2 (en) | Computer system, storage management computer, and storage management method | |
CN106502791B (en) | A kind of method for allocating tasks and device | |
JP6241300B2 (en) | Job scheduling apparatus, job scheduling method, and job scheduling program | |
US9092266B2 (en) | Scalable scheduling for distributed data processing | |
US9483288B2 (en) | Method and system for running a virtual appliance | |
US8479205B2 (en) | Schedule control program and schedule control method | |
US20130263142A1 (en) | Control device, control method, computer readable recording medium in which program is recorded, and distributed processing system | |
JP6191691B2 (en) | Abnormality detection apparatus, control method, and program | |
JP2015517147A5 (en) | ||
WO2015001850A1 (en) | Task allocation determination device, control method, and program | |
JP6519111B2 (en) | Data processing control method, data processing control program and data processing control device | |
JP2017041191A (en) | Resource management apparatus, resource management program, and resource management method | |
JP5471822B2 (en) | I / O control program, information processing apparatus, and I / O control method | |
US20170295221A1 (en) | Apparatus and method for processing data | |
US9778854B2 (en) | Computer system and method for controlling hierarchical storage therefor | |
JP6940325B2 (en) | Distributed processing system, distributed processing method, and distributed processing program | |
CN105528303B (en) | Method and apparatus for managing storage system | |
US10824640B1 (en) | Framework for scheduling concurrent replication cycles | |
JPWO2018225752A1 (en) | Control device, control method, and control program | |
JP6127754B2 (en) | Program, exclusive control request distribution method and system | |
JP6374059B2 (en) | Computer resource allocation determination method, computer resource allocation determination method program, and control computer | |
US20170147408A1 (en) | Common resource updating apparatus and common resource updating method | |
TW201942741A (en) | Information processing apparatus, control method, and program product |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
A621 | Written request for application examination |
Free format text: JAPANESE INTERMEDIATE CODE: A621 Effective date: 20200626 |
|
A977 | Report on retrieval |
Free format text: JAPANESE INTERMEDIATE CODE: A971007 Effective date: 20210519 |
|
A131 | Notification of reasons for refusal |
Free format text: JAPANESE INTERMEDIATE CODE: A131 Effective date: 20210615 |
|
A521 | Written amendment |
Free format text: JAPANESE INTERMEDIATE CODE: A523 Effective date: 20210728 |
|
A521 | Written amendment |
Free format text: JAPANESE INTERMEDIATE CODE: A523 Effective date: 20210816 |
|
TRDD | Decision of grant or rejection written | ||
A01 | Written decision to grant a patent or to grant a registration (utility model) |
Free format text: JAPANESE INTERMEDIATE CODE: A01 Effective date: 20210831 |
|
A61 | First payment of annual fees (during grant procedure) |
Free format text: JAPANESE INTERMEDIATE CODE: A61 Effective date: 20210902 |
|
R150 | Certificate of patent or registration of utility model |
Ref document number: 6940325 Country of ref document: JP Free format text: JAPANESE INTERMEDIATE CODE: R150 |