0
votes

I created a Dataproc cluster with 1 master and 10 nodes. All have the same CPU and memory configuration: 32 vCPU, 120 GB memory. When I submitted a job that handles big amount of data and calculation. The job failed.

From the logging, I am not really sure about what caused the failure. But I saw the memory related error message from tJob#: job-c46fc848-6: Container killed by YARN for exceeding memory limits. 24.1 GB of 24 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

So I tried a few solutions I found from other posts. For example, I tried to increase spark.executor.memoryOverhead and spark.driver.maxResultSize in the "Properties" section when submitting a job from the "Jobs" console. The job# find-duplicate-job-c46fc848-7 still failed.

I was also seeing warning messages and not really sure what it means: 18/06/04 17:13:25 WARN org.apache.spark.storage.BlockManagerMasterEndpoint: No more replicas available for rdd_43_155 !

I am going to try to create a higher level cluster to see if it works. But I doubt it would solve the issue since cluster with 1 master and 10 nodes with 32 vCPU, 120 GB memory are already very powerful.

Hope to get some help from advanced users and experts. Thanks in advance!

1
I have no exprience in google dataproc. but If you can access spark web ui, see that. And trying to figure out that data skew in spark web ui occurs by your query might help. And If there is data skew, repartition for more partition would help. and if you include your query in question, you might more likely to get help. - ruseel
@ruseel I was able to solve the problem. It was partially related to the data skew. Thanks for your suggestion, I looked at UI to see the CPU and memory. - Bin Lin

1 Answers

1
votes

The root cause of the failure was related to memory caused by the self cross join. It was still failing even I kept increasing the CPU power and memories. So the solution of this is combination of the following.

  1. Use the repartition() function to repartition after the join, before the next transformation. This will fix the data skew issue. Ex: df_joined = df_joined.repartition(partitions)
  2. Broadcast right table.
  3. Break it into 10 iterations. In each iteration, I only process 1/10 of the left table joined with the full data of the right table.

See sample code:

groups = 10 <br/>
for x in range(0, groups): 
  df_joined = df1.join(broadcast(df2), (df1.authors == df2.authors)).where((col("df1.content_id") % groups == x)) 

With the above 3 methods combined, I was able to finish the job in 1.5 hour and only used 1 master and 4 worker nodes (8 CPU and 30 GB for each vm).