4
votes

I just got introduced to this wonderful world of Big Data and cloud technology, using GCP (Dataproc) and PySpark. I have a ~5 GB JSON file (gzip-compressed, .gz) containing ~5 million records. I need to read each row and process only those rows that satisfy a certain condition. I have working code, and I issued a spark-submit with --num-executors 5, but still only one worker is used to carry out the action.

This is the spark-submit command I am using:

spark-submit --num-executors 5 --py-files /home/user/code/dist/package-0.1-py3.6.egg job.py

job.py:

path = "gs://dataproc-bucket/json-files/data_5M.json.gz"
mi = spark.read.json(path)
inf_rel = mi.select(mi.client_id,
                    mi.user_id,
                    mi.first_date,
                    F.hour(mi.first_date).alias('hour'),
                    mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

Dataproc config (I am using the free account for now; once I get a working solution I will add more cores and executors):

Debian 9, Hadoop 2.9, Spark 2.4
Master node: 2 vCPUs, 7.50 GB memory, 32 GB primary disk
Worker nodes (5): 1 vCPU, 3.75 GB memory, 32 GB primary disk

After spark-submit I can see in the web UI that 5 executors were added, but then only 1 executor remains active and performs all the tasks, while the other 4 are released.

I did my research and most of the questions talk about accessing data via JDBC.

Please suggest what I am missing here.

P.S. Eventually I will be reading 64 JSON files of 5 GB each, so I might use 8 cores × 100 workers.

2
How many partitions do you have on your dataframe? Check df.rdd.getNumPartitions(). I have not used this Google Dataproc thing, but when reading data from JDBC it defaults to 1 partition because it is single-threaded. I bet your dataframe has 1 partition = 1 task = 1 core being used, and in this case only on a single machine, because nothing is being parallelized. – thePurplePython
Thanks, I have now increased the CPU count to 2 × 3 workers and also repartitioned to 10, but still no improvement. One more important thing I forgot to mention is that this JSON file is gzipped (.gz). Not sure if that is causing any issues. – ramd
Yes, that is a problem ... the gzip format is not splittable ... possibly a duplicate; see the solution to your question here => stackoverflow.com/questions/40492967/… – thePurplePython
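
The partition check suggested in the comments makes the problem visible. A minimal sketch, assuming the same bucket path as the question (the partition counts shown are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# A single .gz file cannot be split, so the whole read lands in one partition.
mi = spark.read.json("gs://dataproc-bucket/json-files/data_5M.json.gz")
print(mi.rdd.getNumPartitions())                  # typically 1 for a lone gzip file

# repartition() spreads the already-read rows across the cluster; it does not
# parallelize the initial read, but downstream stages then run with 10 tasks.
print(mi.repartition(10).rdd.getNumPartitions())  # 10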

2 Answers

1
votes

Your best bet is to preprocess the input. Given a single input file, spark.read.json(...) will create a single task to read and parse the JSON data, as Spark cannot know ahead of time how to parallelize it. If your data is in line-delimited JSON format (http://jsonlines.org/), the best course of action is to split it into manageable chunks beforehand:

path = "gs://dataproc-bucket/json-files/data_5M.json"
# read monolithic JSON as text to avoid parsing, repartition and *then* parse JSON
mi = spark.read.json(spark.read.text(path).repartition(1000).rdd)
inf_rel = mi.select(mi.client_id,
                   mi.user_id,
                   mi.first_date,
                   F.hour(mi.first_date).alias('hour'),
                   mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")

Your initial step here (spark.read.text(...)) will still bottleneck as a single task. If your data isn't line-delimited, or (especially!) you anticipate needing to work with this data more than once, you should figure out a way to turn your 5 GB JSON file into 1000 5 MB JSON files before getting Spark involved.
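
If the file is line-delimited and you can decompress it locally first, one way to do that split is a small standalone script; the chunk size, file names, and paths below are illustrative, not part of the original setup:

import itertools

def split_jsonl(src_path, lines_per_chunk=5000, prefix="chunk"):
    """Split a line-delimited JSON file into smaller files of N lines each."""
    with open(src_path, "r", encoding="utf-8") as src:
        for i in itertools.count():
            chunk = list(itertools.islice(src, lines_per_chunk))
            if not chunk:
                break
            with open(f"{prefix}-{i:05d}.json", "w", encoding="utf-8") as out:
                out.writelines(chunk)

# e.g. split_jsonl("data_5M.json"), then upload the chunks with gsutil

After the split, pointing spark.read.json at the directory of chunks lets Spark schedule one task per file.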

0
votes

.gz files are not splittable, so they're read by one core and placed onto a single partition.

See Dealing with a large gzipped file in Spark for reference.
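
If re-compressing with a splittable codec is not an option, a common one-time workaround (sketched below with an illustrative partition count and output path) is to pay the single-task read once, repartition, and write the data back out as many smaller files that later jobs can read in parallel:

# The gzipped read is still a single task, but the repartitioned output is
# written as ~100 smaller files, so subsequent reads are fully parallel.
(spark.read
      .json("gs://dataproc-bucket/json-files/data_5M.json.gz")
      .repartition(100)
      .write
      .mode("overwrite")
      .json("gs://dataproc-bucket/json-files/data_5M_split/"))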