CN107609008A - Data import apparatus and method from a relational database to Kafka based on Apache Sqoop - Google Patents
Data import apparatus and method from a relational database to Kafka based on Apache Sqoop Download PDF Info
- Publication number
- CN107609008A CN107609008A CN201710619621.1A CN201710619621A CN107609008A CN 107609008 A CN107609008 A CN 107609008A CN 201710619621 A CN201710619621 A CN 201710619621A CN 107609008 A CN107609008 A CN 107609008A
- Authority
- CN
- China
- Prior art keywords
- parameter
- data
- kafka
- module
- sqoop
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Pending
Landscapes
- Stored Programmes (AREA)
Abstract
The present invention provides an apparatus and method, based on Apache Sqoop, for importing data from a relational database to Kafka. On top of Sqoop's existing ability to connect to a database and read data, a MapReduce module that sends the data to Kafka is added, achieving an efficient and convenient data import. On the one hand this avoids the duplicated effort of re-developing all the modules an import requires; on the other hand it further rounds out Sqoop's functions as a data transfer tool.
Description
Technical field
The present invention relates to the technical field of computer software applications, and in particular to a technique for rapidly importing and processing mass data.
Background technology
In today's fast-developing society, every industry produces massive amounts of data daily. The sources encompass every kind of data that can be captured or created around us: websites, social media, transactional business data, and data created in other business environments. Against this backdrop the Apache Hadoop framework arose. It is an increasingly common distributed computing environment mainly used to process big data; users can develop distributed programs without understanding the low-level details of distribution, making full use of a cluster's power for high-speed computation and storage. As cloud providers adopt this framework and more users move data sets between Hadoop and traditional databases, tools that assist data transfer become ever more important.
Apache Sqoop is exactly such a tool. It transfers bulk data between Hadoop and relational databases and is mainly used for data transmission between Hadoop and traditional databases: data in a relational database can be imported into HDFS, Hive, or HBase, and data in HDFS can likewise be imported back into a relational database.
Kafka is a high-throughput distributed publish-subscribe messaging system. Real-world scenarios sometimes also require data to be sent from a database into Kafka. There are several ways to import data from a relational database into Kafka, the most direct of which is to write one's own program against the database driver and the Kafka API. Sqoop, as a data transfer tool, does not yet support this scenario.
Given the inconvenience and inefficiency of importing data from a relational database to Kafka in the prior art described above, it is necessary to carry out development and research to provide a scheme that achieves an efficient and convenient import of data from a relational database to Kafka.
Summary of the invention
To solve the above technical problems, the present invention provides a scheme, based on Apache Sqoop, for importing data from a relational database to Kafka, as follows:
In a first aspect, the present invention provides a data import apparatus, based on Apache Sqoop, from a relational database to Kafka, characterized by comprising an analysis module, a parameter injection module, an input module, a verification module, a task judgment module, a data splitting module, a mapping module, and a storage module, wherein:
the analysis module analyzes the parameters needed to import data from a relational database to Kafka, the parameters including a first parameter specifying the Kafka server address and a second parameter specifying the topic that receives the data;
the parameter injection module injects the first and second parameters according to the analysis result of the analysis module;
the input module accepts the parameters entered by the user;
the verification module verifies the first and second parameters entered by the user;
the task judgment module judges from the user's input whether the task is an import into Kafka;
the data splitting module splits the data that Sqoop reads from the relational database;
the mapping module extracts data line by line from the data splitting module and sends it to the address specified by the first parameter;
the storage module is where Kafka stores received messages under the directory corresponding to the topic.
Further, the Kafka server address is the destination address for message pushes.
Further, the task judgment module decides whether the task is an import into Kafka by detecting whether the parameters entered by the user include the first parameter and the second parameter.
Further, concurrent operation is achieved by processing the data extracted from the data splitting module and sending it to Kafka, ensuring the operation of the mapping module; the data processing includes adding a timestamp and the source database and table names to the data.
In a second aspect, the present invention provides a data import method, based on Apache Sqoop, from a relational database to Kafka, characterized by comprising:
SS1: analyzing the parameters needed to import data from a relational database to Kafka, the parameters including a first parameter specifying the Kafka server address and a second parameter specifying the topic that receives the data;
SS2: injecting the first and second parameters according to the analysis result;
SS3: the user entering the parameters;
SS4: verifying the first and second parameters entered by the user;
SS5: judging from the user's input whether the task is an import into Kafka;
SS6: Sqoop splitting the data read from the relational database;
SS7: extracting the data line by line and sending it to the address specified by the first parameter;
SS8: Kafka storing the received messages under the directory corresponding to the topic.
Further, in step SS1 the Kafka server address is the destination address for message pushes.
Further, step SS5 decides whether the task is an import into Kafka by detecting whether the parameters entered by the user include the first parameter and the second parameter.
Further, concurrent operation is achieved by processing the extracted data and sending it to Kafka, ensuring the operation of step SS7; the data processing includes adding a timestamp and the source database and table names to the data.
With the scheme of the present invention, database data can be sent as messages to the Kafka MapReduce program, achieving an efficient and convenient import of data from a relational database to Kafka.
Brief description of the drawings
Fig. 1 shows a structural block diagram of the data import apparatus, based on Apache Sqoop, from a relational database to Kafka according to the present invention.
Fig. 2 shows a working schematic diagram of the data import apparatus, based on Apache Sqoop, from a relational database to Kafka according to the present invention.
Detailed description of the embodiments
The technical scheme of the present invention is further illustrated below with reference to the accompanying drawings and specific embodiments. It should be understood that the specific embodiments described here only explain the present invention and are not intended to limit it.
Fig. 1 is a structural block diagram of the data import apparatus, based on Apache Sqoop, from a relational database to Kafka according to one embodiment of the invention.
As shown in Fig. 1, the apparatus according to this embodiment of the present invention comprises an analysis module, a parameter injection module, an input module, a verification module, a task judgment module, a data splitting module, a mapping module, and a storage module:
the analysis module analyzes the parameters needed to import data from a relational database to Kafka, the parameters including a first parameter specifying the Kafka server address and a second parameter specifying the topic that receives the data;
the parameter injection module injects the first and second parameters according to the analysis result of the analysis module;
the input module accepts the parameters entered by the user;
the verification module verifies the first and second parameters entered by the user;
the task judgment module judges from the user's input whether the task is an import into Kafka;
the data splitting module splits the data that Sqoop reads from the relational database;
the mapping module extracts data line by line from the data splitting module and sends it to the address specified by the first parameter;
the storage module is where Kafka stores received messages under the directory corresponding to the topic.
According to one embodiment of the invention, the first parameter identified by the analysis module may be "broker_list" and the second parameter may be "topic", used respectively to specify the Kafka server address and the topic to which the data is sent.
In another example parameter injection module, exists for the analysis result according to analysis module
Enter in org.apache.sqoop.tool.BaseSqoolTool classes and org.apache.sqoop.tool.ImportTool classes
The injection of row parameter " broker_list " and " topic ";
The verification module verifies the "broker_list" and "topic" parameters entered by the user. Both "broker_list" and "topic" must be specified explicitly; neither may be omitted. At the same time, to avoid unnecessary processing, the parameters must be cross-checked: if the intent is to import into Kafka, parameters required by other import tasks must not be present, such as the "target_dir" required by an HDFS import.
According to one embodiment of the invention, the Kafka server address is the address to which the producer pushes messages, in the format <IP address:port>.
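The `<IP address:port>` format lends itself to a simple parser. The following Python sketch is an illustration, not part of the patented Java implementation; the comma-separated multi-broker form is an assumption based on Kafka's usual broker-list convention:

```python
def parse_broker_list(broker_list):
    """Split a broker_list string of the form "ip:port[,ip:port...]"
    into (host, port) pairs, validating the <IP address:port> format."""
    brokers = []
    for entry in broker_list.split(","):
        host, sep, port = entry.strip().partition(":")
        if not host or not sep or not port.isdigit():
            raise ValueError("expected <IP address:port>, got: " + entry)
        brokers.append((host, int(port)))
    return brokers
```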
According to one embodiment of the invention, a module that judges whether a task is a Kafka import is added to the org.apache.sqoop.manager.MainframeManager and org.apache.sqoop.manager.SqlManager classes.
According to one embodiment of the invention, the task judgment module decides whether the task is an import into Kafka by detecting whether the parameters entered by the user include "broker_list" and "topic".
According to one embodiment of the invention, a MapReduce task program that processes the data and sends it to Kafka is added, namely the org.apache.sqoop.mapreduce.KafkaImportJob, org.apache.sqoop.mapreduce.KafkaImportMapper, and org.apache.sqoop.kafka.KafkaPutProcessor classes, to ensure the operation of the mapping module. The processing here mainly adds detail information to the data: a timestamp and the source database and table names.
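The processing step described above can be sketched as a small pure function. This Python illustration stands in for the patent's Java processing classes; the field names used here are assumptions for the sake of the example:

```python
import time

def enrich_record(row, database, table, now=None):
    """Wrap one database row in the metadata the description names:
    a generation timestamp plus the source database and table names."""
    return {
        "timestamp": int(now if now is not None else time.time()),
        "database": database,
        "table": table,
        "payload": row,
    }
```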
Fig. 2 shows a working schematic diagram of the data import apparatus, based on Apache Sqoop, from a relational database to Kafka according to the present invention.
As shown in Fig. 2, the Sqoop client reads the table structure from the database through the database driver and generates the run class.
A MapReduce task that imports the data into Kafka is designed; this task sends the data read from the database to the Kafka broker.
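The map-side logic of that task can be sketched as follows. This Python illustration stands in for the KafkaImportMapper Java class named in this description; a stub producer object (an assumption of this sketch) is used so it runs without a live broker, where the real implementation would call Kafka's producer API:

```python
class StubProducer:
    """Stand-in for a Kafka producer so the sketch runs without a broker."""
    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        # A real producer would serialize value and push it to the broker.
        self.sent.append((topic, value))

def map_rows_to_kafka(rows, producer, topic):
    """Map-side logic: take each row from Sqoop's data split line by line
    and push it to the configured topic; returns the number sent."""
    count = 0
    for row in rows:
        producer.send(topic, row)
        count += 1
    return count
```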
In addition, and correspondingly, the present invention also provides a data import method, based on Apache Sqoop, from a relational database to Kafka, comprising:
SS1: analyzing the parameters needed to import data from a relational database to Kafka, the parameters including "broker_list" and "topic", used respectively to specify the Kafka server address and the topic to which the data is sent;
SS2: injecting the parameters "broker_list" and "topic" into the org.apache.sqoop.tool.BaseSqoopTool and org.apache.sqoop.tool.ImportTool classes according to the analysis result;
SS3: the user entering the parameters;
SS4: verifying the "broker_list" and "topic" parameters entered by the user;
SS5: judging from the user's input whether the task is an import into Kafka;
SS6: Sqoop splitting the data read from the database;
SS7: the Map module of MapReduce taking the data line by line, calling the Kafka interface functions, and sending the data to the address specified by "broker_list";
SS8: Kafka storing the received messages under the directory corresponding to the topic.
It should be noted that the topic to be imported into must first be created in Kafka before the import.
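With a stock Kafka installation, pre-creating the topic is done with the `kafka-topics.sh` admin CLI. The sketch below merely builds that invocation as an argument list (the flag names follow Kafka's standard tool of this era; the partition and replication values are illustrative defaults, not taken from the patent):

```python
def topic_create_command(topic, zookeeper, partitions=1, replication=1):
    """Build the kafka-topics.sh invocation that pre-creates the
    destination topic before a Sqoop-to-Kafka import is run."""
    return [
        "kafka-topics.sh", "--create",
        "--zookeeper", zookeeper,
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication),
    ]
```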
According to one embodiment of the invention, in step SS1 the Kafka server address is the address to which the producer pushes messages, in the format <IP address:port>.
According to one embodiment of the invention, a module that judges whether a task is a Kafka import is added to the org.apache.sqoop.manager.MainframeManager and org.apache.sqoop.manager.SqlManager classes.
According to one embodiment of the invention, step SS5 decides whether the task is an import into Kafka by detecting whether the parameters entered by the user include "broker_list" and "topic".
According to one embodiment of the invention, a MapReduce task program that processes the data and sends it to Kafka is added, namely the org.apache.sqoop.mapreduce.KafkaImportJob, org.apache.sqoop.mapreduce.KafkaImportMapper, and org.apache.sqoop.kafka.KafkaPutProcessor classes, to ensure the operation of step SS7.
The principle of the present invention is:
1) extend the Sqoop Import tool, adding the function of importing into Kafka;
2) analyze the parameters needed to import data from a database to Kafka, design the parameter verification module, and make the related configuration;
3) design and implement the MapReduce program that sends the database data that has been read to Kafka, together with its monitoring.
The advantages are:
(1) the Sqoop tool gains the function of importing data into Kafka;
(2) running a single command suffices to import data into Kafka through a MapReduce task.
With the present invention's Apache Sqoop-based scheme for importing data from a relational database to Kafka, a MapReduce module that sends data to Kafka is added on top of Sqoop's existing ability to connect to a database and read data, achieving an efficient and convenient data import. On the one hand this avoids the duplicated effort of re-developing all the modules an import requires; on the other hand it further rounds out Sqoop's functions as a data transfer tool.
Although some aspects have been described in the context of an apparatus, it is clear that these aspects also represent a description of the corresponding method, where a block or device corresponds to a method step or a feature of a method step. Similarly, aspects described in the context of a method step also represent a description of a corresponding block, item, or feature of a corresponding apparatus. Some or all of the method steps may be performed by (or using) a hardware device, such as a microprocessor, a programmable computer, or an electronic circuit. Some or several of the most important method steps may be performed by such a device.
The implementation may be in hardware or in software, or may be performed using a digital storage medium such as a floppy disk, DVD, Blu-ray disc, CD, ROM, PROM, EPROM, EEPROM, or flash memory, having electronically readable control signals stored thereon, which cooperate (or are capable of cooperating) with a programmable computer system such that the corresponding method is performed. A data carrier having electronically readable control signals may be provided, the electronically readable control signals being capable of cooperating with a programmable computer system so as to perform the methods described herein.
The implementation may also take the form of a computer program product with program code, the program code being operative to perform the method when the computer program product runs on a computer. The program code may be stored on a machine-readable carrier.
The above description is merely illustrative, and it is understood that modifications and variations of the arrangements and details described herein will be apparent to others skilled in the art. It is therefore intended that the scope be limited only by the appended claims and not by the specific details presented by way of the above description and explanation.
Claims (8)
- 1. A data import apparatus, based on Apache Sqoop, from a relational database to Kafka, characterized by comprising an analysis module, a parameter injection module, an input module, a verification module, a task judgment module, a data splitting module, a mapping module, and a storage module, wherein: the analysis module analyzes the parameters needed to import data from a relational database to Kafka, the parameters including a first parameter specifying the Kafka server address and a second parameter specifying the topic that receives the data; the parameter injection module injects the first and second parameters according to the analysis result of the analysis module; the input module accepts the parameters entered by the user; the verification module verifies the first and second parameters entered by the user; the task judgment module judges from the user's input whether the task is an import into Kafka; the data splitting module splits the data that Sqoop reads from the relational database; the mapping module extracts data line by line from the data splitting module and sends it to the address specified by the first parameter; and the storage module is where Kafka stores received messages under the directory corresponding to the topic.
- 2. The data import apparatus, based on Apache Sqoop, from a relational database to Kafka according to claim 1, wherein the Kafka server address is the destination address for message pushes.
- 3. The data import apparatus, based on Apache Sqoop, from a relational database to Kafka according to claim 1, wherein the task judgment module decides whether the task is an import into Kafka by detecting whether the parameters entered by the user include the first parameter and the second parameter.
- 4. The data import apparatus, based on Apache Sqoop, from a relational database to Kafka according to claim 1, wherein concurrent operation is achieved by processing the data extracted from the data splitting module and sending it to Kafka, ensuring the operation of the mapping module, the data processing including adding a timestamp and the source database and table names to the data.
- 5. A data import method, based on Apache Sqoop, from a relational database to Kafka, characterized by comprising: SS1: analyzing the parameters needed to import data from a relational database to Kafka, the parameters including a first parameter specifying the Kafka server address and a second parameter specifying the topic that receives the data; SS2: injecting the first and second parameters according to the analysis result; SS3: the user entering the parameters; SS4: verifying the first and second parameters entered by the user; SS5: judging from the user's input whether the task is an import into Kafka; SS6: Sqoop splitting the data read from the relational database; SS7: extracting the data line by line and sending it to the address specified by the first parameter; SS8: Kafka storing the received messages under the directory corresponding to the topic.
- 6. The data import method, based on Apache Sqoop, from a relational database to Kafka according to claim 5, wherein in step SS1 the Kafka server address is the destination address for message pushes.
- 7. The data import method, based on Apache Sqoop, from a relational database to Kafka according to claim 5, wherein step SS5 decides whether the task is an import into Kafka by detecting whether the parameters entered by the user include the first parameter and the second parameter.
- 8. The data import method, based on Apache Sqoop, from a relational database to Kafka according to claim 5, wherein concurrent operation is achieved by processing the extracted data and sending it to Kafka, ensuring the operation of step SS7, the data processing including adding a timestamp and the source database and table names to the data.
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN201710619621.1A CN107609008A (en) | 2017-07-26 | 2017-07-26 | A kind of data importing device and method from relevant database to Kafka based on Apache Sqoop |
Publications (1)
Publication Number | Publication Date |
---|---|
CN107609008A true CN107609008A (en) | 2018-01-19 |
Family
ID=61059607
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN201710619621.1A Pending CN107609008A (en) | 2017-07-26 | 2017-07-26 | A kind of data importing device and method from relevant database to Kafka based on Apache Sqoop |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN107609008A (en) |
Cited By (2)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN109359139A (en) * | 2018-10-24 | 2019-02-19 | 拉扎斯网络科技(上海)有限公司 | Data synchronization method, system, electronic device and computer readable storage medium |
CN114090661A (en) * | 2021-11-29 | 2022-02-25 | 阳光保险集团股份有限公司 | Data transmission method and device, electronic equipment and computer readable storage medium |
Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN103810272A (en) * | 2014-02-11 | 2014-05-21 | 北京邮电大学 | Data processing method and system |
US20160224667A1 (en) * | 2015-02-04 | 2016-08-04 | Xinyu Xingbang Information Industry Co., Ltd. | Method and system of implementing an integrated interface supporting operation in multi-type databases |
CN106897411A (en) * | 2017-02-20 | 2017-06-27 | 广东奡风科技股份有限公司 | ETL system and its method based on Spark technologies |
CN106919697A (en) * | 2017-03-07 | 2017-07-04 | 郑州云海信息技术有限公司 | A kind of method that data are imported multiple Hadoop components simultaneously |
Non-Patent Citations (1)
Title |
---|
FSERSUN: "kafka快速指南", 《CSDN博客》 * |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
KR102317535B1 (en) | Methods and systems for implementing data tracking with software development kits | |
US10237295B2 (en) | Automated event ID field analysis on heterogeneous logs | |
CN112491602B (en) | Behavior data monitoring method and device, computer equipment and medium | |
CN103620601B (en) | Joining tables in a mapreduce procedure | |
CN113987074A (en) | Distributed service full-link monitoring method and device, electronic equipment and storage medium | |
CN112162965B (en) | Log data processing method, device, computer equipment and storage medium | |
US11676345B1 (en) | Automated adaptive workflows in an extended reality environment | |
US10175954B2 (en) | Method of processing big data, including arranging icons in a workflow GUI by a user, checking process availability and syntax, converting the workflow into execution code, monitoring the workflow, and displaying associated information | |
CN107957940B (en) | Test log processing method, system and terminal | |
CN112732567B (en) | Mock data testing method and device based on ip, electronic equipment and storage medium | |
JP2023036681A (en) | Task processing method, processing device, electronic equipment, storage medium, and computer program | |
CN113836014B (en) | Interface testing method, device, electronic device and storage medium | |
CN113806434B (en) | Big data processing method, device, equipment and medium | |
CN113282854A (en) | Data request response method and device, electronic equipment and storage medium | |
CN113393288A (en) | Order processing information generation method, device, equipment and computer readable medium | |
US10574765B2 (en) | Method, device, and non-transitory computer-readable recording medium | |
CN113051171A (en) | Interface test method, device, equipment and storage medium | |
CN115809241A (en) | Data storage method and device, computer equipment and storage medium | |
CN113434542B (en) | Data relationship identification method and device, electronic equipment and storage medium | |
CN107609008A (en) | A kind of data importing device and method from relevant database to Kafka based on Apache Sqoop | |
CN114221988A (en) | Content distribution network hotspot analysis method and system | |
CN112818204A (en) | Service processing method, device, equipment and storage medium | |
CN109948251B (en) | CAD-based data processing method, device, equipment and storage medium | |
CN107071553A (en) | Method, device and computer readable storage medium for modifying video and voice | |
CN114268559B (en) | Directional network detection method, device, equipment and medium based on TF-IDF algorithm |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
TA01 | Transfer of patent application right |

Effective date of registration: 20200518 Address after: Building S01, Inspur Science Park, No. 1036, Inspur Road, high tech Zone, Jinan City, Shandong Province, 250000 Applicant after: Tidal Cloud Information Technology Co.,Ltd. Address before: 450000 Henan province Zheng Dong New District of Zhengzhou City Xinyi Road No. 278 16 floor room 1601 Applicant before: ZHENGZHOU YUNHAI INFORMATION TECHNOLOGY Co.,Ltd. |

RJ01 | Rejection of invention patent application after publication |
Application publication date: 20180119 |