2 votes

I get the error

ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get

when I run my job on a Dataproc cluster; when I run it locally it runs perfectly. I have reproduced the issue with the following toy example:

package com.deequ_unit_tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object reduce_by_key_example {
  def main(args: Array[String]): Unit = {

  // Set the log level to only print errors
  Logger.getLogger("org").setLevel(Level.ERROR)

  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExamples.com")
    .getOrCreate()

  println("Step 1")
  val data = Seq(("Project", 1),
    ("Gutenberg’s", 1),
    ("Alice’s", 1),
    ("Adventures", 1),
    ("in", 1),
    ("Wonderland", 1),
    ("Project", 1),
    ("Gutenberg’s", 1),
    ("Adventures", 1),
    ("in", 1),
    ("Wonderland", 1),
    ("Project", 1),
    ("Gutenberg’s", 1))

  println("Step 2")
  val rdd = spark.sparkContext.parallelize(data)

  println("Step 3")
  val rdd2 = rdd.reduceByKey(_ + _)

  println("Step 4")
  rdd2.foreach(println)
  }
}

When I run this job in Dataproc, I get this error when executing the line

rdd2.foreach(println)

As additional information, I should say that I wasn't getting this error until some changes were applied to my company's Dataproc cluster. For colleagues using PySpark, with an equivalent PySpark version of the example above, changing

  sc = SparkContext('local')

to

  sc = SparkContext()

did the trick, but I couldn't find an equivalent solution in Spark Scala. Do you have any idea of what could be causing this issue? Any help is welcome.

Don't set the master while creating the Spark session: SparkSession.builder().appName("SparkByExamples.com").getOrCreate() – itIsNaz
It works for the toy example (though nothing is printed by rdd2.foreach(println)). Nevertheless, not adding the master in the real case I am working on makes the process return this error before the point where it would break with .master added: WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, dataproc-managed-w-91.c.wf-gcp-us-ae-dataproc-prod.internal, executor 2): java.io.InvalidClassException: com.google.cloud.spark.bigquery.SparkBigQueryConfig; local class incompatible: stream classdesc serialVersionUID = 2964184825620630609, local class serialVersionUID = -3988734315685039601 – Rodrigo_V
By the way, you cannot print the rows of an RDD like that without further transformations (immutability of an RDD); also check that the different versions are compatible. I have a question as well: are you running this code from a Zeppelin notebook on the Dataproc VM, or using a jar? – itIsNaz
The toy example is taken from here: sparkbyexamples.com/apache-spark-rdd/…. Is anything missing? It runs perfectly locally and prints the desired output. Anyway, this example only serves to illustrate the issue I am facing. – Rodrigo_V
Well, I don't understand what you mean by local (on your machine) versus in Dataproc; would you share your code, please? – itIsNaz

1 Answer

2 votes
  1. Configure your pom.xml (or your build.sbt) as follows:

Add the provided scope to the Spark dependencies (if you use sbt, a rough equivalent is sketched after the pom):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>stackOverFlowGcp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.4.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Maven Plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
            <!-- assembly Maven Plugin -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>mainPackage.mainObject</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
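
If you build with sbt instead of Maven, a rough build.sbt equivalent would be the sketch below (the Spark 2.2.3 / Scala 2.11 versions simply mirror the pom above; adjust them to whatever your cluster ships):

// build.sbt - rough sbt equivalent of the pom above (versions mirror the pom and are assumptions)
name := "stackOverFlowGcp"
version := "1.0-SNAPSHOT"
scalaVersion := "2.11.12"

// "provided" keeps Spark out of the fat jar, since Dataproc already ships Spark at runtime
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.3" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.2.3" % "provided",
  "com.typesafe"     %  "config"     % "1.4.0" % "provided"
)

The fat jar itself would then come from the sbt-assembly plugin (sbt assembly) rather than the maven-assembly-plugin.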

  2. Create your package: clean => rebuild => compile => package (see the command sketch after the code below).

package mainPackage
import org.apache.spark.sql.SparkSession

object mainObject {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      //.master("local[*]")
      .appName("SparkByExamples")
      .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    println("Step 1")
    val data = Seq(("Project", 1),
      ("Gutenberg’s", 1),
      ("Alice’s", 1),
      ("Adventures", 1),
      ("in", 1),
      ("Wonderland", 1),
      ("Project", 1),
      ("Gutenberg’s", 1),
      ("Adventures", 1),
      ("in", 1),
      ("Wonderland", 1),
      ("Project", 1),
      ("Gutenberg’s", 1))

    println("Step 2")
    val rdd = spark.sparkContext.parallelize(data)
    println("Step 3")
    val rdd2 = rdd.reduceByKey(_ + _)

    println("Step 4")
    rdd2.foreach(println)
  }
}
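
With the assembly plugin configured as above, the command-line version of the packaging step should be something along these lines (the jar name just follows from the artifactId and version declared in the pom):

mvn clean package
# expected artifact: target/stackOverFlowGcp-1.0-SNAPSHOT-jar-with-dependencies.jar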
  3. Create your Dataproc cluster.
  4. Run the Spark job in Dataproc, e.g. from the console or with a gcloud command like the one sketched below.
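
For step 4, submitting from the command line would look roughly like the sketch below; the cluster name, region and bucket are placeholders to replace with your own (submitting from the Dataproc UI works just as well):

gcloud dataproc jobs submit spark \
  --cluster=my-cluster \
  --region=us-central1 \
  --class=mainPackage.mainObject \
  --jars=gs://my-bucket/stackOverFlowGcp-1.0-SNAPSHOT-jar-with-dependencies.jar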

In Dataproc you will not see the println output in the job's driver log, as mentioned before, because foreach runs on the executors; if you want to know more, read up on how Spark handles output in cluster mode on Dataproc. However, you can bring the result back to the driver, or show it as a DataFrame, if you like.
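
For example, a minimal sketch of how to make the result visible from the driver, assuming the reduced result is small enough to collect:

// Bring the reduced pairs back to the driver before printing
rdd2.collect().foreach(println)

// Or turn the RDD into a DataFrame and show it (the column names are just for illustration)
import spark.implicits._
rdd2.toDF("word", "count").show()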

(screenshots of the job running successfully in Dataproc)

As you can see, in Dataproc everything works fine. Don't forget to shut down the cluster, or delete it, after you finish ;)