This is a guest blog by Chengzhi Zhao, adapted from the original post on his blog.
Traditionally, if you want to run a single Spark job on EMR, you might follow these steps: launch a cluster, run the job, which reads data from a storage layer like S3, perform transformations on an RDD/DataFrame/Dataset, and finally write the result back to S3. You end up with something like this:
If we add more Spark jobs across multiple clusters, you could end up with something like this.
There are more use cases. For example, sometimes we need to store the intermediate result of Spark job A and use it as the input for Spark job B; sometimes multiple Spark jobs read the same dataset multiple times. As it stands, each Spark job has to read its input from disk and then process it. What if we could read the input from memory instead? Alluxio is a solution for that.
Alluxio is a storage layer that usually sits underneath and is collocated with the computation frameworks. It provides fast storage and facilitates data sharing and locality between jobs, regardless of whether they run on the same computation engine.
In this post, I will show you how to set up Alluxio 1.8.1 on EMR 5.17 with a bootstrap script and compare data loading performance.
More detail is available in this repository: https://github.com/ChengzhiZhao/Alluxio-EMR-bootstrap
STEP 1: set up a bootstrap script
The bootstrap.sh script downloads Alluxio 1.8.1 and sets up the required permissions on EMR.
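The actual script lives in the repository linked above; the following is a minimal sketch of what such a bootstrap action might do. The download URL and the `hadoop` user are assumptions; the install path matches the `/opt/alluxio-1.8.1-hadoop-2.8` directory used later in this post.

```shell
#!/usr/bin/env bash
# Minimal sketch of an EMR bootstrap action for Alluxio 1.8.1.
# Download URL and ownership are assumptions; see the linked repository
# for the real script.
set -euo pipefail

ALLUXIO_VERSION="1.8.1"
ALLUXIO_DIR="/opt/alluxio-${ALLUXIO_VERSION}-hadoop-2.8"

# Download and unpack the Alluxio release built against Hadoop 2.8
sudo mkdir -p /opt
curl -fsSL "http://downloads.alluxio.org/downloads/files/${ALLUXIO_VERSION}/alluxio-${ALLUXIO_VERSION}-hadoop-2.8-bin.tar.gz" \
  | sudo tar -xz -C /opt

# Give the hadoop user ownership so EMR processes can read and write
sudo chown -R hadoop:hadoop "${ALLUXIO_DIR}"
```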
STEP 2: add the required configuration.
[
  {
    "Classification": "core-site",
    "Properties": {
      "fs.alluxio.impl": "alluxio.hadoop.FileSystem",
      "fs.AbstractFileSystem.alluxio.impl": "alluxio.hadoop.AlluxioFileSystem"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.extraClassPath": ":/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/opt/alluxio-1.8.1-hadoop-2.8/client/alluxio-1.8.1-client.jar",
      "spark.executor.extraClassPath": ":/opt/alluxio-1.8.1-hadoop-2.8/client/alluxio-1.8.1-client.jar"
    }
  }
]
STEP 3: launch the cluster, take a break and relax ☕️
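If you prefer the AWS CLI to the console, a cluster with the bootstrap action and the JSON configuration above can be launched roughly like this. The bucket paths, key name, instance type, and count are placeholders you would replace with your own values.

```shell
# Sketch of launching the EMR 5.17 cluster from the AWS CLI.
# s3://my-bucket/... paths, key name, and instance settings are placeholders.
aws emr create-cluster \
  --name "alluxio-demo" \
  --release-label emr-5.17.0 \
  --applications Name=Spark \
  --instance-type m4.xlarge \
  --instance-count 3 \
  --ec2-attributes KeyName=my-key \
  --bootstrap-actions Path=s3://my-bucket/bootstrap.sh \
  --configurations file://./alluxio-config.json
```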
STEP 4: ssh to the EMR cluster and copy in test data to make sure Alluxio is running
cd /opt/alluxio-1.8.1-hadoop-2.8

# copy files to alluxio
bin/alluxio fs copyFromLocal LICENSE /Input
# Copied file:///opt/alluxio-1.8.1-hadoop-2.8/LICENSE to /Input

# verify files by listing alluxio
bin/alluxio fs ls /
# 26847 NOT_PERSISTED 02-18-2019 19:22:28:025 100% /Input
You can also check Alluxio UI by going to {Master public DNS}:19999
Click “Browse” and you should see the Input file there with the default 128 MB block size.
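If you would rather verify from the terminal, the master web UI can also be probed with curl; the hostname below is the example master from this post, so replace it with your own Master public DNS.

```shell
# Quick check that the Alluxio master web UI is responding.
# Replace the hostname with your cluster's Master public DNS.
curl -s -o /dev/null -w "%{http_code}\n" "http://ip-10-192-4-226.ec2.internal:19999"
# An HTTP 200 status indicates the UI is up.
```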
STEP 5: Use Spark shell to read the “Input” file from Alluxio
# launch spark-shell
spark-shell

// read data from Alluxio using the master DNS -- in this case, ip-10-192-4-226.ec2.internal
val s = sc.textFile("alluxio://ip-10-192-4-226.ec2.internal:19998/Input")
// s: org.apache.spark.rdd.RDD[String] = alluxio://ip-10-192-4-226.ec2.internal:19998/Input MapPartitionsRDD[1] at textFile at <console>:24

s.count
// res0: Long = 476
STEP 6: Read and compare as a real example
Note: You can use any test dataset with Spark; I just picked a test Avro file online.
// read test_data_set
val df = spark.read.format("com.databricks.spark.avro").load("s3://test_bucket/test_data_set/date=2019-04-18")
spark.time(df.count)
// Time taken: 29609 ms
// res0: Long = 86731200

// write df to Alluxio
spark.time(df.write.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set"))
// Time taken: 45775 ms

// first execution
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 13477 ms
// res3: Long = 86731200

// second execution
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 372 ms
// res4: Long = 86731200

// restart with another spark-shell
spark.time(spark.read.parquet("alluxio://ip-10-192-4-226.ec2.internal:19998/data.test_data_set").count)
// Time taken: 9606 ms
// res0: Long = 86731200
As you can see, the second execution is much faster, similar to what Spark's own cache gives us. But unlike a Spark cache, the data survives the application: even after closing the shell and opening another spark-shell, reads stay fast, because Alluxio keeps the data in memory. If you are interested, refer to the great talk "Best Practices for Using Alluxio with Apache Spark", which presents benchmarks of Alluxio compared with Spark caching.
Performance Comparison (Count job with reading from Disk vs. Alluxio)
Pros: 1) faster reads of the data as a DataFrame than from S3.
Cons: 1) Alluxio persists data in memory first to achieve this speed, so Spark jobs may have less memory available to run. 2) There is overhead in writing the DataFrame to Alluxio first.
What hasn't been covered is that Alluxio is also a great storage-sharing layer for multiple jobs reading and writing data. You could have a Flink job write to Alluxio and the result later be consumed by Spark; there are more interesting topics there. This post focused on how to set up Alluxio 1.8.1 on EMR and run a simple test on it.