7
votes

I have below simple SparkR program, which is to create a SparkR DataFrame and retrieve/collect data from it.

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="yarn-client",sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)

I am able to create it and view information successfully, but any operation related to fetch data is throwing below error.

16/07/25 16:33:59 WARN TaskSetManager: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out at java.net.PlainSocketImpl.socketAccept(Native Method) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) at java.net.ServerSocket.implAccept(ServerSocket.java:530) at java.net.ServerSocket.accept(ServerSocket.java:498) at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)

16/07/25 16:33:59 ERROR TaskSetManager: Task 0 in stage 17.0 failed 4 times; aborting job 16/07/25 16:33:59 ERROR RBackendHandler: dfToCols on org.apache.spark.sql.api.r.SQLUtils failed Error in invokeJava(isStatic = TRUE, className, methodName, ...) : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 86, wlos06.nrm.minn.seagate.com): java.net.SocketTimeoutException: Accept timed out at java.net.PlainSocketImpl.socketAccept(Native Method) at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:398) at java.net.ServerSocket.implAccept(ServerSocket.java:530) at java.net.ServerSocket.accept(ServerSocket.java:498) at org.apache.spark.api.r.RRDD$.createRWorker(RRDD.scala:432) at org.apache.spark.api.r.BaseRRDD.compute(RRDD.scala:63) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPar

If I am executing it by sparkR command line like below, it's getting executed.

~/Downloads/spark-1.6.1-bin-hadoop2.6/bin/sparkR --master yarn-client

But when I am executing it via R, and sparkR.init((master="yarn-client"), it's throwing error.

Can someone please help resolving these errors?

1
I am having the same problem. How did you fix it?Fisseha Berhane

1 Answers

6
votes

Adding this line made the difference:

Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")

Here is the full code:

Sys.setenv(HADOOP_CONF_DIR = "/etc/hadoop/conf.cloudera.yarn")
Sys.setenv(SPARK_HOME = "/home/user/Downloads/spark-1.6.1-bin-hadoop2.6")
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
Sys.setenv("SPARKR_SUBMIT_ARGS"="--master yarn-client sparkr-shell")
sc <- sparkR.init(sparkEnvir = list(spark.shuffle.service.enabled=TRUE,spark.dynamicAllocation.enabled=TRUE,spark.dynamicAllocation.initialExecutors="40"))
hiveContext <- sparkRHive.init(sc)

n = 1000
x = data.frame(id = 1:n, val = rnorm(n))
xs <- createDataFrame(hiveContext, x)

xs

head(xs)
collect(xs)