1 vote

I have a brand new install of Spark 1.2.1 on a MapR cluster, and while testing it I find that it works fine in local mode, but in YARN mode it seems unable to access variables, even when they are broadcast. To be precise, the following test code

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object JustSpark extends App {
  val conf = new org.apache.spark.SparkConf().setAppName("SimpleApplication")
  val sc = new SparkContext(conf)
  val a = List(1, 3, 4, 5, 6)
  val b = List("a", "b", "c")
  val bBC = sc.broadcast(b)
  val data = sc.parallelize(a)
  val transform = data map (t => { "hi" })
  transform.take(3) foreach (println _)
  val transformx2 = data map (t => { bBC.value.size })
  transformx2.take(3) foreach (println _)
  //val transform2 = data map (t => { b.size })
  //transform2.take(3) foreach (println _)
}

works in local mode but fails on YARN. More precisely, both transformations, transform2 and transformx2, fail, and all of them work with --master local[8].

I compile it with sbt and submit it with the spark-submit tool:

/opt/mapr/spark/spark-1.2.1/bin/spark-submit --class JustSpark --master yarn target/scala-2.10/simulator_2.10-1.0.jar

Any idea what is going on? The failure message just reports a Java NullPointerException at the point where the variable should be accessed. Is there another way to pass variables into the RDD maps?

Define "fail"? that's crucial and you haven't said which line or what error.Sean Owen

3 Answers

2 votes

I'm going to take a pretty good guess: it's because you're using App. See https://issues.apache.org/jira/browse/SPARK-4170 for details. Write a main() method instead.
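For reference, here is a sketch of the question's program with that change applied (same logic, just wrapped in an explicit main(), plus an sc.stop() at the end):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: the question's code moved from an App body into main(),
// so all vals are plain locals by the time the closures run.
object JustSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SimpleApplication")
    val sc = new SparkContext(conf)
    val a = List(1, 3, 4, 5, 6)
    val b = List("a", "b", "c")
    val bBC = sc.broadcast(b)
    val data = sc.parallelize(a)
    data.map(t => "hi").take(3).foreach(println)
    data.map(t => bBC.value.size).take(3).foreach(println)
    sc.stop()
  }
}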

1 vote

I presume the culprit was

val transform2 = data map ( t => { b.size })

In particular, the access to the local variable b. You may actually see java.io.NotSerializableException in your log files.

What is supposed to happen: Spark will attempt to serialize any referenced object. In this case that means the entire JustSpark class, since one of its members is referenced.

Why did this fail? Your class is not Serializable, so Spark is unable to send it over the wire. In particular, you have a reference to SparkContext, which does not extend Serializable:

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

So your first code, which broadcasts only the variable's value, is the correct way.
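To illustrate the mechanism with a minimal sketch (the Driver class and its members are hypothetical, not taken from the question): referencing a member inside a closure captures the whole enclosing instance, while a local copy or a broadcast variable keeps the closure self-contained.

import org.apache.spark.SparkContext

// Hypothetical, non-serializable driver class, used only for illustration.
class Driver(sc: SparkContext) {
  val b = List("a", "b", "c")   // member of the enclosing instance

  def bad(): Unit = {
    // `b.size` is really `this.b.size`, so the closure captures the whole
    // Driver instance and task serialization fails with
    // java.io.NotSerializableException.
    sc.parallelize(1 to 5).map(t => b.size).take(3).foreach(println)
  }

  def good(): Unit = {
    val localB = b                // plain local copy, or ...
    val bBC = sc.broadcast(b)     // ... a broadcast variable
    // These closures capture only localB / bBC, both of which are serializable.
    sc.parallelize(1 to 5).map(t => localB.size).take(3).foreach(println)
    sc.parallelize(1 to 5).map(t => bBC.value.size).take(3).foreach(println)
  }
}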

0 votes

This is the original broadcast example from the Spark sources, altered to use lists instead of arrays:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MultiBroadcastTest {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("Multi-Broadcast Test")
    val sc = new SparkContext(sparkConf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val num = if (args.length > 1) args(1).toInt else 1000000
    val arr1 = (1 to num).toList
    val arr2 = (1 to num).toList
    val barr1 = sc.broadcast(arr1)
    val barr2 = sc.broadcast(arr2)
    val observedSizes: RDD[(Int, Int)] = sc.parallelize(1 to 10, slices).map { _ =>
      (barr1.value.size, barr2.value.size)
    }
    observedSizes.collect().foreach(i => println(i))
    sc.stop()
  }
}

I compiled it in my environment and it works.

So what is the difference?

The problematic example uses extends App, while the original example is a plain singleton with a main() method.

So I demoted the code to a doIt() function:

object JustDoSpark extends App {
  def doIt() {
    ...
  }
  doIt()
}

and guess what. It worked.

The problem is indeed related to serialization, but in a different way: having the code in the body of the object seems to cause problems.
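For completeness, here is a sketch of that workaround applied to the question's code (untested here): the object still extends App, but all the Spark work and every val live inside the method, so the closures only capture locals.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: keep extends App, but do the work inside doIt() so nothing in the
// RDD closures refers to vals defined in the delayed App body.
object JustDoSpark extends App {
  def doIt(): Unit = {
    val conf = new SparkConf().setAppName("SimpleApplication")
    val sc = new SparkContext(conf)
    val b = List("a", "b", "c")
    val bBC = sc.broadcast(b)
    val data = sc.parallelize(List(1, 3, 4, 5, 6))
    data.map(t => bBC.value.size).take(3).foreach(println)
    sc.stop()
  }
  doIt()
}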