CN113010542B - 业务数据处理方法、装置、计算机设备及存储介质 - Google Patents
业务数据处理方法、装置、计算机设备及存储介质 Download PDFInfo
- Publication number
- CN113010542B CN113010542B CN202110271789.4A CN202110271789A CN113010542B CN 113010542 B CN113010542 B CN 113010542B CN 202110271789 A CN202110271789 A CN 202110271789A CN 113010542 B CN113010542 B CN 113010542B
- Authority
- CN
- China
- Prior art keywords
- service data
- data
- real
- kafka
- time service
- Prior art date
- Legal status (The legal status is an assumption and is not a legal conclusion. Google has not performed a legal analysis and makes no representation as to the accuracy of the status listed.)
- Active
Links
Classifications
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/242—Query formulation
- G06F16/2433—Query languages
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/21—Design, administration or maintenance of databases
- G06F16/215—Improving data quality; Data cleansing, e.g. de-duplication, removing invalid entries or correcting typographical errors
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/24—Querying
- G06F16/248—Presentation of query results
-
- G—PHYSICS
- G06—COMPUTING; CALCULATING OR COUNTING
- G06F—ELECTRIC DIGITAL DATA PROCESSING
- G06F16/00—Information retrieval; Database structures therefor; File system structures therefor
- G06F16/20—Information retrieval; Database structures therefor; File system structures therefor of structured data, e.g. relational data
- G06F16/25—Integrating or interfacing systems involving database management systems
-
- H—ELECTRICITY
- H04—ELECTRIC COMMUNICATION TECHNIQUE
- H04L—TRANSMISSION OF DIGITAL INFORMATION, e.g. TELEGRAPHIC COMMUNICATION
- H04L67/00—Network arrangements or protocols for supporting network services or applications
- H04L67/50—Network services
- H04L67/55—Push-based network services
Landscapes
- Engineering & Computer Science (AREA)
- Theoretical Computer Science (AREA)
- Databases & Information Systems (AREA)
- Physics & Mathematics (AREA)
- General Physics & Mathematics (AREA)
- Data Mining & Analysis (AREA)
- General Engineering & Computer Science (AREA)
- Computational Linguistics (AREA)
- Mathematical Physics (AREA)
- Quality & Reliability (AREA)
- Computer Networks & Wireless Communication (AREA)
- Signal Processing (AREA)
- Information Retrieval, Db Structures And Fs Structures Therefor (AREA)
Abstract
本申请实施例属于大数据领域,涉及一种业务数据处理方法,包括获取实时业务数据;将所述实时业务数据推送到Kafka;通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据。本申请还提供一种业务数据处理装置、计算机设备及存储介质。此外,本申请还涉及区块链技术,库存业务数据可存储于区块链中。本申请提高了业务数据处理效率。
Description
技术领域
本申请涉及大数据技术领域,尤其涉及一种业务数据处理方法、装置、计算机设备及存储介质。
背景技术
在与实时交易相关联的业务系统中,还经常需要对海量的离线业务数据进行汇总统计,而离线业务数据通常与实时业务数据相关联,因此业务系统需要同时处理大量的实时业务数据与离线业务数据。
然而,业务系统的数据算力是有限的,面对海量数据,业务系统常常表现出算力不足的问题,导致业务数据处理效率较低。为了解决上述问题,传统的解决方案通常是读写分离,即将业务系统搭建在读服务器和写服务器上,以隔绝实时业务数据的读取和离线业务数据的计算,但是这种架构改造成本高,资源消耗大,并未真正提高数据处理能力,无法有效地解决业务数据处理效率较低的问题。
发明内容
本申请实施例的目的在于提出一种业务数据处理方法、装置、计算机设备及存储介质,以解决业务数据处理效率较低的问题。
为了解决上述技术问题,本申请实施例提供一种业务数据处理方法,采用了如下所述的技术方案:
获取实时业务数据;
将所述实时业务数据推送到Kafka;
通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;
通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据。
进一步的,在所述通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体的步骤之前,还包括:
查询所述实时业务数据的链路传输方式;
当所述链路传输方式为第一传输方式时,执行所述通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体的步骤;
当所述链路传输方式为第二传输方式时,将所述实时业务数据确定为业务数据体,并执行所述通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据的步骤。
进一步的,所述通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体的步骤包括:
通过所述kafka将所述实时业务数据推送至flink;
获取所述实时业务数据中的关键字;
根据所述关键字从离线数据库中获取与所述实时业务数据相对应的离线业务数据;
通过所述flink将所述离线业务数据与所述实时业务数据进行合并,得到业务数据体。
进一步的,所述通过所述kafka将所述实时业务数据推送至flink的步骤之后,还包括:
获取所述实时业务数据的数据标识;
根据获取到的数据标识,确定所述实时业务数据中的重复数据;
对所述实时业务数据中的重复数据进行去重处理,得到去重后的实时业务数据。
进一步的,所述通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据的步骤包括:
通过kafka将所述业务数据体推送至druid;
获取与所述业务数据体对应的处理策略;
指示所述druid根据所述处理策略对所述业务数据体进行数据重组,得到数据宽表;
根据所述处理策略对所述数据宽表进行计算,得到计算结果;
根据所述计算结果和所述数据宽表得到库存业务数据。
进一步的,在所述通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据的步骤之后,还包括:
接收终端发送的业务数据查询指令;
根据所述业务数据查询指令在所述druid中查询库存业务数据;
将查询到的库存业务数据通过所述终端进行展示。
进一步的,所述根据所述业务数据查询指令在所述druid中查询库存业务数据的步骤包括:
提取所述业务数据查询指令中的查询语句;
将所述查询语句转化为符合druid语法的druid查询语句;
运行所述druid查询语句,以便从所述druid中查询库存业务数据。
为了解决上述技术问题,本申请实施例还提供一种业务数据处理装置,采用了如下所述的技术方案:
数据获取模块,用于获取实时业务数据;
数据推送模块,用于将所述实时业务数据推送到Kafka;
离线添加模块,用于通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;
数据计算模块,用于通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据。
为了解决上述技术问题,本申请实施例还提供一种计算机设备,采用了如下所述的技术方案:
获取实时业务数据;
将所述实时业务数据推送到Kafka;
通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;
通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据。
为了解决上述技术问题,本申请实施例还提供一种计算机可读存储介质,采用了如下所述的技术方案:
获取实时业务数据;
将所述实时业务数据推送到Kafka;
通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;
通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据。
与现有技术相比,本申请实施例主要有以下有益效果:将业务数据拆分为实时业务数据和离线业务数据,从业务系统获取到实时业务数据后,将实时业务数据推送到flink,由flink给实时业务数据添加对应的离线业务数据,得到业务数据体,减少业务系统对业务数据的处理;flink可以通过kafka将业务数据体推送到druid,kafka是一种高吞吐量的数据传输管道,提高了数据流转的速度;druid具有强大的数据计算能力,能够根据设置的处理逻辑对业务数据体进行汇总计算,得到库存业务数据,进一步提高了对业务数据的处理效率。
附图说明
为了更清楚地说明本申请中的方案,下面将对本申请实施例描述中所需要使用的附图作一个简单介绍,显而易见地,下面描述中的附图是本申请的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
图1是本申请可以应用于其中的示例性系统架构图;
图2是根据本申请的业务数据处理方法的一个实施例的流程图;
图3是根据本申请的业务数据处理装置的一个实施例的结构示意图;
图4是根据本申请的计算机设备的一个实施例的结构示意图。
具体实施方式
除非另有定义,本文所使用的所有的技术和科学术语与属于本申请的技术领域的技术人员通常理解的含义相同;本文中在申请的说明书中所使用的术语只是为了描述具体的实施例的目的,不是旨在于限制本申请;本申请的说明书和权利要求书及上述附图说明中的术语“包括”和“具有”以及它们的任何变形,意图在于覆盖不排他的包含。本申请的说明书和权利要求书或上述附图中的术语“第一”、“第二”等是用于区别不同对象,而不是用于描述特定顺序。
在本文中提及“实施例”意味着,结合实施例描述的特定特征、结构或特性可以包含在本申请的至少一个实施例中。在说明书中的各个位置出现该短语并不一定均是指相同的实施例,也不是与其它实施例互斥的独立的或备选的实施例。本领域技术人员显式地和隐式地理解的是,本文所描述的实施例可以与其它实施例相结合。
为了使本技术领域的人员更好地理解本申请方案,下面将结合附图,对本申请实施例中的技术方案进行清楚、完整地描述。
如图1所示,系统架构100可以包括终端设备101、102、103,网络104和服务器105。网络104用以在终端设备101、102、103和服务器105之间提供通信链路的介质。网络104可以包括各种连接类型,例如有线、无线通信链路或者光纤电缆等等。
用户可以使用终端设备101、102、103通过网络104与服务器105交互,以接收或发送消息等。终端设备101、102、103上可以安装有各种通讯客户端应用,例如网页浏览器应用、购物类应用、搜索类应用、即时通信工具、邮箱客户端、社交平台软件等。
终端设备101、102、103可以是具有显示屏并且支持网页浏览的各种电子设备,包括但不限于智能手机、平板电脑、电子书阅读器、MP3播放器(Moving PictureExpertsGroup Audio Layer III,动态影像专家压缩标准音频层面3)、MP4(MovingPictureExperts Group Audio Layer IV,动态影像专家压缩标准音频层面4)播放器、膝上型便携计算机和台式计算机等等。
服务器105可以是提供各种服务的服务器,例如对终端设备101、102、103上显示的页面提供支持的后台服务器。本申请中的业务系统、flink和druid可以架设于服务器105中。服务器105可以是一台服务器,也可以是多台服务器。当服务器105为多台服务器时,业务系统、flink和druid可以分别架设于一台服务器中。
需要说明的是,本申请实施例所提供的业务数据处理方法一般由服务器执行,相应地,业务数据处理装置一般设置于服务器中。
应该理解,图1中的终端设备、网络和服务器的数目仅仅是示意性的。根据实现需要,可以具有任意数目的终端设备、网络和服务器。
继续参考图2,示出了根据本申请的业务数据处理方法的一个实施例的流程图。所述的业务数据处理方法,包括以下步骤:
步骤S201,获取实时业务数据。
在本实施例中,服务器方法运行于其上的电子设备(例如图1所示的服务器)可以通过有线连接方式或者无线连接方式与终端进行通信。需要指出的是,上述无线连接方式可以包括但不限于3G/4G连接、WiFi连接、蓝牙连接、WiMAX连接、Zigbee连接、UWB(ultrawideband)连接、以及其他现在已知或将来开发的无线连接方式。
其中,实时业务数据是业务数据的一部分,实时业务数据可以是业务交易中实时变化的数据。例如,实时业务数据可以是保单中客户每个月支付的保费。
具体地,实时业务数据来自于业务系统,业务系统用于处理业务交易,当业务系统所关联的业务发生变动时,业务系统产生实时业务数据。服务器从业务系统获取实时业务数据。
步骤S202,将实时业务数据推送到Kafka。
具体地,Kafka是一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,可以作为信息传输管道,高效地将信息进行传输。从业务系统获取的实时业务数据将被推送到kafka。
在一个实施例中,服务器对业务系统进行监测,当监测到业务系统产生新的实时业务数据时,以多线程异步的方式将实时业务数据推送给kafka。
步骤S203,通过flink获取Kafka中的实时业务数据,并给实时业务数据添加对应的离线业务数据,得到业务数据体。
其中,flink是一种开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时可以执行批处理和流处理程序。对于某一业务,该业务可以存在实时业务数据以及离线业务数据,实时业务数据和离线业务数据组成的整体即为业务数据体。
离线业务数据是业务数据的一部分,时效性较低,在业务交易中不经常发生变化。例如,在保险业务场景中,保单中的用户名可以是离线业务数据。
具体地,可以由flink获取kafka中的实时业务数据。Flink作为数据加工处理方,可以从存储离线业务数据的数据库中,提取与接收到的实时业务数据相关联的离线业务数据,并将离线业务数据追加到实时业务数据中,得到完整的业务数据体。
步骤S204,通过Kafka将业务数据体推送至druid,以通过druid对业务数据体进行汇总计算,得到库存业务数据。
其中,druid是一个分布式的、支持实时多维OLAP(Online AnalyticalProcessing,联机分析处理)分析的数据处理系统。它既支持高速的数据实时摄入处理,也支持实时且灵活的多维数据分析查询。druid可用于大数据背景下灵活快速的多维OLAP分析。另外,druid还支持根据时间戳对数据进行预聚合摄入和聚合分析,因此在有时序数据处理分析的场景中也会用到druid。
具体地,flink生成业务数据体后,可以通过kafka将业务数据体推送至druid。用户可以通过druid对业务数据体进行处理,例如,通过druid对业务数据体进行计算、统计、宽表重建等,得到库存业务数据,从而实现对业务数据的一站式处理。得到的库存业务数据可以存储在druid中。
需要强调的是,为进一步保证上述库存业务数据的私密和安全性,上述库存业务数据还可以存储于一区块链的节点中。
本申请所指区块链是分布式数据存储、点对点传输、共识机制、加密算法等计算机技术的新型应用模式。区块链(Blockchain),本质上是一个去中心化的数据库,是一串使用密码学方法相关联产生的数据块,每一个数据块中包含了一批次网络交易的信息,用于验证其信息的有效性(防伪)和生成下一个区块。区块链可以包括区块链底层平台、平台产品服务层以及应用服务层等。
本实施例中,将业务数据拆分为实时业务数据和离线业务数据,从业务系统获取到实时业务数据后,将实时业务数据推送到flink,由flink给实时业务数据添加对应的离线业务数据,得到业务数据体,减少业务系统对业务数据的处理;flink可以通过kafka将业务数据体推送到druid,kafka是一种高吞吐量的数据传输管道,提高了数据流转的速度;druid具有强大的数据计算能力,能够根据设置的处理逻辑对业务数据体进行汇总计算,得到库存业务数据,进一步提高了对业务数据的处理效率。
进一步的,上述步骤S203之前,还可以包括:查询实时业务数据的链路传输方式;当链路传输方式为第一传输方式时,通过flink获取Kafka中的实时业务数据,并给实时业务数据添加对应的离线业务数据,得到业务数据体;当链路传输方式为第二传输方式时,将实时业务数据确定为业务数据体,并通过Kafka将业务数据体推送至druid,以通过druid对业务数据体进行汇总计算,得到库存业务数据。
其中,链路传输方式可以是业务数据的传输顺序。
具体地,来自业务系统的实时业务数据可以具有多于一种的链路传输方式。在典型的链路传输方式中,当需要对实时业务数据对应的离线业务数据进行加工时,可以选择第一传输方式,先将实时业务数据推送至kafka,由kafka将实时业务数据推送至flink,由flink给实时业务数据添加对应的离线业务数据,得到业务数据体;当实时业务数据较为完整,可以满足处理需求时,可以选择第二传输方式,将实时业务数据作为业务数据体,经过kafka推送至druid,由druid对业务数据体进行汇总计算,得到库存业务数据。
实时业务数据与业务相关,可以预先定义来自某业务的实时业务数据的链路传输方式。查询产生实时业务数据的业务,即可确定实时业务数据的链路传输方式。
本实施例中,实时业务数据的传输可以选择不同的链路传输方式,满足了对实时业务数据的多种处理方式。
进一步的,上述步骤S203可以包括:通过kafka将实时业务数据推送至flink;获取实时业务数据中的关键字;根据关键字从离线数据库中获取与实时业务数据相对应的离线业务数据;通过flink将离线业务数据与实时业务数据进行合并,得到业务数据体。
其中,关键字可以是业务数据中的字段,关键字可以标识产生实时业务数据和离线业务数据的业务交易,即,实时业务数据和离线业务数据中可以包含相同的关键字。具体地,由flink获取kafka中的实时业务数据,由flink获取kafka中的实时业务数据,以便对实时业务数据进行加工处理。
实时业务数据是业务交易中产生的实时数据,在进行业务数据处理的时候,可能还需要添加对应的离线业务数据,例如姓名、机构名称等。如果由业务系统进行离线业务数据的合并,会影响业务数据的处理效率;为了减少业务系统的压力,可以由flink从离线数据库中获取与实时业务数据对应的离线业务数据,并由flink完成实时业务数据和离线业务数据的合并。
在一个实施例中,离线业务数据可以存储在hive中。hive是基于Hadoop的一个数据仓库工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。hive数据仓库工具能将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,能将SQL语句转变成MapReduce任务来执行。
Flink可以在离线数据库查询实时业务数据中的关键字,将查询到的关键字所对应存储的离线业务数据与实时业务数据进行合并,得到业务数据体。
实时业务数据所对应的离线业务数据可以由flink自动合并,在实际应用中,如果离线业务数据存储在多个离线数据库中,也可以由操作人员在数据管理平台查看实时业务数据,手动配置如何获取实时业务数据对应的离线业务数据,比如,可以配置离线业务数据所在的离线数据库、离线数据库中的哪个数据表,根据哪些字段从离线业务数据库中进行取数等,而不必编写SQL或HQL取数语句,由数据管理平台自动根据配置的信息生成取数语句获取数据,节约了人力操作,提高了数据获取效率。可以理解,操作人员也可以编写取数语句获取离线业务数据。
本实施例中,由flink获取与实时业务数据相关的离线业务数据,而不是由业务系统获取离线业务数据,减小了业务系统的数据处理压力,提高了业务数据处理效率;将离线业务数据和实时业务数据拼装为业务数据体,得到完整的业务数据,保证了对业务数据的正常处理。
进一步的,上述通过kafka将实时业务数据推送至flink的步骤之后,还包括:获取实时业务数据的数据标识;根据获取到的数据标识,确定实时业务数据中的重复数据;对实时业务数据中的重复数据进行去重处理,得到去重后的实时业务数据。
其中,数据标识可以是每条业务数据的标识。
具体地,在应用中,由于网络原因或集群故障,消费者获取并处理kafka中的消息后,kafka可能未收到处理完成的结果,会再次将消息推送至消费者。因此,实时业务数据中可能存在重复的数据。在大数据领域中,消费者通常用于指代获取并处理数据的主体,例如,消费者可以是数据处理组件flink。
Flink可以比对实时业务数据的数据标识,确定发生重复的实时业务数据,重复的实时业务数据将被标记为重复数据。数据标识可以是每条实时业务数据的数据编号,例如,在保险业务场景中,数据标识可以是保单号。在一个实施例中,数据标识也可以是按照预设顺序将实时业务数据中的每个数据拼接后,再进行哈希运算得到的哈希值。
Flink可以对确定的重复数据进行去重处理,得到去重后的实时业务数据。可以理解,kafka也可能向druid重复推送数据,druid也可以对接收到的业务数据体进行去重处理。
本实施例中,确定实时业务数据中的重复数据并进行去重处理,可以避免对实时业务数据进行重复的处理,并避免了重复数据可能带来的数据错误。
进一步的,上述步骤S204可以包括:通过kafka将业务数据体推送至druid;获取与业务数据体对应的处理策略;指示druid根据处理策略对业务数据体进行数据重组,得到数据宽表;根据处理策略对数据宽表进行计算,得到计算结果;根据计算结果和数据宽表得到库存业务数据。
其中,处理策略是一种信息,可以指示如何处理业务数据体。
具体地,可以预先配置对业务数据体的处理策略。druid可以根据业务数据体中的关键字,查询与业务数据体对应的处理策略。
业务系统或hive中的数据都是凌乱的、一条一条存在的,数据的排序并没有规律,因此业务数据体的排序不一定符合数据处理的需求。druid可以按照预先配置好的处理策略,对业务数据体中的业务数据进行数据重组,数据重组包括字段选取以及数据排序,可以保留预设字段的数据,并根据字段对业务数据进行重新排列,得到数据宽表。比如,可以按照用户对业务数据进行重新排列,使同一个用户产生的业务数据排列在一起。
可以预先在JSON文件中配置业务数据体中的字段和数据宽表的映射关系,在druid中建立数据宽表,而不在flink中建立数据宽表。flink中可能随时有数据流入,如果在flink中建立数据宽表,服务器开销较大,由druid建立数据宽表可以减小建表开销,提高业务数据处理效率。
Druid还可以根据处理策略对数据宽表进行统计计算,得到计算结果。计算结果可以写入数据宽表中,得到库存业务数据。库存业务数据可以存储在druid中。
本实施例中,获取业务数据体的处理策略,由druid根据处理策略对业务数据体进行宽表重建以及计算,实现了对业务数据体的处理,并提高了业务数据处理效率。
进一步的,上述步骤S204之后,还可以包括:接收终端发送的业务数据查询指令;根据业务数据查询指令在druid中查询库存业务数据;将查询到的库存业务数据通过终端进行展示。
其中,数据查询指令可以是指示druid进行数据查询的指令。
具体地,本申请还提供数据管理平台,数据管理平台可以对业务数据处理进行控制与管理,例如,配置链路传输方式、处理策略等。数据管理平台还可以访问druid中的库存业务数据,数据管理平台提供数据查询接口,在没有业务数据应用层的情况下实现库存业务数据的查询。
数据管理平台提供用户页面,用户可以在终端使用用户页面进行库存业务数据的查询,触发业务数据查询指令。用户可以根据任意的字段维度进行查询。Druid根据数据查询指令查询库存业务数据,并将查询到的库存业务数据通过终端进行展示。
本实施例中,使用druid实现查询功能,可以查看库存业务数据,从而获取到druid对业务数据的处理结果。
进一步的,上述根据业务数据查询指令在druid中查询库存业务数据的步骤包括:提取业务数据查询指令中的查询语句;将查询语句转化为符合druid语法的druid查询语句;运行druid查询语句,以便从druid中查询库存业务数据。
具体地,用户可以在数据管理平台编写查询语句,查询语句可以是SQL语句或者HQL语句。Druid接收到业务数据查询指令后,对业务数据查询指令进行解析以提取查询语句,并将查询语句转换为符合druid语法的druid查询语句。Druid运行druid查询语句,实现对库存业务数据的查询。Druid可以通过服务器与终端之间的http(Hypertext TransferProtocol,超文本传输协议)等链接,将查询到的库存业务数据发送至终端进行展示。
本实施例中,从业务数据查询指令中提取到查询语句后,将查询语句转化为符合druid语法的druid查询语句,从而使得druid可以实现库存业务数据的查询。
本领域普通技术人员可以理解实现上述实施例方法中的全部或部分流程,是可以通过计算机程序来指令相关的硬件来完成,该计算机程序可存储于一计算机可读取存储介质中,该程序在执行时,可包括如上述各方法的实施例的流程。其中,前述的存储介质可为磁碟、光盘、只读存储记忆体(Read-Only Memory,ROM)等非易失性存储介质,或随机存储记忆体(Random Access Memory,RAM)等。
应该理解的是,虽然附图的流程图中的各个步骤按照箭头的指示依次显示,但是这些步骤并不是必然按照箭头指示的顺序依次执行。除非本文中有明确的说明,这些步骤的执行并没有严格的顺序限制,其可以以其他的顺序执行。而且,附图的流程图中的至少一部分步骤可以包括多个子步骤或者多个阶段,这些子步骤或者阶段并不必然是在同一时刻执行完成,而是可以在不同的时刻执行,其执行顺序也不必然是依次进行,而是可以与其他步骤或者其他步骤的子步骤或者阶段的至少一部分轮流或者交替地执行。
进一步参考图3,作为对上述图2所示方法的实现,本申请提供了一种业务数据处理装置的一个实施例,该装置实施例与图2所示的方法实施例相对应,该装置具体可以应用于各种电子设备中。
如图3所示,本实施例所述的业务数据处理装置300包括:数据获取模块301、数据推送模块302、离线添加模块303以及数据计算模块304。其中:
数据获取模块301,用于获取实时业务数据;
数据推送模块302,用于将所述实时业务数据推送到Kafka;
离线添加模块303,用于通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;
数据计算模块304,用于通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据。
本实施例中,将业务数据拆分为实时业务数据和离线业务数据,从业务系统获取到实时业务数据后,将实时业务数据推送到flink,由flink给实时业务数据添加对应的离线业务数据,得到业务数据体,减少业务系统对业务数据的处理;flink可以通过kafka将业务数据体推送到druid,kafka是一种高吞吐量的数据传输管道,提高了数据流转的速度;druid具有强大的数据计算能力,能够根据设置的处理逻辑对业务数据体进行汇总计算,得到库存业务数据,进一步提高了对业务数据的处理效率。
在本实施例的一些可选的实现方式中,业务数据处理装置300还可以包括方式查询模块,其中:
方式查询模块,用于查询实时业务数据的链路传输方式。
所述离线添加模块303还用于当链路传输方式为第一传输方式时,通过flink获取Kafka中的实时业务数据,并给实时业务数据添加对应的离线业务数据,得到业务数据体。
所述数据计算模块304还用于当链路传输方式为第二传输方式时,将实时业务数据确定为业务数据体,并通过Kafka将业务数据体推送至druid,以通过druid对业务数据体进行汇总计算,得到库存业务数据。
本实施例中,实时业务数据的传输可以选择不同的链路传输方式,满足了对实时业务数据的多种处理方式。
在本实施例的一些可选的实现方式中,离线添加模块303可以包括数据推送子模块、关键字获取子模块、离线获取子模块以及数据合并子模块,其中:
数据推送子模块,用于通过kafka将实时业务数据推送至flink。
关键字获取子模块,用于获取实时业务数据中的关键字。
离线获取子模块,用于根据关键字从离线数据库中获取与实时业务数据相对应的离线业务数据。
数据合并子模块,用于通过flink将离线业务数据与实时业务数据进行合并,得到业务数据体。
本实施例中,由flink获取与实时业务数据相关的离线业务数据,而不是由业务系统获取离线业务数据,减小了业务系统的数据处理压力,提高了业务数据处理效率;将离线业务数据和实时业务数据拼装为业务数据体,得到完整的业务数据,保证了对业务数据的正常处理。
在本实施例的一些可选的实现方式中,离线添加模块303还可以包括标识获取子模块、数据确定子模块以及数据去重子模块,其中:
标识获取子模块,用于获取实时业务数据的数据标识。
数据确定子模块,用于根据获取到的数据标识,确定实时业务数据中的重复数据。
数据去重子模块,用于对实时业务数据中的重复数据进行去重处理,得到去重后的实时业务数据。
本实施例中,确定实时业务数据中的重复数据并进行去重处理,可以避免对实时业务数据进行重复的处理,并避免了重复数据可能带来的数据错误。
在本实施例的一些可选的实现方式中,数据计算模块304可以包括:数据体推送子模块、策略获取子模块、数据重组子模块、宽表计算子模块以及库存生成子模块,其中:
数据体推送子模块,用于通过kafka将业务数据体推送至druid。
策略获取子模块,用于获取与业务数据体对应的处理策略。
数据重组子模块,用于指示druid根据处理策略对业务数据体进行数据重组,得到数据宽表。
宽表计算子模块,用于根据处理策略对数据宽表进行计算,得到计算结果。
库存生成子模块,用于根据计算结果和数据宽表得到库存业务数据。
本实施例中,获取业务数据体的处理策略,由druid根据处理策略对业务数据体进行宽表重建以及计算,实现了对业务数据体的处理,并提高了业务数据处理效率。
在本实施例的一些可选的实现方式中,业务数据处理装置300还可以包括:指令接收模块、库存查询模块以及数据展示模块,其中:
指令接收模块,用于接收终端发送的业务数据查询指令。
库存查询模块,用于根据业务数据查询指令在druid中查询库存业务数据。
数据展示模块,用于将查询到的库存业务数据通过终端进行展示。
本实施例中,使用druid实现查询功能,可以查看库存业务数据,从而获取到druid对业务数据的处理结果。
在本实施例的一些可选的实现方式中,库存查询模块可以包括:语句提取子模块、语句转化子模块以及语句运行子模块,其中:
语句提取子模块,用于提取业务数据查询指令中的查询语句。
语句转化子模块,用于将查询语句转化为符合druid语法的druid查询语句。
语句运行子模块,用于运行druid查询语句,以便从druid中查询库存业务数据。
本实施例中,从业务数据查询指令中提取到查询语句后,将查询语句转化为符合druid语法的druid查询语句,从而使得druid可以实现库存业务数据的查询。
为解决上述技术问题,本申请实施例还提供计算机设备。具体请参阅图4,图4为本实施例计算机设备基本结构框图。
所述计算机设备4包括通过系统总线相互通信连接存储器41、处理器42、网络接口43。需要指出的是,图中仅示出了具有组件41-43的计算机设备4,但是应理解的是,并不要求实施所有示出的组件,可以替代的实施更多或者更少的组件。其中,本技术领域技术人员可以理解,这里的计算机设备是一种能够按照事先设定或存储的指令,自动进行数值计算和/或信息处理的设备,其硬件包括但不限于微处理器、专用集成电路(ApplicationSpecific Integrated Circuit,ASIC)、可编程门阵列(Field-Programmable GateArray,FPGA)、数字处理器(Digital Signal Processor,DSP)、嵌入式设备等。
所述计算机设备可以是桌上型计算机、笔记本、掌上电脑及云端服务器等计算设备。所述计算机设备可以与用户通过键盘、鼠标、遥控器、触摸板或声控设备等方式进行人机交互。
所述存储器41至少包括一种类型的可读存储介质,所述可读存储介质包括闪存、硬盘、多媒体卡、卡型存储器(例如,SD或DX存储器等)、随机访问存储器(RAM)、静态随机访问存储器(SRAM)、只读存储器(ROM)、电可擦除可编程只读存储器(EEPROM)、可编程只读存储器(PROM)、磁性存储器、磁盘、光盘等。在一些实施例中,所述存储器41可以是所述计算机设备4的内部存储单元,例如该计算机设备4的硬盘或内存。在另一些实施例中,所述存储器41也可以是所述计算机设备4的外部存储设备,例如该计算机设备4上配备的插接式硬盘,智能存储卡(Smart Media Card,SMC),安全数字(Secure Digital,SD)卡,闪存卡(FlashCard)等。当然,所述存储器41还可以既包括所述计算机设备4的内部存储单元也包括其外部存储设备。本实施例中,所述存储器41通常用于存储安装于所述计算机设备4的操作系统和各类应用软件,例如业务数据处理方法的计算机可读指令等。此外,所述存储器41还可以用于暂时地存储已经输出或者将要输出的各类数据。
所述处理器42在一些实施例中可以是中央处理器(Central Processing Unit,CPU)、控制器、微控制器、微处理器、或其他数据处理芯片。该处理器42通常用于控制所述计算机设备4的总体操作。本实施例中,所述处理器42用于运行所述存储器41中存储的计算机可读指令或者处理数据,例如运行所述业务数据处理方法的计算机可读指令。
所述网络接口43可包括无线网络接口或有线网络接口,该网络接口43通常用于在所述计算机设备4与其他电子设备之间建立通信连接。
本实施例中提供的计算机设备可以执行上述业务数据处理方法。此处业务数据处理方法可以是上述各个实施例的业务数据处理方法。
本实施例中,将业务数据拆分为实时业务数据和离线业务数据,从业务系统获取到实时业务数据后,将实时业务数据推送到flink,由flink给实时业务数据添加对应的离线业务数据,得到业务数据体,减少业务系统对业务数据的处理;flink可以通过kafka将业务数据体推送到druid,kafka是一种高吞吐量的数据传输管道,提高了数据流转的速度;druid具有强大的数据计算能力,能够根据设置的处理逻辑对业务数据体进行汇总计算,得到库存业务数据,进一步提高了对业务数据的处理效率。
本申请还提供了另一种实施方式,即提供一种计算机可读存储介质,所述计算机可读存储介质存储有计算机可读指令,所述计算机可读指令可被至少一个处理器执行,以使所述至少一个处理器执行如上述的业务数据处理方法的步骤。
本实施例中,将业务数据拆分为实时业务数据和离线业务数据,从业务系统获取到实时业务数据后,将实时业务数据推送到flink,由flink给实时业务数据添加对应的离线业务数据,得到业务数据体,减少业务系统对业务数据的处理;flink可以通过kafka将业务数据体推送到druid,kafka是一种高吞吐量的数据传输管道,提高了数据流转的速度;druid具有强大的数据计算能力,能够根据设置的处理逻辑对业务数据体进行汇总计算,得到库存业务数据,进一步提高了对业务数据的处理效率。
通过以上的实施方式的描述,本领域的技术人员可以清楚地了解到上述实施例方法可借助软件加必需的通用硬件平台的方式来实现,当然也可以通过硬件,但很多情况下前者是更佳的实施方式。基于这样的理解,本申请的技术方案本质上或者说对现有技术做出贡献的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质(如ROM/RAM、磁碟、光盘)中,包括若干指令用以使得一台终端设备(可以是手机,计算机,服务器,空调器,或者网络设备等)执行本申请各个实施例所述的方法。
显然,以上所描述的实施例仅仅是本申请一部分实施例,而不是全部的实施例,附图中给出了本申请的较佳实施例,但并不限制本申请的专利范围。本申请可以以许多不同的形式来实现,相反地,提供这些实施例的目的是使对本申请的公开内容的理解更加透彻全面。尽管参照前述实施例对本申请进行了详细的说明,对于本领域的技术人员来而言,其依然可以对前述各具体实施方式所记载的技术方案进行修改,或者对其中部分技术特征进行等效替换。凡是利用本申请说明书及附图内容所做的等效结构,直接或间接运用在其他相关的技术领域,均同理在本申请专利保护范围之内。
Claims (7)
1.一种业务数据处理方法,其特征在于,包括下述步骤:
获取实时业务数据;
将所述实时业务数据推送到Kafka;
通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;
通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据;
在所述通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体的步骤之前,还包括:
查询所述实时业务数据的链路传输方式;
当所述链路传输方式为第一传输方式时,执行所述通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体的步骤;
当所述链路传输方式为第二传输方式时,将所述实时业务数据确定为业务数据体,并执行所述通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据的步骤;
所述通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体的步骤包括:
通过所述Kafka将所述实时业务数据推送至flink;
获取所述实时业务数据中的关键字;
根据所述关键字从离线数据库中获取与所述实时业务数据相对应的离线业务数据;
通过所述flink将所述离线业务数据与所述实时业务数据进行合并,得到业务数据体;
所述通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据的步骤包括:
通过Kafka将所述业务数据体推送至druid;
获取与所述业务数据体对应的处理策略;
指示所述druid根据所述处理策略对所述业务数据体进行数据重组,得到数据宽表;
根据所述处理策略对所述数据宽表进行计算,得到计算结果;
根据所述计算结果和所述数据宽表得到库存业务数据。
2.根据权利要求1所述的业务数据处理方法,其特征在于,所述通过所述Kafka将所述实时业务数据推送至flink的步骤之后,还包括:
获取所述实时业务数据的数据标识;
根据获取到的数据标识,确定所述实时业务数据中的重复数据;
对所述实时业务数据中的重复数据进行去重处理,得到去重后的实时业务数据。
3.根据权利要求1所述的业务数据处理方法,其特征在于,在所述通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据的步骤之后,还包括:
接收终端发送的业务数据查询指令;
根据所述业务数据查询指令在所述druid中查询库存业务数据;
将查询到的库存业务数据通过所述终端进行展示。
4.根据权利要求3所述的业务数据处理方法,其特征在于,所述根据所述业务数据查询指令在所述druid中查询库存业务数据的步骤包括:
提取所述业务数据查询指令中的查询语句;
将所述查询语句转化为符合druid语法的druid查询语句;
运行所述druid查询语句,以便从所述druid中查询库存业务数据。
5.一种业务数据处理装置,所述装置用于实现权利要求1至4中任一项所述的业务数据处理方法,其特征在于,所述装置包括:
数据获取模块,用于获取实时业务数据;
数据推送模块,用于将所述实时业务数据推送到Kafka;
离线添加模块,用于通过flink获取所述Kafka中的所述实时业务数据,并给所述实时业务数据添加对应的离线业务数据,得到业务数据体;
数据计算模块,用于通过所述Kafka将所述业务数据体推送至druid,以通过所述druid对所述业务数据体进行汇总计算,得到库存业务数据。
6.一种计算机设备,包括存储器和处理器,所述存储器中存储有计算机可读指令,所述处理器执行所述计算机可读指令时实现如权利要求1至4中任一项所述的业务数据处理方法的步骤。
7.一种计算机可读存储介质,其特征在于,所述计算机可读存储介质上存储有计算机可读指令,所述计算机可读指令被处理器执行时实现如权利要求1至4中任一项所述的业务数据处理方法的步骤。
Priority Applications (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202110271789.4A CN113010542B (zh) | 2021-03-12 | 2021-03-12 | 业务数据处理方法、装置、计算机设备及存储介质 |
Applications Claiming Priority (1)
Application Number | Priority Date | Filing Date | Title |
---|---|---|---|
CN202110271789.4A CN113010542B (zh) | 2021-03-12 | 2021-03-12 | 业务数据处理方法、装置、计算机设备及存储介质 |
Publications (2)
Publication Number | Publication Date |
---|---|
CN113010542A CN113010542A (zh) | 2021-06-22 |
CN113010542B true CN113010542B (zh) | 2023-09-19 |
Family
ID=76406430
Family Applications (1)
Application Number | Title | Priority Date | Filing Date |
---|---|---|---|
CN202110271789.4A Active CN113010542B (zh) | 2021-03-12 | 2021-03-12 | 业务数据处理方法、装置、计算机设备及存储介质 |
Country Status (1)
Country | Link |
---|---|
CN (1) | CN113010542B (zh) |
Families Citing this family (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN113407617B (zh) * | 2021-06-25 | 2024-10-11 | 交控科技股份有限公司 | 基于大数据技术的实时与离线业务统一处理方法和装置 |
CN113256355B (zh) * | 2021-07-14 | 2021-09-17 | 北京宇信科技集团股份有限公司 | 一种积分权益实时确定方法、装置、介质、设备和系统 |
CN113934785B (zh) * | 2021-09-28 | 2024-09-13 | 杭州玳数科技有限公司 | 实时计算中的批量数据查询方法、装置和计算机设备 |
CN114416849A (zh) * | 2022-01-25 | 2022-04-29 | 平安科技(深圳)有限公司 | 数据处理方法、装置、电子设备及存储介质 |
Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN109684352A (zh) * | 2018-12-29 | 2019-04-26 | 江苏满运软件科技有限公司 | 数据分析系统、方法、存储介质及电子设备 |
CN109783512A (zh) * | 2018-12-13 | 2019-05-21 | 平安科技(深圳)有限公司 | 数据处理方法、装置、计算机设备及存储介质 |
CN111666296A (zh) * | 2020-04-28 | 2020-09-15 | 中国平安财产保险股份有限公司 | 基于Flink的SQL数据实时处理方法、装置、计算机设备和介质 |
CN111953713A (zh) * | 2019-05-14 | 2020-11-17 | 上海博泰悦臻网络技术服务有限公司 | Kafka数据展示方法及装置、计算机可读存储介质和终端 |
-
2021
- 2021-03-12 CN CN202110271789.4A patent/CN113010542B/zh active Active
Patent Citations (4)
Publication number | Priority date | Publication date | Assignee | Title |
---|---|---|---|---|
CN109783512A (zh) * | 2018-12-13 | 2019-05-21 | 平安科技(深圳)有限公司 | 数据处理方法、装置、计算机设备及存储介质 |
CN109684352A (zh) * | 2018-12-29 | 2019-04-26 | 江苏满运软件科技有限公司 | 数据分析系统、方法、存储介质及电子设备 |
CN111953713A (zh) * | 2019-05-14 | 2020-11-17 | 上海博泰悦臻网络技术服务有限公司 | Kafka数据展示方法及装置、计算机可读存储介质和终端 |
CN111666296A (zh) * | 2020-04-28 | 2020-09-15 | 中国平安财产保险股份有限公司 | 基于Flink的SQL数据实时处理方法、装置、计算机设备和介质 |
Also Published As
Publication number | Publication date |
---|---|
CN113010542A (zh) | 2021-06-22 |
Similar Documents
Publication | Publication Date | Title |
---|---|---|
CN113010542B (zh) | 业务数据处理方法、装置、计算机设备及存储介质 | |
CN106649670B (zh) | 基于流式计算的数据监控方法及装置 | |
CN112162965B (zh) | 一种日志数据处理的方法、装置、计算机设备及存储介质 | |
CN109471893B (zh) | 网络数据的查询方法、设备及计算机可读存储介质 | |
CN113836235B (zh) | 基于数据中台的数据处理方法及其相关设备 | |
CN112182004A (zh) | 实时查看数据方法、装置、计算机设备及存储介质 | |
CN113407785A (zh) | 一种基于分布式储存系统的数据处理方法和系统 | |
CN112860662B (zh) | 自动化生产数据血缘关系建立方法、装置、计算机设备及存储介质 | |
WO2019071907A1 (zh) | 基于操作页面识别帮助信息的方法及应用服务器 | |
CN113962597A (zh) | 一种数据分析方法、装置、电子设备及存储介质 | |
CN116860856A (zh) | 一种财务数据处理方法、装置、计算机设备及存储介质 | |
CN111488386A (zh) | 数据查询方法和装置 | |
CN117217684A (zh) | 指标数据的处理方法、装置、计算机设备及存储介质 | |
CN116450723A (zh) | 数据提取方法、装置、计算机设备及存储介质 | |
CN116842012A (zh) | 一种Redis集群的分片存储方法、装置、设备及存储介质 | |
CN113590372A (zh) | 基于日志的链路追踪方法、装置、计算机设备及存储介质 | |
CN114968725A (zh) | 任务依赖关系校正方法、装置、计算机设备及存储介质 | |
CN113687881A (zh) | 元数据调用方法、装置、电子设备及存储介质 | |
CN111143328A (zh) | 一种敏捷商业智能数据构建方法、系统、设备、存储介质 | |
CN111782677A (zh) | 基于多引擎数据分群方法、装置、计算机设备和存储介质 | |
CN114328214B (zh) | 报表软件的接口测试用例的提效方法、装置、计算机设备 | |
US11947540B1 (en) | Query language for metric data | |
CN117272077A (zh) | 数据处理方法、装置、计算机设备及存储介质 | |
CN116775649A (zh) | 一种数据分类存储方法、装置、计算机设备及存储介质 | |
CN116842050A (zh) | 多维度状态数据处理方法、装置、计算机设备及存储介质 |
Legal Events
Date | Code | Title | Description |
---|---|---|---|
PB01 | Publication | ||
PB01 | Publication | ||
SE01 | Entry into force of request for substantive examination | ||
SE01 | Entry into force of request for substantive examination | ||
GR01 | Patent grant | ||
GR01 | Patent grant |