Hi, I'm new to Spark and Amazon EMR.

I wrote a demo Spark application to run on an Amazon EMR cluster. When I run the code in a Zeppelin notebook, it returns the output below, and I expected the application to save that output as a single file when run on the cluster:

%pyspark
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
    df_new = df.withColumn('upper_c', upper(df.c))
df_new

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+

The Spark application:

from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import upper
from datetime import datetime, date
import argparse

def pre_processing(output_uri):

    spark = SparkSession.builder.appName("process sample data").getOrCreate()
    rdd = spark.sparkContext.parallelize([
        (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ])
    df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
    spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
    if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
        df_new = df.withColumn('upper_c', upper(df.c))
    df_new
    df_new.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_uri', help="The URI where output is saved")
    args = parser.parse_args()
    pre_processing(args.output_uri)

However, when I run the Spark application on the cluster, it saves multiple CSV files to the S3 bucket. Why does my Spark application write the DataFrame as multiple files?

The Spark application is submitted with the following arguments:

spark-submit --deploy-mode cluster s3://<BUCKET_NAME>/spark_application/emr_demo_app.py --output_uri s3://<BUCKET_NAME>/output

Thanks in advance.

P.S. I previously followed the AWS EMR tutorial, and its sample application saved its output as a single CSV file.

1 Answer

Spark splits data into partitions so that tasks can run in parallel across the workers. A DataFrame is partitioned as well, and when the write action runs, each partition is saved separately as its own part file, which is why the output contains multiple CSV files.
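As a rough plain-Python analogy (this is not Spark itself), the sketch below splits rows into chunks and writes each chunk to its own part file, which mirrors why one output location ends up holding several CSV files. The function name `write_partitions`, the round-robin chunking, and the file naming are assumptions made purely for illustration.

```python
import csv
import os
import tempfile


def write_partitions(rows, header, num_partitions, out_dir):
    """Write rows as num_partitions CSV files, one file per chunk (illustration only)."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for i in range(num_partitions):
        # Round-robin split: chunk i gets every num_partitions-th row, starting at row i.
        chunk = rows[i::num_partitions]
        path = os.path.join(out_dir, f"part-{i:05d}.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(header)
            writer.writerows(chunk)
        paths.append(path)
    return paths


rows = [(1, "string1"), (2, "string2"), (3, "string3")]
base_dir = tempfile.mkdtemp()

# Three chunks produce three files; one chunk produces one file.
three_files = write_partitions(rows, ["a", "c"], 3, os.path.join(base_dir, "many"))
one_file = write_partitions(rows, ["a", "c"], 1, os.path.join(base_dir, "single"))
print(len(three_files), len(one_file))
```

The number of files follows the number of chunks, in the same way that the number of part files Spark writes follows the number of partitions.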

To create a single file, repartition or coalesce the DataFrame into one partition before writing:

df_new.repartition(1).write.option("header", "true").mode("overwrite").csv(output_uri)

All the data is sent to a single worker, which then writes the records to one file. If the dataset is large, that worker becomes a bottleneck. A similar answer can be found here: Write single CSV file using spark-csv
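Note that even with one partition, the path passed to csv() is still written as a directory: it holds a single part-*.csv file, typically alongside marker files such as _SUCCESS. The sketch below shows one way to pick that file out and copy it to a fixed name. It assumes the output is on (or has been copied to) a local filesystem, not S3, and `collect_single_csv` is a hypothetical helper name, not part of Spark.

```python
import glob
import os
import shutil
import tempfile


def collect_single_csv(output_dir, target_path):
    """Copy the only part-*.csv file found in output_dir to target_path."""
    part_files = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if len(part_files) != 1:
        raise ValueError(f"expected exactly one part file, found {len(part_files)}")
    shutil.copyfile(part_files[0], target_path)
    return target_path


# Simulate a one-partition output directory on the local filesystem.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "part-00000-demo.csv"), "w") as f:
    f.write("a,upper_c\n1,STRING1\n")
open(os.path.join(demo_dir, "_SUCCESS"), "w").close()

# Copy the part file to a predictable name in a separate directory.
target_dir = tempfile.mkdtemp()
result = collect_single_csv(demo_dir, os.path.join(target_dir, "result.csv"))
print(result)
```

The helper raises an error if the directory contains zero or several part files, so it fails loudly when the DataFrame was not reduced to one partition first.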