0
votes

I did a join of two dataframes on one common column and then called the show method:

    df = df1.join(df2, df1.col1 == df2.col2, 'inner')
    df.show()

The join ran very slowly and finally raised an error: slave lost.

    Py4JJavaError: An error occurred while calling o109.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 : ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Slave lost
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
        at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:745)

After some searching, it seems this is a memory-related issue. I then increased the repartition count to 3000, increased executor memory, and increased memoryOverhead, but still no luck: I got the same slave lost error. During df.show(), I noticed that one executor's shuffle write size was very high while the others' were not. Any clue? Thanks.
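For reference, the knobs I turned were along these lines (the exact values, the script name, and the YARN-style memoryOverhead property name below are illustrative, not a verbatim record):

    # Repartition both sides before the join so the shuffle is spread
    # across more, smaller tasks
    df1 = df1.repartition(3000)
    df2 = df2.repartition(3000)

and on submission:

    spark-submit \
      --conf spark.executor.memory=8g \
      --conf spark.yarn.executor.memoryOverhead=2048 \
      my_join_job.py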

1
Sounds like the data might be skewed - how many rows are in the two data frames respectively? Also what sort of instance types are you running on and how much memory do you allocate? And can you try to do a count instead of a show after the join? - Glennie Helles Sindholt
@GlennieHellesSindholt Yes, the count went through. One dataframe is 100 times bigger than the other; the smaller df is around 6M rows. I'm using Spark 1.6 on EC2. - newleaf
I suspected so. I'm guessing that if you do a df = df1.join(df2, df1.col1 == df2.col2, 'inner').persist(StorageLevel.MEMORY_AND_DISK) followed by a df.count followed by a df.show, it probably goes through as well, right? (See the sketch after these comments.) - Glennie Helles Sindholt
@GlennieHellesSindholt Haven't tried that yet, but why did the count go through while the show did not? Is there any preprocessing that makes for a better join? - newleaf
Well, I have not verified this, but I suspect that since show needs to collect data to the driver, it's probably collecting to a single partition. Now, as I'm sure you know, Spark splits processing into stages, but the number of partitions used in each stage seems to be defined by the number of partitions required at the stage boundary. So if you, for instance, read in a file, do a bunch of maps and filters (which won't trigger a stage boundary) and then write it back to disk as a single file, you'll see that your map and filter operations are in fact processed in a single partition. - Glennie Helles Sindholt
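For completeness, here is a runnable sketch of the persist-then-count workaround suggested in the comments above (PySpark; dataframe and column names taken from the question):

    from pyspark import StorageLevel

    # Materialize the join result to memory/disk first, so that show()
    # only has to read already-computed partitions
    df = (df1.join(df2, df1.col1 == df2.col2, 'inner')
             .persist(StorageLevel.MEMORY_AND_DISK))
    df.count()  # forces the full join to run and be persisted
    df.show()   # should no longer re-trigger the expensive shuffle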

1 Answer

1
vote

If using Scala, try joining on the column name, which also avoids a duplicated join column in the result:

    val df = df1.join(df2, Seq("column_name"))

If using PySpark:

    df = df1.join(df2, ["column_name"])

or

    df = df1.join(df2, df1.column_name == df2.column_name)
    df.show()

If trying to do the same via Spark SQL in PySpark:

    # Note: createOrReplaceTempView and the spark session are Spark 2.x APIs;
    # on Spark 1.6, use df.registerTempTable(...) and sqlContext.sql(...) instead
    df1.createOrReplaceTempView("left_test_table")
    df2.createOrReplaceTempView("right_test_table")

    left = spark.sql("SELECT * FROM left_test_table")
    right = spark.sql("SELECT * FROM right_test_table")

    # Join, drop the duplicate join column, and inspect the first rows
    left.join(right, left.column_name == right.column_name) \
        .drop(right.column_name) \
        .show(5)
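If skew is the culprit, as the comments suggest, the join-key distribution can be checked before joining; a quick diagnostic sketch (column name taken from the question):

    # Look for heavy-hitter keys in the larger dataframe; a handful of keys
    # with very large counts means one shuffle partition will be overloaded
    df1.groupBy('col1').count().orderBy('count', ascending=False).show(10)

A single key carrying most of the rows would explain why one executor's shuffle write size was so much larger than the others'.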