1 vote

I am running a PySpark job on EMR (5.5.1) with Spark 2.1.0, Hadoop 2.7.3, Hive 2.1.1, Sqoop 1.4.6 and Ganglia 3.7.2, which loads data from S3. There are multiple buckets containing the input files, so I have a function that uses boto to traverse them and filter them out according to some pattern.

Cluster size: Master => r4.xlarge, Workers => 3 x r4.4xlarge

Problem: The function getFilePaths returns a list of S3 paths which is fed directly to the Spark DataFrame load method.

Using Dataframe

file_list = getFilePaths() # ['s3://some_bucket/log.json.gz','s3://some_bucket/log2.json.gz']
schema = getSchema()  # for mapping to the json files
df = sparkSession.read.format('json').load(file_list, schema=schema)

Using RDD

master_rdd = sparkSession.sparkContext.union(
    [sparkSession.sparkContext.textFile(f) for f in file_list]
)
# each record is a raw JSON string, so parse it with read.json rather than createDataFrame
df = sparkSession.read.json(master_rdd, schema=schema)

The file_list can be a huge list (up to 500k files) due to the large amount of data and files. Computing these paths takes only 5-20 minutes, but when I try to load them as a DataFrame with Spark, the Spark UI remains inactive for hours, i.e. it is not processing anything at all. The inactivity period for 500k files is above 9 hours, while for 100k files it is around 1.5 hours.

Viewing Ganglia metrics shows that only the driver is running/processing while the workers are idle. No logs are generated until the Spark job has finished, and I haven't had any success with 500k files.


I have tried the s3 and s3n connectors, but with no success.

Question:

  • What is the root cause of this delay?
  • How can I debug it properly?
Comment: Had some issues with spark not reading files in parallel and went this route stackoverflow.com/questions/28685874/… - David

2 Answers

1 vote

In general, Spark/Hadoop prefer large files they can split rather than huge numbers of small files. One approach you could try, though, is to parallelize your file list and then load the data in a map call.

I don't have the resources to test this right now, but it should look something like this:

import json
import boto3

file_list = getFilePaths()
schema = getSchema()  # for mapping to the json files

paths_rdd = sc.parallelize(file_list)

def get_data(path):
    # split the s3:// URI into bucket and key
    bucket, _, key = path.replace('s3://', '').partition('/')
    s3 = boto3.resource('s3')

    obj = s3.Object(bucket, key)
    data = obj.get()['Body'].read().decode('utf-8')
    return [json.loads(r) for r in data.split('\n') if r]

rows_rdd = paths_rdd.flatMap(get_data)
df = spark.createDataFrame(rows_rdd, schema=schema)

You could also make this a little more efficient by using mapPartitions instead, so you don't need to recreate the s3 resource for every file; a sketch of that variant follows.
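A minimal sketch of that variant, reusing the paths_rdd, schema and spark names from above (get_data_partition is just an illustrative name):

def get_data_partition(paths):
    # one boto3 resource per partition instead of one per file
    s3 = boto3.resource('s3')
    for path in paths:
        bucket, _, key = path.replace('s3://', '').partition('/')
        data = s3.Object(bucket, key).get()['Body'].read().decode('utf-8')
        for r in data.split('\n'):
            if r:
                yield json.loads(r)

rows_rdd = paths_rdd.mapPartitions(get_data_partition)
df = spark.createDataFrame(rows_rdd, schema=schema)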

EDIT 6/14/18:

With regard to handling the gzip data, you can decompress a stream of gzip data in Python as detailed in this answer: https://stackoverflow.com/a/12572031/1461187 . Basically, just pass obj.get()['Body'].read() into the function defined in that answer.
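On Python 3, the standard-library gzip module gives a simpler route than the zlib recipe in that answer; here is a rough adaptation of the get_data sketch above for .json.gz objects (same assumptions as before):

import gzip

def get_data(path):
    bucket, _, key = path.replace('s3://', '').partition('/')
    raw = boto3.resource('s3').Object(bucket, key).get()['Body'].read()
    text = gzip.decompress(raw).decode('utf-8')  # inflate the whole .gz payload in memory
    return [json.loads(r) for r in text.split('\n') if r]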

1 vote

There are two performance issues surfacing here:

  1. Reading the files: gzip files can't be split to share their workload across workers, though with 50 MB files there's little benefit in splitting things up anyway.
  2. The way the S3 connectors Spark uses mimic a directory structure is a real performance killer for complex directory trees.

Issue #2 is what slows up partitioning: the initial listing and planning work that decides what to do, which happens before any of the computation.

How would I go about dealing with this? Well, there's no magic switch here. But:

  • have fewer, bigger files; as noted, Avro is good, and so are Parquet and ORC later.
  • use a very shallow directory tree. Are these files all in one single directory, or in a deep directory tree? The latter is worse.

Coalesce the files first.
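A rough sketch of such a one-off compaction pass, assuming Parquet output is acceptable (the s3a destination some_bucket/compacted and the partition count of 64 are placeholders):

# read the many small .json.gz files once with the known schema,
# then rewrite them as a smaller number of larger columnar files
df = spark.read.format('json').load(file_list, schema=schema)
df.repartition(64).write.mode('overwrite').parquet('s3a://some_bucket/compacted/')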

I'd also avoid any kind of schema inference; it sounds like you aren't doing that (good!), but for anyone else reading this answer: know that for CSV, and presumably JSON, schema inference means "read through all the data once just to work out the schema".
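For illustration, a minimal explicit schema (the field names here are purely hypothetical), passed in the same way the question already does:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# supplying the schema up front means Spark skips the extra pass over the
# data it would otherwise need to infer column names and types
schema = StructType([
    StructField('timestamp', StringType()),
    StructField('message', StringType()),
    StructField('status', LongType()),
])
df = spark.read.format('json').load(file_list, schema=schema)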