In Cloud Data Fusion I am using a Joiner transform to join two tables.
One of them is a large table with about 87M records, while the other is a smaller table with only ~250 records. I am using 200 partitions in the Joiner.
This causes the following failure:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 50 in stage 7.0 failed 4 times, most recent failure: Lost task 50.3 in stage 7.0 (TID xxx, cluster_workerx.c.project.internal, executor 6): ExecutorLostFailure (executor 6 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 133355 ms
java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.spark.SparkException: Application application_xxxxx finished with failed status
On closer inspection in the Spark UI, of the 200 tasks for the join, one task receives nearly 80% of the 87M records as output and fails with the heartbeat error, while the tasks that succeed each output very few records (under ~10k).
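If it helps, I believe a key-frequency check like the following (run in a Spark shell against the same data; the path and join key name are placeholders, not my actual schema) would confirm that a single key dominates the large table:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

val spark = SparkSession.builder().appName("skew-check").getOrCreate()

// Hypothetical path and key name: count rows per join key in the large input
// to see whether one key accounts for the bulk of the 87M records.
val large = spark.read.parquet("gs://my-bucket/large_table")
large.groupBy("join_key").count().orderBy(desc("count")).show(10)
```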
It seems Spark performs a shuffle hash join. Is there a way in Data Fusion/CDAP to force a broadcast join, since one of my tables is very small? Or can I make configuration changes to the cluster to make this join work?
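For comparison, this is what I understand the equivalent broadcast join would look like in plain Spark, outside Data Fusion (paths and the join column are placeholders; I know the Joiner plugin does not expose this API directly):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("broadcast-join-sketch").getOrCreate()

// Hypothetical inputs: "large" ~87M rows, "small" ~250 rows, joined on "join_key".
val large = spark.read.parquet("gs://my-bucket/large_table")
val small = spark.read.parquet("gs://my-bucket/small_table")

// broadcast() ships the small table to every executor, so the large table is
// never shuffled and a skewed key cannot pile up in a single task.
val joined = large.join(broadcast(small), Seq("join_key"))

joined.write.parquet("gs://my-bucket/joined_output")
```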
What performance tuning can I apply in the Data Fusion pipeline? I didn't find any reference to configuration or tuning in the Data Fusion documentation.
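From the Spark side, these are the standard properties I understand to be relevant here (the values are illustrative, not tuned recommendations). My question is whether Data Fusion lets me pass them through, e.g. via the pipeline's Engine config or as runtime arguments:

```
# Broadcast tables under ~10 MB (applies to Spark SQL joins; the CDAP Joiner
# may not go through Spark SQL)
spark.sql.autoBroadcastJoinThreshold=10485760
# Give the straggling task more time before the heartbeat timeout fires;
# the heartbeat interval must stay well below spark.network.timeout
spark.network.timeout=600s
spark.executor.heartbeatInterval=60s
# More memory headroom per executor
spark.executor.memory=8g
```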