3
votes

I am new to spark and scala and I am having a hard time to submit a Spark job as YARN client. Doing this via the spark shell (spark submit) is no problem same goes for: first creating a spark job in eclipse then compile it into jar and use spark submit via the kernel shell, like:

 spark-submit --class ebicus.WordCount /u01/stage/mvn_test-0.0.1.jar

However using Eclipse to directly compile and submit it to YARN seems difficult.

My project setup is the following: My cluster is running CDH cloudera 5.6. I have a maven project, using scala,My classpath / which is in sinc with my pom.xml

My code is as follows:

package test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.TaskContext;
import akka.actor
import org.apache.spark.deploy.yarn.ClientArguments
import org.apache.spark.deploy.ClientArguments

object WordCount {

  def main(args: Array[String]): Unit = {
//    val workaround = new File(".");
    System.getProperties().put("hadoop.home.dir",  "c:\\winutil\\");
    System.setProperty("SPARK_YARN_MODE", "true");

   val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("yarn-client")
      .set("spark.hadoop.fs.defaultFS", "hdfs://namecluster.com:8020/user/username")
      .set("spark.hadoop.dfs.nameservices", "namecluster.com:8020")
      .set("spark.hadoop.yarn.resourcemanager.hostname", "namecluster.com")
      .set("spark.hadoop.yarn.resourcemanager.address", "namecluster:8032")
      .set("spark.hadoop.yarn.application.classpath",
              "/etc/hadoop/conf,"
          +"/usr/lib/hadoop/*,"
          +"/usr/lib/hadoop/lib/*,"
          +"/usr/lib/hadoop-hdfs/*,"
          +"/usr/lib/hadoop-hdfs/lib/*,"
          +"/usr/lib/hadoop-mapreduce/*,"
          +"/usr/lib/hadoop-mapreduce/lib/*,"
          +"/usr/lib/hadoop-yarn/*,"
          +"/usr/lib/hadoop-yarn/lib/*,"
          +"/usr/lib/spark/*,"
          +"/usr/lib/spark/lib/*,"
          +"/usr/lib/spark/lib/*"
      )
      .set("spark.driver.host","localhost");

    val sc = new SparkContext(conf);

    val file = sc.textFile("hdfs://namecluster.com:8020/user/root/testdir/test.csv")
    //Count number of words from a hive table (split is based on char 001)
    val counts = file.flatMap(line => line.split(1.toChar)).map(word => (word, 1)).reduceByKey(_ + _)

    //swap key and value with count value and sort from high to low 
    val test = counts.map(_.swap).sortBy(word =>(word,1), false, 5)

    test.saveAsTextFile("hdfs://namecluster.com:8020/user/root/test1")

  }

}

I'm receiving the next error message on the log files of the hadoop resource manager

YARN executor launch context:
  env:
    CLASSPATH -> {{PWD}}<CPS>{{PWD}}/__spark__.jar<CPS>/etc/hadoop/conf<CPS>/usr/lib/hadoop/*<CPS>/usr/lib/hadoop/lib/*<CPS>/usr/lib/hadoop-hdfs/*<CPS>/usr/lib/hadoop-hdfs/lib/*<CPS>/usr/lib/hadoop-mapreduce/*<CPS>/usr/lib/hadoop-mapreduce/lib/*<CPS>/usr/lib/hadoop-yarn/*<CPS>/usr/lib/hadoop-yarn/lib/*<CPS>/usr/lib/spark/*<CPS>/usr/lib/spark/lib/*<CPS>/usr/lib/spark/lib/*<CPS>$HADOOP_MAPRED_HOME/*<CPS>$HADOOP_MAPRED_HOME/lib/*<CPS>$MR2_CLASSPATH
    SPARK_LOG_URL_STDERR -> http://cloudera-002.fusion.ebicus.com:8042/node/containerlogs/container_1461679867178_0026_01_000005/hadriaans/stderr?start=-4096
    SPARK_YARN_STAGING_DIR -> .sparkStaging/application_1461679867178_0026
    SPARK_YARN_CACHE_FILES_FILE_SIZES -> 520473
    SPARK_USER -> hadriaans
    SPARK_YARN_CACHE_FILES_VISIBILITIES -> PRIVATE
    SPARK_YARN_MODE -> true
    SPARK_YARN_CACHE_FILES_TIME_STAMPS -> 1462288779267
    SPARK_LOG_URL_STDOUT -> http://cloudera-002.fusion.ebicus.com:8042/node/containerlogs/container_1461679867178_0026_01_000005/hadriaans/stdout?start=-4096
    SPARK_YARN_CACHE_FILES -> hdfs://cloudera-003.fusion.ebicus.com:8020/user/hadriaans/.sparkStaging/application_1461679867178_0026/spark-yarn_2.10-1.5.0.jar#__spark__.jar

  command:
    {{JAVA_HOME}}/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms1024m -Xmx1024m -Djava.io.tmpdir={{PWD}}/tmp '-Dspark.driver.port=49961' -Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url akka.tcp://[email protected]:49961/user/CoarseGrainedScheduler --executor-id 4 --hostname cloudera-002.fusion.ebicus.com --cores 1 --app-id application_1461679867178_0026 --user-class-path file:$PWD/__app__.jar 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
===============================================================================

16/05/03 17:19:58 INFO impl.ContainerManagementProtocolProxy: Opening proxy : cloudera-002.fusion.ebicus.com:8041
16/05/03 17:20:01 INFO yarn.YarnAllocator: Completed container container_1461679867178_0026_01_000005 (state: COMPLETE, exit status: 1)
16/05/03 17:20:01 INFO yarn.YarnAllocator: Container marked as failed: container_1461679867178_0026_01_000005. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1461679867178_0026_01_000005
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:561)
    at org.apache.hadoop.util.Shell.run(Shell.java:478)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:738)
    at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Any tips or advises are welcome.

1
The problem is eclipse doesn't send your jar to Yarn. This is done in SparkSubmit which is called by bin/spark-submit.zsxwing

1 Answers

2
votes

From my previous experience there are two possible scenarios which might be causing this not very descriptive error (I am submitting jobs from Eclipse, but using Java)

  1. I noticed that you are not passing the JAR to the configuration of the SparkContext. If I remove the line which passed the JAR when submitting from Eclipse my code fails with exactly the same error. So basically you setup the path to the not-yet existing JAR into your code, then you export your project as Runnable JAR, which would package all the transitive dependencies into it, to the path you have previously set in your code. This is how it looks in Java:

    SparkConf sparkConfiguration = new SparkConf();
    sparkConfiguration.setJars(new String[] { "path to your jar" });

  2. Check if your cluster is healthy, you might have your tmp directories full. Check all hadoop logging files, some of them (sorry cannot remember which ones) are giving more details (some warnings) when this happens.