I just got introduced to the wonderful world of Big Data and cloud technology, using GCP (Dataproc) and PySpark. I have a ~5 GB JSON file (gzipped, .gz) containing ~5 million records. I need to read each row and process only those rows that satisfy a certain condition. My code works, and I issue spark-submit with --num-executors 5, but still only one worker is used to carry out the action.
This is the spark-submit command I am using:
spark-submit --num-executors 5 --py-files /home/user/code/dist/package-0.1-py3.6.egg job.py
job.py:
path = "gs://dataproc-bucket/json-files/data_5M.json.gz"
mi = spark.read.json(path)
inf_rel = mi.select(mi.client_id,
mi.user_id,
mi.first_date,
F.hour(mi.first_date).alias('hour'),
mi.notes).rdd.map(foo).filter(lambda x: x)
inf_relevance = inf_rel.map(lambda l: Row(**dict(l))).toDF()
save_path = "gs://dataproc-bucket/json-files/output_5M.json"
inf_relevance.write.mode('append').json(save_path)
print("END!!")
Dataproc config (I am using the free account for now; once I get a working solution I will add more cores and executors):
Debian 9, Hadoop 2.9, Spark 2.4
Master node: 2 vCPU, 7.50 GB memory, 32 GB primary disk
Worker nodes: 5 × (1 vCPU, 3.75 GB memory, 32 GB primary disk)
After spark-submit I can see in the web UI that 5 executors were added, but then only 1 executor remains active and performs all the tasks; the other 4 are released.
I did my research and most of the questions talk about accessing data via JDBC.
Please suggest what I am missing here.
P.S. Eventually I will read 64 JSON files of ~5 GB each, so I might use 8 cores × 100 workers.
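As a rough sketch of that multi-file read (the wildcard path below is only an illustration of the eventual layout, not the real bucket contents):
# Spark's JSON reader accepts a glob; each gzipped file is unsplittable and
# becomes its own input partition, so 64 files should give 64 input partitions
# that can be spread across executors.
multi = spark.read.json("gs://dataproc-bucket/json-files/*.json.gz")
print(multi.rdd.getNumPartitions())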
A comment from thePurplePython suggested checking:
df.rdd.getNumPartitions()
... I have not used this Google Dataproc thing, however when reading data from JDBC it defaults to 1 partition because it is single-threaded. I bet your dataframe is 1 partition = 1 task = 1 core being used, and in this case only on a single machine because nothing is being parallelized. – thePurplePython
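If the same thing is happening here, a minimal sketch of the kind of change that comment points at (the repartition count of 48 is only an illustrative guess, not a tested value):
path = "gs://dataproc-bucket/json-files/data_5M.json.gz"
# A single .gz file cannot be split, so the read most likely produces one partition;
# repartitioning right after the read spreads the map/filter/write across executors.
mi = spark.read.json(path).repartition(48)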