
Open access peer-reviewed chapter

Trends in High-Performance Data Engineering for Data Analytics

Written By

Vibhatha Abeykoon and Geoffrey Charles Fox

Submitted: 20 March 2023 Reviewed: 24 March 2023 Published: 03 May 2023

DOI: 10.5772/intechopen.1001458

From the Edited Volume

New Trends and Challenges in Open Data

Vijayalakshmi Kakulapati


Abstract

Over the past decade, data analytics has undergone a significant transformation due to the increasing availability of data and the need to extract valuable insights from it. However, the classical big data stack has not been fast enough at data engineering, highlighting the need for high-performance computing. Data analytics has motivated the engineering community to build diverse frameworks, including Apache Arrow, Apache Parquet, Twister2, Cylon, Velox, and Datafusion. These frameworks have been designed to provide high-performance data processing through C++-backed core APIs, with extended usability through support for Python and R. Our research focuses on the trends in the evolution of data engineering, which have been characterized by a shift towards high-performance computing, with frameworks designed to keep up with the evolving demands of the field. Our findings show that modern data analytics frameworks have been developed with C++ core compute and communication kernels and are designed to facilitate high-performance data processing. This has been a critical motivation for developing scalable components for data engineering frameworks.

Keywords

  • data engineering
  • data analytics
  • high-performance computing
  • big-data
  • data-engineering trends

1. Introduction

In today’s data-driven world, data analytics has become essential for businesses and organizations to extract insights from large volumes of data. However, as data grows, traditional data processing methods have proven inadequate, highlighting the need for high-performance data engineering. High-performance data engineering is a process that involves the efficient processing of large volumes of data using advanced computing systems and state-of-the-art hardware. The field of high-performance data engineering for data analytics has been evolving rapidly in recent years, driven by the need for faster, more efficient ways of processing data. This has led to the development of a range of frameworks, tools, and technologies that are designed to facilitate high-performance data processing on a large scale. In our study, we observe that the evolution in data engineering directly impacts data analytics frameworks. Sequential or merely multi-threaded approaches to computation cannot keep up with ever larger datasets. Replacing parameter servers, data analytics frameworks have evolved to use high-performance computing concepts to build their core compute and communication kernels. Driven by this fact, modern-day data engineering has developed in many ways. Our study mainly focuses on a few aspects of the evolution of data engineering.

The first key aspect is re-writing the existing big-data stack with high-performance computing concepts. Language preference has shifted from Java to C++ in modern-day data engineering frameworks. The second key aspect is the shift in communication models. Rather than using a master-slave communication architecture, more platforms have introduced bulk-synchronous parallel data processing to gain more performance. In addition, some frameworks prefer remote procedure call-based approaches. The third and final key aspect is the usability and extensibility of data analytics workloads. In developing data analytics algorithms for production, there is a development stage known as exploratory data analysis. In this stage, it is vital that the data scientist can run various feature engineering algorithms and extract features to try new ideas. Unlike classical Java-based development approaches, modern data engineering frameworks have extended their usability to Python and R; still, they retain performance, unlike classical big-data systems with similar Python or R bindings.

Furthermore, data processing and planning have evolved in a new direction where users can now define a plan to run their queries on various platforms without writing new code. The idea is that a specification determines what happens to the data, and query processing engines that adopt this specification can trivially run the workloads. In Section 2, we discuss how data analytics has evolved from supporting AlexNet [1] to ChatGPT [2]. Section 3 discusses how data engineering has evolved from Apache Hadoop [3] to Ray [4] and how query processing technologies have adopted new standards to run queries in various query engines with minimal effort. In Section 4, we summarize how data analytics has impacted the evolution of data engineering in the past decade.


2. Modern data analytics and practices

Since AlexNet [1] took the data analytics world by surprise, the field has required more tools to be built, making deep learning concepts available for building intelligent applications. Before AlexNet, it was support vector machines (SVMs) that were breaking records in classifying and recognizing details intelligently. The main difference between these two approaches is that AlexNet contains many layers of computation, each applied to the output of the previous layer. This required far more computing power than SVMs needed. Also, unlike SVMs, AlexNet was not designed to run on CPUs but on GPUs. This was one of the first times the computer graphics card was used for something new in this space.

With the spark of this new idea, image classification algorithms evolved. Writing complex neural networks requires more computing capability, data, and platforms. Existing tooling in the early days of deep learning required users to write computation layers using CUDA compute kernels from scratch, and this is not a very scalable idea. Unifying various concepts in neural networks, libraries sprang up to provide better tooling to develop deep learning applications.

PyTorch, TensorFlow, Apache MXNet, Caffe, Theano, and Chainer are some of the most prominent deep-learning libraries. These frameworks provide higher-level APIs to build neural networks in a short time. Underneath, these frameworks use accelerators like GPUs and TPUs, which perform much better than CPUs for such computations. But the user does not have to worry about the computation APIs; instead, they just select which accelerator to use when developing applications. Regarding usability, Python was the go-to language for writing such analytical applications. For the moment, we will not go into a deeper analysis of why Python was selected. Still, Python was already a popular language, and it was easy to use because it did not need pre-compilation.

2.1 Data analytics frameworks

So far, we have discussed how data analytics has evolved and the role of data analytics frameworks. It is better to understand this deeply by looking into each framework. Among the data analytics frameworks, legacy machine learning frameworks like Scikit-learn [5] contain many statistical and machine learning models. But before moving into the details of the most promising systems, it is worth mentioning that deep learning-based solutions have evolved well beyond these classical machine learning models.

  • PyTorch [6]: An open-source machine learning library based on Torch, which is used for applications such as natural language processing and computer vision. PyTorch is known for its dynamic computational graph, which allows for more flexibility in model creation and training. It is widely used in both academia and industry.

  • TensorFlow [7]: A popular open-source framework for machine learning and deep learning that was developed by Google Brain. It supports a wide range of tasks, including image and speech recognition, and has a large and active community of users and contributors. TensorFlow’s key features include its data flow graph architecture and its ability to scale across multiple devices.

  • MXNet [8]: A flexible and efficient deep learning framework that is known for its fast training speeds and low memory usage. Developed by Amazon Web Services, MXNet supports multiple programming languages and has a variety of pre-built models for image, text, and speech recognition.

  • Caffe [9]: A deep learning framework that is focused on speed and scalability. It was developed by the Berkeley Vision and Learning Center and is known for its ease of use and powerful visualization tools. Caffe is commonly used for image classification, segmentation, and object detection.

  • Theano [10]: A Python library that allows for efficient computation of mathematical expressions, particularly in the context of deep learning. It is known for its ability to optimize CPU and GPU usage and for its strong integration with NumPy. Theano is used in a variety of applications, including natural language processing and computer vision.

  • Chainer [11]: A Python-based deep learning framework that was developed by the Japanese company Preferred Networks. Chainer is known for its dynamic computational graph, which allows for more flexibility in model creation and training. It supports a wide range of tasks, including image and speech recognition, and has a variety of built-in optimization algorithms.

Even though frameworks like TensorFlow, MXNet, and Theano are used by many data analysts and in industrial work, it is worth noting that PyTorch has become the de-facto standard in the research community for developing deep learning models. The primary reason is its configurable and wide array of APIs that let developers break application development down into finer details.

PyTorch is a machine-learning library that is built on top of the Torch library. It is designed to provide a user-friendly and flexible framework for building and training deep neural networks in Python. PyTorch offers support for dynamic computation graphs, which allows the neural network architecture to be modified during runtime. PyTorch supports CPU, GPU, and TPU-based computation models, which are mainly designed to run on a bulk-synchronous-parallel communication model. In simple terms, the AllReduce collective is the most widely used communication operation when running distributed training programs. In addition, PyTorch recently introduced an RPC-based computation model to provide more flexibility in running applications in cloud environments.
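To make the BSP-style AllReduce pattern concrete, the following minimal sketch uses torch.distributed with the Gloo backend. It assumes the process group environment (rank, world size, master address) is set up by a launcher such as torchrun, and the tensor contents are purely illustrative.

import torch
import torch.distributed as dist

def main():
    # torchrun (or another launcher) is assumed to set RANK, WORLD_SIZE,
    # MASTER_ADDR, and MASTER_PORT in the environment.
    dist.init_process_group(backend="gloo")
    rank = dist.get_rank()

    # Each worker holds a local gradient-like tensor.
    local = torch.ones(4) * (rank + 1)

    # Bulk-synchronous step: every worker blocks until the AllReduce completes,
    # after which all workers hold the same summed tensor.
    dist.all_reduce(local, op=dist.ReduceOp.SUM)
    print(f"rank {rank}: {local}")

    dist.destroy_process_group()

if __name__ == "__main__":
    main()

Launched with, for example, torchrun --nproc_per_node=2 allreduce_demo.py, every worker prints the same summed tensor once the collective completes, with no central driver involved.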

A wide variety of accelerators and communication models enables researchers to experiment with different model architectures and ideas without committing to a fixed architecture beforehand. PyTorch offers many built-in functions and classes for building and training neural networks, including modules for convolutional and recurrent neural networks, optimization algorithms, and loss functions. It also supports GPU acceleration, which enables efficient execution of deep neural networks on modern hardware. PyTorch has gained popularity in the machine learning community due to its ease of use, flexibility, and powerful capabilities. It is widely used in research and industry for various applications such as computer vision, natural language processing, speech recognition, and reinforcement learning.

2.2 Machine learning in production

Python provides an easier way to prototype applications, and for production these applications can be compiled into high-performance scripts using tools like TorchScript [12]. By decoupling the model from any runtime environment, TorchScript enables the model to be executed independently of the framework or platform it was developed on. This avoids Python's Global Interpreter Lock (GIL), which can be a bottleneck for multithreaded inference. TorchScript prioritizes optimizing the entire program as a whole rather than just individual components or parts.
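As a small illustration of this workflow, the sketch below scripts a hypothetical module with torch.jit.script, saves it, and reloads it; once serialized, the artifact can also be loaded from C++ via libtorch without the original Python class. The module and file name are illustrative.

import torch

class SmallNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = torch.nn.Linear(4, 2)

    def forward(self, x):
        return torch.relu(self.fc(x))

# Compile the module to TorchScript and serialize it; the saved artifact no
# longer depends on the Python class definition above.
scripted = torch.jit.script(SmallNet())
scripted.save("small_net.pt")

loaded = torch.jit.load("small_net.pt")
print(loaded(torch.randn(1, 4)))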

2.3 Computation intensity in data analytics problems

The compute required to run the most recent data analytics problems has grown exponentially over the past decade. From image classifiers to intuitive chess players to intelligent chatbots, the cost of learning more has risen to new levels. Figure 1 [13] shows the computing cost for various deep learning models over time.

Figure 1.

Amount of compute used in deep learning.

The amount of data required to get an accurate model depends on the model. Still, the modern-day argument in data-centric artificial intelligence [14] is that it is better to focus on a data-centric approach, where the data is improved, than on a model-driven approach, where the model is tuned to fit the data. This means that more and more data is required to get a better understanding, requiring data pre-processing at a larger scale.

Can we rely on the current data processing stack or enhance it? This is a significant question that needs to be addressed. How should the systems evolve? What are the best tools, or do we have to write new tools? How should we write such systems? All these questions are valid and very important. Let us learn more about data processing and how such systems have evolved in the past decade.


3. Data engineering

Data Engineering is a discipline that focuses on the design, construction, and maintenance of systems and processes to manage, store, and extract value from large and complex data sets. Breaking data engineering down gives us the following steps:

  • Read raw data from data sources

  • Formulate a series of operations to process data

  • Convert processed data to the expected output format

  • Persist the output or send it to a different service

  • Provide fault tolerance for any executed operation, at a defined granularity

Data sources can be categorized into various groups based on their data type. Structured data sources offer organized data, for example, CSV, spreadsheets, and databases. Semi-structured data sources provide data in formats such as JSON or XML. Unstructured data sources offer text files, images, videos, audio files, and other forms of data that require more processing than structured or semi-structured data. Streaming data sources provide real-time data from IoT devices, social media, log files, gaming, etc., requiring real-time or stream processing. The cloud data source is also widely adopted, with prominent platforms like Amazon S3, Google Cloud Storage, and Microsoft Azure.

The data sources contain raw data that needs to be processed to produce data usable for analytics. To develop the expected data, we perform a series of operations that fall into two main categories: relational algebra operations and linear algebra operations. With raw data, what is mostly applied are relational algebra operators like join, project, filter, sort, product, union, etc. Operators like null handling, filling null values, and removing null columns or rows can be fused into projection or filter operations. Once the raw data has been processed to remove unnecessary information and extract meaningful information, the next step is to map the data to numerical values and apply numerical operations to fine-tune it for further analytics. For instance, we could have string data in our dataset that needs to be mapped to numerical data, which can be done with simple project expressions. Depending on the analytical algorithm, the data may need to be transformed into matrices, normalized, passed through a Fourier transform, etc. Such operations fall into the linear algebra category.
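The two categories can be illustrated with a short pandas/NumPy sketch on a hypothetical dataset: relational steps (null handling, filter, project) first, followed by linear-algebra-style steps (categorical encoding and normalization). The column names and values are purely illustrative.

import numpy as np
import pandas as pd

# Hypothetical raw data with a string column and missing values.
raw = pd.DataFrame({
    "city": ["NYC", "LA", "NYC", None],
    "price": [10.0, None, 7.5, 3.0],
    "qty": [1, 4, 2, 8],
})

# Relational algebra: drop null rows and project the needed columns.
clean = raw.dropna(subset=["city", "price"])[["city", "price", "qty"]].copy()

# Map string categories to numeric codes so the data can feed analytics.
clean["city_code"] = clean["city"].astype("category").cat.codes

# Linear-algebra-style step: build a matrix and normalize each column.
X = clean[["price", "qty", "city_code"]].to_numpy(dtype=float)
X_norm = (X - X.mean(axis=0)) / (X.std(axis=0) + 1e-9)
print(X_norm)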

Once the data is processed, the output needs to be logged or fed into other systems in various formats. Some systems expect the data in specific storage formats like Parquet, CSV, HDF5, etc.; thus the processed data also has a storage format. Converting data into a different format within systems can require reading all the data, at least in chunks, which costs storage access and computing power. So the correct format is chosen at the data cleaning/processing stage, and the data is stored accordingly. This approach benefits smaller datasets requiring less system memory and storage. With efficient networking capability on InfiniBand, Cray Aries, Intel Omni-Path, and similar technologies, data can be moved from the data processing stage to the data analytics stage over in-memory data formats like Apache Arrow [15] very efficiently. For instance, the Apache Arrow IPC format enables persistent storage and efficient data reading, and with Apache Arrow Flight [16], data can be efficiently transferred for remote processing.
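A minimal pyarrow sketch of the Arrow IPC path mentioned above; the file path and columns are placeholders.

import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.table({"id": [1, 2, 3], "price": [10.0, 20.5, 7.25]})

# Persist the table in the Arrow IPC file format.
with pa.OSFile("/tmp/example.arrow", "wb") as sink:
    with ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)

# Memory-map the file and read it back without copying the buffers.
with pa.memory_map("/tmp/example.arrow", "r") as source:
    loaded = ipc.open_file(source).read_all()

print(loaded)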

We have discussed the main steps of processing and storing data in the various stages of the data processing pipeline. But fault tolerance is an essential feature to ensure systems run end-to-end seamlessly. As the term suggests, a system should be able to withstand faults. In the data processing context, such faults can be server shutdowns, network failures, out-of-memory exceptions, and runtime exceptions due to unexpected failures in connected services that could bring the system to a complete stop. If the data processing time is minimal, re-running the workflow from the starting point is relatively inexpensive. However, executing operations like joins can be costly when the dataset is extensive. Computing and storage are not free, so we would have to pay repeatedly for the resources each time the system fails. To avoid such issues, systems are designed to snapshot certain stages as checkpoints, resume from the most recent checkpoint, and carry on with the task.
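The checkpoint-and-resume idea can be sketched in a framework-agnostic way as below; the chunking scheme and file names are purely illustrative and not tied to any particular system.

import json
import os

CHECKPOINT = "/tmp/pipeline_checkpoint.json"

def load_checkpoint():
    # Resume from the last completed chunk if a checkpoint exists.
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT) as f:
            return json.load(f)["last_chunk"]
    return -1

def save_checkpoint(chunk_id):
    with open(CHECKPOINT, "w") as f:
        json.dump({"last_chunk": chunk_id}, f)

def process_chunk(chunk_id):
    # Placeholder for an expensive operation such as a distributed join.
    print(f"processing chunk {chunk_id}")

last_done = load_checkpoint()
for chunk_id in range(10):
    if chunk_id <= last_done:
        continue  # already processed before the failure
    process_chunk(chunk_id)
    save_checkpoint(chunk_id)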

3.1 Data engineering frameworks

Large datasets cannot be efficiently processed with only a set of relational and linear algebra operators. As the amount of data generated every day continues to increase exponentially, reaching petabytes across various platforms serving millions of users, there is a need for a more versatile set of operators and operation modes. Many applications require real-time notifications, forecasts, instant messaging, and reporting within a specific timeframe. To support these requirements, data engineering frameworks provide processing modes such as stream data processing and batch data processing. These frameworks provide the necessary infrastructure to process large datasets efficiently and effectively, making it easier to extract valuable insights and information.

Some widely used data engineering frameworks are:

  • Apache Hadoop: An open-source big data framework that provides distributed storage and processing of large data sets using a cluster of commodity hardware. Hadoop is used for batch processing, data analysis, machine learning, and more.

  • Apache Spark [17]: A unified analytics engine for large-scale data processing that supports batch processing, streaming, and machine learning. Spark is designed to be fast and efficient, with in-memory computing and the ability to process data in parallel.

  • Apache Kafka [18]: A distributed streaming platform that allows users to publish and subscribe to streams of records. Kafka is used for real-time data processing, data streaming, and data integration across different systems.

  • Apache Flink [19]: A distributed stream processing engine that supports real-time stream processing and batch processing. Flink is designed to be highly scalable, fault-tolerant, and efficient, with support for both batch and stream processing in one system.

  • Apache Beam [20]: An open-source, unified programming model for batch and streaming data processing. Beam provides a simple, consistent API for building data processing pipelines that can run on multiple processing engines, such as Apache Flink, Apache Spark, and Google Cloud Dataflow (a short sketch follows this list).

  • Apache Storm [21]: A distributed stream processing framework that supports real-time processing of high-velocity data streams. Storm is used for real-time analytics, machine learning, and other applications that require fast and reliable data processing.

  • Google Cloud Dataflow [22]: A fully managed, serverless data processing service that enables users to build batch and streaming data pipelines using Apache Beam programming model.
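As a small illustration of Apache Beam's unified model referenced above, the following sketch runs a word-count-style pipeline on the local DirectRunner; the same pipeline definition could be submitted to a Flink, Spark, or Dataflow runner by changing only the pipeline options. The element values are illustrative.

import apache_beam as beam

# A tiny batch pipeline executed with the default local runner; the pipeline
# runs when the context manager exits.
with beam.Pipeline() as p:
    (
        p
        | "Create" >> beam.Create(["spark", "flink", "beam", "beam"])
        | "Count" >> beam.combiners.Count.PerElement()
        | "Print" >> beam.Map(print)
    )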

These frameworks are designed to solve a set of problems that emerged throughout the history of data processing. Table 1 shows a breakdown of the pros and cons of these frameworks based on qualitative features like scalability, fault tolerance, and processing modes.

Framework | Batch Processing | Stream Processing | Fault Tolerance | Programming Model (a)
Apache Hadoop | Yes | No | Yes | MapReduce
Apache Spark | Yes | Yes | Yes | DataFrame
Apache Kafka | No | Yes | Yes | Pub/Sub
Apache Flink | Yes | Yes | Yes | Dataflow
Apache Beam | Yes | Yes | Yes | Unified
Apache Storm | No | Yes | Yes | Spout/Bolt
Google Cloud Dataflow | Yes | Yes | Yes | Unified

Table 1.

Features of various data processing frameworks.

(a) Programming model refers to the type of programming model or approach that each data processing framework uses.


Apache Hadoop can be recognized as one of the earliest open-source big-data systems, and with time each system has added its unique set of features. The Apache Software Foundation and the open-source development model have significantly contributed to improving these systems. To see the progress made by these frameworks, we can look at Figure 2, which depicts their GitHub statistics.

Figure 2.

Github usage statistics of big data frameworks.

Although these are the widely adopted data processing systems, researchers and engineers have made an enormous shift to gain much better performance. We start to see a pattern when looking at the programming languages used to develop the frameworks discussed above (Table 2).

Programming language | Frameworks
Java | Apache Hadoop, Apache Flink, Apache Storm
Scala | Apache Spark
Java/Scala | Apache Kafka, Apache Beam
Python | Google Cloud Dataflow

Table 2.

Frameworks grouped by programming language.

3.2 Data engineering evolution

Java and Scala are the programming languages used to develop these big-data systems. Even though Java is known for platform independence, security, and scalability, its performance is unsatisfactory for applications with heavy computing tasks. In that respect, C++ or Rust are better replacements for gaining much better performance.

Using high-performance languages like C++ and Rust has been one of the trends in modern large-scale data processing systems. After the big-data era, frameworks like Ray, Velox, DataFusion, and Apache Arrow have mostly been used to design data processing pipelines. One key takeaway from these frameworks is how they provide usability. Unlike classical big-data systems, which focused more on languages like Scala and Java for the majority of their APIs, these frameworks focus on the Python programming language to a greater extent. Big-data systems provided Python wrappers known to degrade performance due to serialization and deserialization overhead when crossing language boundaries. Furthermore, the usability of these frameworks has been challenging due to their complex APIs. However, modern big data frameworks backed by C++ offer improved Python APIs with seamless integration with other data analytics systems.

Besides language preference, widely used big-data systems like Apache Spark have mainly been designed around a lazy execution model where a driver program takes control of data partitioning and running tasks. This approach bottlenecks aggregation or reduction tasks, which require all the spawned tasks to communicate with the driver program to synthesize the final answer. This is a classical problem in many big-data systems caused by the scheduler semantics.

To address this issue, a few research efforts have been made by the big data community to integrate Gang scheduling [23]. Apache Spark has also attempted to incorporate this concept into its schedulers. In addition, research frameworks like Twister [24], Twister2 [25], and Cylon [26] have introduced the usage of MPI for big-data processing by abstracting away the MPI collectives and providing big-data-like APIs for application developers. The performance gains are significant compared to the existing big-data systems.
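To show the kind of collective these frameworks abstract away, here is a minimal mpi4py sketch of a BSP-style global sum; the data is illustrative, and the script is assumed to be launched with mpirun.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Each rank holds a partition of the data.
local = np.arange(rank * 4, rank * 4 + 4, dtype="float64")

# AllReduce: every rank ends up with the global sum, with no driver process
# acting as a bottleneck.
global_sum = comm.allreduce(local.sum(), op=MPI.SUM)
print(f"rank {rank}: global sum = {global_sum}")

Run with, for example, mpirun -np 4 python global_sum.py; each rank reports the same global sum without routing data through a central driver.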

3.3 Next generation data engineering frameworks

Next-generation data engineering frameworks were built to meet the requirements of modern-day data analytics systems. In Section 2, we identified a few key factors:

  • More data leads to better results

  • Data analytics systems run on accelerators

  • Execution model is BSP or Asynchronous decentralized training (RPC)

  • Python is the widely used programming language

  • High performance is the key to efficient application development

Big-data systems have been around since 2005, and they have evolved in many ways to meet user requirements. But the major challenge came around 2012, when the data analytics world took over business modeling and solving scientific problems. The rise of neural networks in machine learning sent shock waves through the entire data-driven ecosystem. This is where the aforementioned key factors come into play.

The motto of the last decade was the need to process ever larger amounts of data to learn things better. This began the evolution of modern-day data engineering systems. The key aspects that needed an update are:

  • Performance

  • Usability

  • Interoperability with Data Analytics systems

  • Low learning curve

Big data systems can process larger chunks of data, but there was room for improvement. The APIs provided in Java and Scala were not user-friendly enough for analysts to use in their day-to-day work. There were some efforts to provide Python APIs (wrappers on Java/Scala APIs), but they also fell short on usability. The schedulers in classical big-data systems and modern data analytics systems were not aligned, so running end-to-end pipelines was not easy. Also, providing an easier workflow for analysts to perform day-to-day tasks was important.

Figure 3 depicts an approximate estimation of performance compared to usability. Note that this is not a mathematical outcome based on experiments but a collective approximation based on the timeline of evolution and on experience. There is always room for improvement; engineers keep finding ways to improve systems.

Figure 3.

Approximate estimation on big data systems performance vs. usability.

Around 2016, a few projects started focusing on better performance and usability. These projects recognized the underlying issues in the existing data processing systems. Each project addressed various aspects of the data processing domain, but collectively they can offer a much better solution for data-driven applications.

Figure 4 depicts the main aspects in which data engineering frameworks have been evolving. Distributed computing is not a new aspect of data engineering; the older frameworks can do distributed computing. But frameworks like Ray and Cylon have an edge over the existing distributed computing approaches for two main reasons: these systems have a C++ core backing both the performance of the sequential operators and the communication operations.

Figure 4.

Modern data engineering.

Before the era of high-performance computing engines, there was Dask [27], which was mainly designed to provide a set of operators on Pandas [28] for distributed computing on a single workstation. Dask integrates seamlessly with Pandas and NumPy, which makes it easier to work with deep learning and machine learning libraries. It later evolved into a distributed framework able to scale even on supercomputers. Dask-Distributed is a distributed computing framework that is designed to enable efficient processing of large-scale data sets in Dask with Python. It is built on top of standard Python libraries, such as NumPy [29], Pandas, and Scikit-Learn, and provides a flexible programming model that allows users to write distributed applications with ease. Dask also offers a range of distributed data structures, including arrays, data frames, and bags, which can be used to represent and manipulate large-scale data sets in a distributed environment. These data structures are designed to be familiar to users of NumPy, Pandas, and other Python libraries, making it easy to work with large data sets in a distributed setting. To distribute computations across multiple nodes, Dask uses a task scheduler, which enables users to schedule and manage analyses across a cluster of machines. The scheduler is designed to be fault-tolerant, ensuring that computations continue to run even if some nodes fail or become unavailable. Dask also includes a range of performance optimizations, such as data partitioning and compression, to ensure that computations are completed as efficiently as possible. But one of the critical challenges in Dask is that it is entirely developed in Python. When running larger workloads, Dask's performance tends to decline; the main reasons are the lower performance of the Python language and GIL bottlenecks when running compute-intensive workloads.

Example Dask Join [30].

import dask

large = dask.datasets.timeseries(freq="10s", npartitions=10)
small = dask.datasets.timeseries(freq="1D", dtypes={"z": int})

small = small.repartition(npartitions=1)
result = large.merge(small, how="left", on=["timestamp"])

A framework called Ray was introduced a few years ago to provide an abstraction over distributed training for reinforcement learning and deep learning. Ray provides a flexible programming model that allows developers to write distributed applications in Python with minimal effort. Ray provides an actor-based compute model which is easier to scale. It is built on top of the Apache Arrow data format, enabling efficient data transmission without serialization and deserialization between different programming languages or services. It also offers a distributed task scheduler, allowing users to schedule and manage complex workflows across multiple nodes. Ray is optimized for machine learning applications and provides several built-in libraries and tools for developing and deploying ML models at scale. Overall, Ray aims to make it easy for developers to build and scale distributed applications without worrying about the underlying infrastructure. A commercial version of Ray allows users to work with cloud environments and design applications quite efficiently. An autoscaling feature enables developers to parallelize workloads with trivial command-line arguments. Both Ray and Dask are cloud-friendly tools. But there are also systems that were created well before the cloud era and are very fast compared to big-data processing systems. These systems are built on high-performance computing (HPC) libraries like MPI [31], OpenMP [32], and PGAS [33].

Example Ray code for data processing [34].

import ray
import pandas

ds = ray.data.read_csv("/path-to-iris-data")
ds.show(3)

# Repartition the dataset to 5 blocks.
ds = ds.repartition(5)

# Find rows with sepal.length < 5.5 and petal.length > 3.5.
def transform_batch(df: pandas.DataFrame) -> pandas.DataFrame:
    return df[(df["sepal.length"] < 5.5) & (df["petal.length"] > 3.5)]

# Map processing over the dataset.
ds.map_batches(transform_batch).show()

# Split the dataset into 2 datasets.
ds.split(2)

# Sort the dataset by sepal.length.
ds = ds.sort("sepal.length")
ds.show(3)

# Shuffle the dataset.
ds = ds.random_shuffle()
ds.show(3)

# Group by the variety.
ds.groupby("variety").count().show()
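The dataset operations above are built on Ray's task and actor model; the following minimal sketch with a hypothetical Counter actor illustrates the actor-based compute model mentioned earlier.

import ray

ray.init()  # start a local Ray runtime

@ray.remote
class Counter:
    """A stateful worker; each actor instance runs in its own process."""
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

# Create four actors and call them in parallel; ray.get blocks on the futures.
counters = [Counter.remote() for _ in range(4)]
print(ray.get([c.increment.remote() for c in counters]))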

With an HPC-oriented approach, a framework called Cylon has been specially designed on an MPI-backed collective communication model, which provides high-performance computing capability on supercomputers. Cylon's core communication and compute kernels are written in C++ and extended on Apache Arrow data structures to represent data efficiently. This allows it to seamlessly integrate with other Arrow-backed systems and provides efficient data movement from data engineering frameworks to machine learning and deep learning workloads [35, 36, 37]. The data structure used underneath is Apache Arrow: Cylon uses Arrow compute kernels for the sequential relational algebra operations, while its partition API written on MPI provides the corresponding distributed operators. Cylon provides APIs in C++, Python, and Java, but it has focused on supporting Python users through a distributed DataFrame library which mimics Pandas while providing distributed operators that abstract away the complex communication algorithms. Cylon supports both CPU and GPU computing. Just as it uses Apache Arrow for the sequential CPU operators, it uses cuDF [38] for the sequential GPU operators together with a GPU-supported partition algorithm for the distributed operators. Cylon can be recognized as one of the earliest distributed DataFrame libraries supporting both GPUs and CPUs. Although Cylon was developed on HPC-based communication models, it has recently adopted UCX and Gloo as communication backends to enable workloads to run in cloud-native environments [39].

Big-data frameworks like Apache Spark, Apache Storm, Apache Flink, and similar frameworks have a downside when it comes to being a perfect match for data engineering for data analytics: they are hard to integrate with the HPC-Pythonic data analytics stack, which runs on HPC hardware with a C++ core backend and an easy-to-use Python API. Cylon fills this gap by providing high-performance communication and computing APIs to work with data. Enhancing this experience, the DataFrame abstraction introduced by Pandas has become the bedrock of usability, and this is one of the unique features of Cylon. Looking at the PyCylon code for a join in Listing 1.3, it is clear that the API is very similar to what Pandas offers. Pandas has become the go-to tool for data processing in analytical workloads, but Pandas is a sequential library. Because data analytics frameworks like PyTorch run on the BSP model, MPI-enabled Cylon DataFrames can run seamlessly alongside data analytics workloads. This matters when exploratory analysis is done before designing the production-ready model, where data scientists play with the data to engineer features and get feedback from the analytical algorithms. Introducing Pythonic HPC solutions is crucial in designing efficient data exploration research to build larger models.

Example Python code with pycylon.

from pycylon import read_csv, DataFrame, CylonEnv
from pycylon.net import MPIConfig

config: MPIConfig = MPIConfig()
env: CylonEnv = CylonEnv(config=config, distributed=True)

df1: DataFrame = read_csv('/tmp/csv1.csv')
df2: DataFrame = read_csv('/tmp/csv2.csv')

df3: DataFrame = df1.join(other=df2, on=[0], algorithm="hash", env=env)

print(df3)

Both Ray and Cylon provide much better performance than Dask. The main reason is the efficiency of compute kernels written in C++ and their distributed computing models. Dask uses a driver-centric distributed computing model, which becomes a bottleneck for jobs with many tasks that require heavy synchronization.

Numerous libraries, including Ray and Cylon, utilize Apache Arrow as their underlying data structure. Apache Arrow is a columnar in-memory data format that enables efficient read operations. Its core memory layout is based on a Columnar specification. It allows any framework to adopt the Arrow C Data Interface and extend it to create Arrow-compatible data structures without relying on the entire library. Several libraries have been built upon this columnar specification, including the compute API, dataset API, Flight SQL, Flight RPC, and Acero streaming execution engine. Apache Arrow has gained widespread adoption in various industrial frameworks and academic research, such as Apache Spark, Clickhouse, Dremio, and Polars. Moreover, it supports an extensive range of programming languages, with C++, Python, R, Java, C#, and Go being the most commonly used.
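A short pyarrow sketch of the columnar format and compute API described above; the column names and values are illustrative.

import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    "l_extendedprice": [100.0, 250.0, 75.0],
    "l_discount": [0.05, 0.10, 0.00],
})

# Columnar filter: build a boolean mask and select the matching rows.
filtered = table.filter(pc.greater(table["l_extendedprice"], 90.0))

# Vectorized arithmetic over whole columns, with no Python-level loops.
revenue = pc.multiply(
    filtered["l_extendedprice"],
    pc.subtract(pa.scalar(1.0), filtered["l_discount"]),
)
print(revenue)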

At the heart of data processing lie the query engines. Query engines provide a higher-level API for users to run SQL queries or build query plans programmatically. A few key goals of a query engine are fast query execution, scalability, flexibility, concurrency, query optimization, usability, fault tolerance, and extensibility. It should be able to efficiently load, transform, persist, and transmit data across a wide array of other systems. Modern-day query engines are not built on Java or Scala as in the big-data era; they are built on C++ or Rust for performance. Velox [40] is a high-performance query engine built in C++. It uses the Apache Arrow columnar format and has its own set of compute kernels. Velox also supports the Presto and Apache Spark query engines. In addition, it supports feature engineering and data preprocessing in PyTorch. It is a novel system evolving in the high-performance query processing space and is getting more traction in industrial and academic research.

DataFusion [41] is a robust data processing framework that provides users with two distinct methods for creating logical query plans: SQL and DataFrame API. This versatility enables users to choose the most suitable approach for their specific use cases and requirements. The framework also boasts a comprehensive query optimizer that employs advanced optimization techniques to improve the efficiency and performance of query execution. By analyzing and transforming the logical query plans, the optimizer ensures that the most effective execution strategies are used, resulting in faster processing times and better resource utilization. A key feature of DataFusion is its multi-threaded parallel execution engine, designed to process partitioned data sources, such as CSV and Parquet files, with exceptional speed. By using parallelism, the execution engine can distribute the workload across multiple threads or cores, significantly accelerating data processing and analysis. This parallel execution approach is particularly beneficial when working with large datasets, as it can effectively minimize processing times and overcome performance bottlenecks. In summary, DataFusion combines the flexibility of SQL and DataFrame API support, an advanced query optimizer, and a high-performance parallel execution engine to offer a robust and efficient solution for processing and analyzing partitioned data sources like CSV and Parquet files. Its versatile and powerful capabilities make it ideal for various data processing tasks and use cases.
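A minimal sketch of DataFusion's two plan-building paths through its Python bindings (the datafusion package); the table name and file path are placeholders, and the exact method names may vary between releases.

from datafusion import SessionContext

ctx = SessionContext()

# Register a CSV source under a table name (path is hypothetical).
ctx.register_csv("lineitem", "/path/to/lineitem.csv")

# SQL path: build a logical plan from a query string.
df_sql = ctx.sql("SELECT l_extendedprice, l_tax FROM lineitem LIMIT 5")
df_sql.show()

# DataFrame path: build an equivalent plan programmatically.
df_api = ctx.table("lineitem").select_columns("l_extendedprice", "l_tax").limit(5)
df_api.show()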

Considering the novel trends and data processing technologies discussed, it is evident that data engineering has evolved from one dimension to another in less than a decade. Researchers and engineers have produced various enhancements to the data engineering stack, and high performance and better usability are the key aspects that have shown progress. Moreover, the number of novel platforms trying to improve data engineering workloads gives users a variety of options. But most of these frameworks have focused on two essential things: representing and transforming data efficiently, and integrating seamlessly with data analytics workloads.

3.4 Anywhere query execution

Among the hundreds of frameworks designed for data engineering, each with its own uniqueness, individual frameworks are known to be better at specific tasks than others. In a practical scenario, a few frameworks together form a data engineering workflow. The main challenge is communicating an idea, or simply a query plan, so that each framework can understand and do its part. Apache Beam can be recognized as a single framework which unifies frameworks like Apache Spark, Apache Flink, Apache Samza, Google Cloud Dataflow, Twister2, etc. This approach requires writing composite applications using the Apache Beam API. But in the long run, maintaining such a code base and supporting various platforms is complex and costly. What we need is to run a query anywhere with less overhead and maintenance.

Substrait [42] provides a cross-language specification for data computing operations. In simple terms, once a framework adopts the Substrait specification, it can run a query plan without any additional code except the code required to load a Substrait-based execution plan and execute the framework-native plan. Substrait currently supports types, expressions, and relations. Under types, data types, type variations, and functions (scalar, aggregate, window, and table) are defined. If a particular framework wants to extend the existing definitions, such modifications can be made with Substrait, especially when defining new data types, function signatures, and other custom representations of various components. These are defined as extensions, which can be specified in a YAML format [43].

At the query execution level, the most exciting component is the relational algebra support in Substrait. It contains logical and physical relations defined to support most of the widely used relational algebra operators: read, filter, sort, project, join, set, aggregate, and write are supported operators. To understand what Substrait can offer, let us evaluate a sample query and how Substrait can represent it. Listing 1.4 shows a SQL query which reads the L_EXTENDEDPRICE, L_TAX, and L_DISCOUNT columns from a table LINEITEM.

Example of a Substrait plan in SQL Format.

SELECT
  t0."L_EXTENDEDPRICE",
  t0."L_TAX",
  t0."L_DISCOUNT"
FROM "LINEITEM" AS t0

Substrait does not have a visual format at the moment. It is a protobuf-based, non-human-readable format which can be sent to a Substrait-supported framework for execution using the native query engine. But to visualize it with some readability, we show it using the JSON format. Listing 1.5 shows the Substrait plan in JSON form.

Example of a Substrait plan in JSON Format.

{
  "extensionUris": [
    {
      "extensionUriAnchor": 1
    }
  ],
  "relations": [
    {
      "root": {
        "input": {
          "read": {
            "common": {
              "direct": {}
            },
            "baseSchema": {
              "names": [
                "L_EXTENDEDPRICE",
                "L_TAX",
                "L_DISCOUNT"
              ],
              "struct": {
                "types": [
                  {
                    "fp32": {
                      "nullability": "NULLABILITY_NULLABLE"
                    }
                  },
                  {
                    "fp32": {
                      "nullability": "NULLABILITY_NULLABLE"
                    }
                  },
                  {
                    "fp32": {
                      "nullability": "NULLABILITY_NULLABLE"
                    }
                  }
                ],
                "nullability": "NULLABILITY_REQUIRED"
              }
            },
            "namedTable": {
              "names": [
                "LINEITEM"
              ]
            }
          }
        },
        "names": [
          "L_EXTENDEDPRICE",
          "L_TAX",
          "L_DISCOUNT"
        ]
      }
    }
  ]
}

The Substrait plan contains information about the data source, here a namedTable representing an in-memory data source. In addition, a file source or a glob can be referred to via URI to define the data source. Since this is a simple read operation, it only shows the baseSchema (the schema of the data being read, not the schema of the whole dataset represented in the data source). A natural question that arises is: why not SQL? SQL is a language used for querying relational data, but it has limitations and lacks sufficient detail for processing. Therefore, modern systems often translate SQL queries into a query plan before executing them. Query plans can have multiple levels and transformations, but no standard or open format existed for them. Substrait was created to provide a standard and open format for query plans and works alongside SQL to deliver capabilities that SQL lacks.

Acero (the Arrow streaming query engine) [44], Velox, DataFusion, and Ibis support Substrait. In terms of support, there are two aspects: there must be a set of producers who can produce Substrait plans and consumers who can execute them. Ibis supports both production and consumption of Substrait plans, while Velox, DataFusion, and Acero mainly support Substrait plan consumption. Regarding producers, the go-to tool is Isthmus [45].


4. Conclusions

The evolution of data engineering over the past decade has been characterized by a trend towards high-performance computing, with frameworks designed to keep up with the evolving demands of the field. The development of diverse frameworks, including Apache Arrow, Apache Parquet, Twister2, Cylon, Velox, and Datafusion, has been essential in providing high-performance data processing on a large scale. The shift towards bulk-synchronous parallel data processing, remote procedure call-based approaches, and extending the usability and extensibility of data analytics workloads have further enhanced the performance of data engineering frameworks. Additionally, the introduction of Substrait has enabled the efficient processing of data across multiple platforms, making it easier for data engineers to build complex data engineering workloads and run queries efficiently. Our study has shown how data analytics has impacted the evolution of data engineering and how modern-day data engineering frameworks have been developed with C++ core compute and communication kernels to facilitate high-performance data processing. Further research can explore the potential of these frameworks in real-world applications and evaluate their performance in handling even larger volumes of data.


Acknowledgments

I am grateful to my wife Kalani, parents, and brother for their unwavering support throughout the writing process. Their guidance and feedback were invaluable. I also thank everyone who contributed to this project.


Abbreviations

SVM: Support Vector Machines

HPC: High-Performance Computing

TPU: Tensor Processing Unit

GPU: Graphics Processing Unit

BSP: Bulk Synchronous Parallel

References

  1. Krizhevsky A, Sutskever I, Hinton GE. Imagenet classification with deep convolutional neural networks. Communications of the ACM. 2017;60(6):84-90
  2. Introducing ChatGPT. Available from: https://openai.com/blog/chatgpt [Accessed: March 5, 2023]
  3. Hadoop. Apache. Available from: http://hadoop.apache.org [Accessed: November 30, 2022]
  4. Moritz P, Nishihara R, Wang S, Tumanov A, Liaw R, Liang E, et al. Ray: A distributed framework for emerging AI applications. In: 13th USENIX Symposium on Operating Systems Design and Implementation (OSDI 18). 2018. pp. 561-577
  5. Pedregosa F et al. Scikit-learn: Machine learning in Python. The Journal of Machine Learning Research. 2011;12:2825-2830
  6. Paszke A, Gross S, Massa F, Lerer A, Bradbury J, Chanan G, et al. PyTorch: An imperative style, high-performance deep learning library. In: Advances in Neural Information Processing Systems. 2019. p. 32
  7. Abadi M, Barham P, Chen J, Chen Z, Davis A, Dean J, et al. TensorFlow: A system for large-scale machine learning. In: OSDI. Vol. 16. 2016. pp. 265-283
  8. Apache MXNet. Amazon Web Services. 2015. Available from: https://mxnet.apache.org/ [Accessed: March 19, 2023]
  9. Jia Y, Shelhamer E, Donahue J, Karayev S, Long J, Girshick R, et al. Caffe: Convolutional architecture for fast feature embedding. In: Proceedings of the 22nd ACM International Conference on Multimedia. 2014. pp. 675-678
  10. Bergstra J, Breuleux O, Bastien F, Lamblin P, Pascanu R, Desjardins G, et al. Theano: A CPU and GPU math expression compiler. In: Proceedings of the Python for Scientific Computing Conference (SciPy). Vol. 4, No. 3. 2010. pp. 1-7
  11. Tokui S, Oono K, Hido S, Clayton J. Chainer: A next-generation open source framework for deep learning. In: Proceedings of Workshop on Machine Learning Systems (LearningSys) in the Twenty-Ninth Annual Conference on Neural Information Processing Systems (NIPS). Vol. 5. 2015. pp. 1-6
  12. TorchScript. PyTorch. 2021. Available from: https://pytorch.org/docs/stable/jit.html [Accessed: March 19, 2023]
  13. Schwartz R, Dodge J, Smith NA, Etzioni O. Green AI. Communications of the ACM. 2020;63(12):54-63. DOI: 10.1145/3381831. Available from: https://cacm.acm.org/magazines/2020/12/248800-green-ai/fulltext?mobile=false
  14. Mazumder M, Banbury C, Yao X, Karlaš B, Rojas WG, Diamos S, et al. DataPerf: Benchmarks for data-centric AI development. arXiv preprint arXiv:2207.10062. 2022
  15. Apache Arrow. The Apache Software Foundation. 2016. Available from: https://arrow.apache.org/ [Accessed: March 19, 2023]
  16. Apache Arrow Flight. The Apache Software Foundation. 2020. Available from: https://arrow.apache.org/docs/format/Flight.html [Accessed: March 19, 2023]
  17. Zaharia M et al. Spark: Cluster computing with working sets. HotCloud. 2010;10(10–10):95
  18. Apache Kafka. The Apache Software Foundation. 2011. Available from: https://kafka.apache.org/ [Accessed: March 19, 2023]
  19. Apache Flink. The Apache Software Foundation. 2014. Available from: https://flink.apache.org/ [Accessed: March 19, 2023]
  20. Apache Beam. The Apache Software Foundation. 2016. Available from: https://beam.apache.org/ [Accessed: March 19, 2023]
  21. Apache Storm. The Apache Software Foundation. 2011. Available from: https://storm.apache.org/ [Accessed: March 19, 2023]
  22. Google Cloud Dataflow. Google. 2014. Available from: https://cloud.google.com/dataflow [Accessed: March 19, 2023]
  23. Feitelson DG, Rudolph L. Gang scheduling performance benefits for fine-grain synchronization. Journal of Parallel and Distributed Computing. 1992;16(4):306-318
  24. Ekanayake J, Li H, Zhang B, Gunarathne T, Bae S-H, Qiu J, et al. Twister: A runtime for iterative mapreduce. In: Proceedings of the 19th ACM International Symposium on High Performance Distributed Computing. 2010. pp. 810-818
  25. Kamburugamuve S et al. Twister2: Design of a big data toolkit. Concurrency and Computation: Practice and Experience. 2020;32(3):e5189
  26. Widanage C, Perera N, Abeykoon V, Kamburugamuve S, Kanewala TA, Maithree H, et al. High performance data engineering everywhere. In: 2020 IEEE International Conference on Smart Data Services (SMDS), Remote. IEEE; 19 Oct 2020. pp. 122-132
  27. Rocklin M. Dask: Parallel computation with blocked algorithms and task scheduling. In: Proceedings of the 14th Python in Science Conference. Vol. 130. Austin, TX: SciPy; 2015
  28. McKinney W. Pandas: A foundational Python library for data analysis and statistics. Python for High Performance and Scientific Computing. 2011;14(9):1-9
  29. Van Der Walt S, Colbert SC, Varoquaux G. The NumPy array: A structure for efficient numerical computation. Computing in Science & Engineering. 2011;13(2):22-30
  30. Dask Join. Available from: https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.join.html
  31. Dongarra JJ, Otto SW, Snir M, Walker D, et al. An introduction to the MPI standard. Communications of the ACM. 1995;18
  32. Dagum L, Menon R. OpenMP: An industry standard API for shared-memory programming. IEEE Computational Science and Engineering. 1998;5(1):46-55
  33. Chapman B, Curtis T, Pophale S, Poole S, Kuehn J, Koelbel C, et al. Introducing OpenSHMEM: SHMEM for the PGAS community. In: Proceedings of the Fourth Conference on Partitioned Global Address Space Programming Model. 2010. pp. 1-3
  34. Ray Dataset. Available from: https://docs.ray.io/en/latest/data/transforming-datasets.html
  35. Kamburugamuve S, Widanage C, Perera N, Abeykoon V, Uyar A, Kanewala TA, et al. HPTMT: Operator-based architecture for scalable high-performance data-intensive frameworks. In: 2021 IEEE 14th International Conference on Cloud Computing (CLOUD), Chicago, IL, USA. IEEE; 2021. pp. 228-239
  36. Abeykoon V, Perera N, Widanage C, Kamburugamuve S, Kanewala TA, Maithree H, et al. Data engineering for HPC with Python. In: 2020 IEEE/ACM 9th Workshop on Python for High-Performance and Scientific Computing (PyHPC), GA, USA. IEEE; 2020. pp. 13-21
  37. Perera N, Abeykoon V, Widanage C, Kamburugamuve S, Kanewala TA, Wickramasinghe P, et al. A fast, scalable, universal approach for distributed data aggregations. In: 2020 IEEE International Conference on Big Data (Big Data), GA, USA. IEEE; 2020. pp. 2691-2698
  38. cuDF. NVIDIA Corporation. 2021. Available from: https://github.com/rapidsai/cudf
  39. Perera N, Kamburugamuve S, Widanage C, Abeykoon V, Uyar A, Shan K, et al. High performance dataframes from parallel processing patterns. arXiv preprint arXiv:2209.06146. 2022
  40. Pedreira P et al. Velox: Meta's unified execution engine. Proceedings of the VLDB Endowment. 2022;15(12):3372-3384
  41. DataFusion. Apache Arrow. Available from: https://arrow.apache.org/datafusion/ [Accessed: February 28, 2023]
  42. Substrait. Available from: https://substrait.io/
  43. Substrait YAML Spec. Available from: https://github.com/substrait-io/substrait/blob/main/text/simple_extensions_schema.yaml
  44. Acero, Streaming Execution Engine for Apache Arrow. Available from: https://arrow.apache.org/docs/cpp/streaming_execution.html
  45. Substrait Isthmus. Available from: https://github.com/substrait-io/substrait-java/tree/main/isthmus
