This is a guest blog from a team of Intel engineers, originally published as a white paper https://software.intel.com/en-us/articles/speed-big-data-analytics-on-the-cloud-with-an-in-memory-data-accelerator. We cross post this article with the author's permission.
Introduction
Discontinuity in big data infrastructure drives storage disaggregation, especially in companies experiencing dramatic data growth after pivoting to AI and analytics. This data growth challenge makes disaggregating storage from compute attractive because the company can scale their storage capacity to match their data growth, independent of compute. This decoupled mode allows the separation of compute and storage, enabling users to rightsize hardware for each layer. Users can buy high-end CPU and memory configurations for the compute nodes, and storage nodes can be optimized for capacity.
Many traditional companies who used to be big users of Oracle* Real Application Clusters (RAC) or Oracle Exadata* are now looking for solutions to move off these platforms to something more cloud-like, open source, and that can be readily integrated into their AI/analytics investments. By decoupling compute and storage, multiple compute clusters running Apache Hadoop*, Apache Spark*, Apache Kafka*, MongoDB*, Apache Cassandra*, or data science tools like TensorFlow* can share access to a common data repository/data lake.
This leads to cost savings in storage capacity. Enterprise-grade shared storage with consistent performance, and a rich set of data services can be used for simplified data management and reduction in provisioning overheads. This can also help simplify and improve security by using shared storage data-at-rest encryption capabilities.
According to IDC, through 2020, spending on cloud-based big data analytics technology will grow 4.5x faster than spending for on-premises solutions [1]. Similarly, Gartner has noted, "Cloud-based big data services offer impressive capabilities like rapid provisioning, massive scalability, and simplified management." And in the report “Move Your Big Data Into The Public Cloud “[2] sponsored by Oracle and Intel, Forrester Research wrote, "companies that move more into the cloud for big data analytics achieve greater innovation, increased integration, and higher levels of security." And “Public cloud adoption is the top priority for technology decision makers investing in big data.”
Furthermore, when running big data in cloud-based storage, new technologies like Storage Performance Developer Kit (SPDK), remote direct memory access (RDMA), and Intel® Optane™ DC persistent memory can be used to accelerate performance. Today, a long I/O stack of big data buries hardware performance, but it is challenging to shorten the IO stack to eliminate unnecessary kernel and user space copy. .
This whitepaper is a continuation of Unlock Big Data Analytics Efficiency with Compute and Storage Disaggregation on Intel® Platforms [3].
Cloud Architecture Evolution for Big Data Analytics
Storage Disaggregation
Disaggregating storage with big data compute services is becoming increasingly popular in data centers. Running Spark* [4] on disaggregated cloud storage introduces benefits such as liberating clusters from the performance and scalability limitations, simplifying data center management with a shared data lake, and reducing total cost of ownership (TCO).
To evaluate performance differences between running big data on traditional on-premise configuration with disaggregated cloud storage, we scheduled three types of workloads, covering batch queries, IO intensive workloads, and CPU intensive workload.
This evaluation uncovered two significant performance gaps: first, cloud storage does not natively act like a file system and lacks critical features such as transactional rename support, and second, cloud storage takes less advantage of system memory as buffers or page caches. We identified optimizations that can be made to overcome these performance gaps. We describe the analysis and two optimizations in the next section.
Disaggregate Storage with In-Memory Acceleration
After evaluating running big data on both disaggregated cloud storage and a provisioned orchestration framework, we noticed that, to make solutions for big data increasingly more scalable and flexible, there is an urgent need to optimize disaggregated cloud storage performance.
The solution is to add an “In-memory acceleration” layer to eliminate the two main issues that cause performance degradation. The first issue is that disaggregated cloud storage lacks filesystem semantics like rename; the other issue is that disaggregated cloud storage can’t leverage memory for use as buffers and page caches. Adding an “In-memory acceleration” layer solves both issues, so we can both maximize the benefit brought by cloud storage and achieve competitive or even better performance than traditional on-premise configuration.
Storage Disaggregation
Configurations
System Configuration
The test cluster consists of ten nodes, including five compute nodes and five storage nodes. All of the nodes are equipped with Intel® Xeon® processor E5-2699 v4.
For the compute nodes, we ran five Spark* executors for each node, using 5* 22G memory with two Intel® SSD DC P4500 series as Spark* shuffle devices.
For the storage node, we ran both the Hadoop Distributed File System (HDFS*) [5] and Ceph* [6] on seven 1TB HDDs on each node. The total storage pool size is 5 * 7 / 3(replica) = 11.6T. For Ceph, we deployed Ceph* radosgw on each node to fully use network bandwidth. In comparison with HDFS*, Ceph* OSD needs to use Ceph* radosgw to communicate with Spark* executors as the I/O path will be longer than by using HDFS*. Since when using S3A Ceph* as backend, Read IO path should be from Ceph* OSD to Ceph* Radosgw to Spark*, which is much longer than when using HDFS*, read IO path is from HDFS* Datanode directly to Spark* executors.
Compute Node ConfigurationCPU Intel® Xeon™ processor Gold 6140 @ 2.3GHz Memory 384 GB NIC Intel Corporation Ethernet Connection X722 for 10GBASE-T Storage 5x Intel® SSD DC P4500 Series (two for Spark* shuffle) Software Configuration Hadoop* 2.8.1; Apache Spark* 2.2.0; Apache Hive* 2.2.1; CentOS 7.5, JDK 1.8.0_131
Table 1. Compute nodes configuration
Storage Node Configuration CPU Intel® Xeon™ processor Gold 6140 @ 2.3GHz Memory 192 GB NIC 2x Intel Corporation Ethernet Connection X722 for 10GBASE-T Storage 7x 1TB HDD for Ceph* bluestore or HDFS namenode and datanode Software Configuration Hadoop* 2.8.1; CentOS 7.5; Ceph Luminois(12.2.5)
Table 2. Storage nodes configuration
Test Methodology
To simulate common usage scenarios in big data applications, we tested three use cases:
- Batch Query Analytics
We leveraged 54 queries derived from TPC-DS* [7] (TPC benchmark with decision support) with intensive reads across objects in different buckets to consistently execute analytical processes of a large set of data. For data preparation, we built a 1T text dataset and transformed it into Parquet format, and then dropped the page cache before each run to eliminate performance impact.
- I/O Intensive Benchmark
We used Terasort as our I/O intensive benchmark. Terasort is a popular benchmark that measures the amount of time to sort one terabyte of randomly distributed data on a given computer system. Since Terasort needs to read the entire 1T of data from storage, then sort it and write it back to storage, bandwidth impacts a lot of its performance.
- CPU Intensive Benchmark
We used K-means as our CPU intensive workload. The K-Means algorithm iteratively attempts to determine clusters within the test data by minimizing the distance between the mean value of cluster center vectors, and the new candidate cluster member vectors. This requires a large number of distance calculations in each iteration of the data set. So, for K-means, once data is promoted from storage, it is a CPU-intensive workload to Spark. In our test, the dataset size is 360G.
Disaggregated Cloud Storage
To better evaluate and analyze the performance of storage disaggregation, we conducted tests using three different configurations: traditional on-premise configuration to co-locate HDFS* and compute, disaggregate HDFS* to storage side, and disaggregate cloud storage by Ceph*. These tests show how network and storage implementation impact performance.
For the disaggregated HDFS* vs. co-located HDFS* configuration test, the performance impact is quite slight. For batch queries, disaggregated HDFS* showed a 10% performance degradation. I/O intensive workloads, using Terasort with a 1T dataset, disaggregate HDFS* showed better performance than co-located HDFS since there were ten nodes tested on disaggregated HDFS* while only five nodes in co-locate HDFS*. So, the total memory size of disaggregate HDFS* is 1.5x bigger than the co-located HDFS*. For the CPU intensive test, we barely saw a difference with these two configurations.
There are performance gaps when comparing disaggregated S3A Ceph* cloud storage vs. co-located HDFS* configurations. For batch queries, disaggregated S3A Ceph* cloud storage showed a 30% performance degradation. I/O intensive workload using Terasort had a performance degradation as significant as 60%. And for CPU intensive workload using K-means, the performance also showed 50% degradation.
After further investigating system data, we noticed that there are two main reasons leading to the negative performance impact brought by disaggregated S3A Ceph* cloud storage. One major cause is that when using S3A Ceph* cloud storage in the Hadoop* system, we relied on an S3A adapter. S3A is not a filesystem and does not natively support transactional writes (TW). Most big data analytics software (such as Apache Spark* or Apache Hive*) relies on HDFS*’s atomic rename feature to support atomic writes, and during job submit, tasks submit output to temporary locations first, and only moving (renaming) data to the final location upon job completion. Since S3A lacks native support for moving and renaming, it implements this with: COPY + DELETE + HEAD + POST, a combination of operations which brings additional read and write bandwidth to cloud storage. Figure 5 demonstrates this behavior, S3A Ceph* cloud storage network bandwidth is shown on the left side, and disaggregate HDFS* network bandwidth is shown on the right side. The read bandwidth line shown on the left side is caused by S3A using read and write to implement moving.
Since S3A lacks native support for moving and renaming, it implements this with: COPY + DELETE + HEAD + POST, a combination of operations which brings additional read and write bandwidth to cloud storage. Figure 5 demonstrates this behavior, S3A Ceph* cloud storage network bandwidth is shown on the left side, and disaggregate HDFS* network bandwidth is shown on the right side. The read bandwidth line shown on the left side is caused by S3A using read and write to implement moving.
Another cause is that disaggregated S3A Ceph* cloud storage can’t use memory as buffers and page cache as HDFS* did since cloud storage lacks a good data-locality concept as HDFS*, and has a different implementation in data consistency. I/O in cloud storage will Ack until all replications hit disks while in HDFS* case, I/O may Ack a completion when replications hit data node buffers. As shown in figure 6, on Ceph*, memory utilization is about 25% while the memory is almost used up in HDFS* case.
S3A Connector Adapter Optimization
Since the implementation of an S3A job commit mechanism greatly impacts cloud storage performance, a new feature called “S3A Committer”[8] has been part of Hadoop since version 3.1.1. S3A committer makes explicit use of this multipart upload (“MPU”) mechanism and provides two optimized protocol to make data output much faster. In Table 3, we list these two committers, staging and magic, with their operations in different phases.
FeatureStagingMagicTask Output Destination local disk S3Awithout completing the write Task Commit Process upload data from disk to S3 list all pending uploads on s3 and write details to job attempt directory Task Abort Process delete local disk data list all pending uploads and abort them Job Commit list & complete pending uploads list & complete pending uploads
Table 3. S3A commiter implementation
In our test of the staging committer, temporary output data is written to local disk first when tasks commit, resulting data will be written to cloud storage only once.
Performance improved by 1.5x after using an S3A committer (staging committer), and in Figure 8, you can see the read I/O in the output stage is gone.
There is still 40% performance degradation with S3A Committer compared with co-locate HDFS* performance.
In-Memory Data Acceleration
Cloud Big Data Analytics with In-Memory Data Acceleration
In the above chapters, we evaluated storage disaggregate and S3A adapter optimization, and noticed two issues causing performance gaps between disaggregate cloud storage with a traditional on-premise configuration. To further optimize Spark on disaggregate cloud storage and benefit from rapid provisioning, excellent scalability, easy management, and pay as you grow flexibility, we added an “In-Memory Data Acceleration” layer to support big data filesystem operation natively and better utilize memory to improve the performance.
Accelerating with In-Memory Data Acceleration as Cache
Configurations
To eliminate the existence overhead brought by S3A, we proposed adding a memory layer between the storage systems and the computation frameworks and applications to accelerate Spark* process speed. As shown in Figure 10, when using Alluxio*[9] as a cache layer, data is promoted from Ceph* to Spark* executor local Alluxio* worker and then used by Spark*. And when Spark* executor outputs data back to Ceph*, it outputs data to Alluxio* first then flushes to Ceph* asynchronously.
Compute Nodes ConfigurationSoftware Configuration Hadoop* 2.8.1; Apache Spark* 2.2.0; Apache Hive* 2.2.1; CentOS 7.5; Alluxio* 2.0.0-SNAPSHOT
Table 4. Compute nodes configuration
In this test, we evaluated the performance of adopting Alluxio* as cache on the S3A Ceph* cloud storage, since we still saw a 40% performance degradation when running Terasort on S3A Ceph* compared with traditional on-premise configuration after S3A connector optimization.
We used Alluxio* 2.0.0 in this test, deploying Alluxio* worker to all Spark* running nodes, with the assumption that since we are using Spark* in Yarn, and Spark* executors may switch to different physical nodes every time, the benefit we may observe from Alluxio* is better I/O behavior of promoting and flushing.
Performance and Analysis
We tested on deploying Alluxio* with five 200G Memory, and all Alluxio* tests are based on disaggregation S3A Ceph* cloud storage configuration, so we can tell the exact performance improvement by adding the in-memory data acceleration.
According to the results, both configurations provide a significant performance improvement.
For batch queries, Performance with Alluxio* shows more than 1.42x improvement compared with disaggregate S3A Ceph* cloud storage and performed similarly to traditional on-premise configuration. I/O intensive workload on Terasort, Performance with Alluxio* shows more than 3.5x performance improvement. And when compared with traditional on-premise configuration, disaggregate S3A Ceph* cloud storage with Alluxio* shows 1.4x performance improvement in the Terasort test. For CPU intensive workload using K-Means, performance with Alluxio* shows 1.4x improvement while compared to traditional on-premise configuration and performance with Alluxio* disaggregate S3A Ceph* cloud storage still indicates 10% worse than traditional on-premise configuration.
So, from the above data, we can conclude that using Alluxio* as the cache can eliminate the performance overhead brought by S3A and still benefits from deploying big data on cloud storage. When the workload is IO intensive, it is promisingly more beneficial to adopt Alluxio as the cache.
Further work
In-Memory Data Acceleration with Spark-PMoF
Design and Objective
We have shown that using an in-memory data accelerator (IMDA) as Spark cache improved the Spark process speed significantly, and for further optimization, we propose a Spark module called Spark-PMoF, which will enable Intel® Optane™ DC persistent memory module and RDMA support in Spark shuffle and also external Spark shuffle.
The workflow of using Spark-PMoF as an IMDA layer is shown in figure 13, shuffle data will be written to the Intel® Optane™ DC persistent memory module device or memory by the Persistent Memory Developed Kit (PMDK)[10], and shuffle data transmission among executors is leveraging RDMA to bypass some memory copy and offload CPU cycles.
Initial implementation and evaluation are complete and will be covered in the next paper in this series.
Summary
In this paper, we evaluated performance using three configurations: storage disaggregation, accelerating disaggregated cloud storage with S3A committer, and accelerating disaggregated cloud storage with in-memory data acceleration as the cache. According to our evaluation, performance with disaggregate cloud storage shows gaps between 10% - 40% in comparison with traditional on-premise configuration.
Through deploying IMDA as cache, tested by an IO intensive workload, it brought 3.5x improvement to disaggregate storage, and 1.4x improvement compared to a traditional on-premise configuration.
Reference
Blog
We are thrilled to announce the general availability of Alluxio Enterprise for Data Analytics 3.2! With data volumes continuing to grow at exponential rates, data platform teams face challenges in maintaining query performance, managing infrastructure costs, and ensuring scalability. This latest version of Alluxio addresses these challenges head-on with groundbreaking improvements in scalability, performance, and cost-efficiency.
We’re excited to introduce Rapid Alluxio Deployer (RAD) on AWS, which allows you to experience the performance benefits of Alluxio in less than 30 minutes. RAD is designed with a split-plane architecture, which ensures that your data remains secure within your AWS environment, giving you peace of mind while leveraging Alluxio’s capabilities.
PyTorch is one of the most popular deep learning frameworks in production today. As models become increasingly complex and dataset sizes grow, optimizing model training performance becomes crucial to reduce training times and improve productivity.