2
votes

I have a global config object in my Spark app.

object Config {
  var lambda = 0.01
}

and I set the value of lambda according to the user's input.

object MyApp {
  def main(args: Array[String]) {
    Config.lambda = args(0).toDouble
    ...
    rdd.map(_ * Config.lambda)
  }
}

But I found that the modification does not take effect on the executors: the value of lambda is always 0.01. I guess a modification in the driver's JVM does not affect the executors' JVMs.

Is there another solution?

I found a similar question on Stack Overflow:

how to set and get static variables from spark?

In @DanielL.'s answer, he gives three solutions:

  1. Put the value inside a closure to be serialized to the executors to perform a task.

But I wonder how to write the closure and how to serialize it to the executors. Could anyone give me a code example?

  2. If the values are fixed or the configuration is available on the executor nodes (lives inside the jar, etc.), then you can have a lazy val, guaranteeing initialization only once.

What if I declare lambda as a lazy val? Will the modification in the driver take effect in the executors? Could you give me a code example?

  3. Create a broadcast variable with the data.

I know this approach, but doesn't it also need a local Broadcast[] variable wrapping the Config object? For example:

val config = sc.broadcast(Config)

and then use config.value.lambda in the executors, right?


1 Answer

4
votes
  1. Put the value inside a closure
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Config { var lambda = 0.01 }
object SOTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("StaticVar"))
    val r = sc.parallelize(1 to 10, 3)
    Config.lambda = 0.02
    mul(r).collect.foreach(println)
    sc.stop()
  }
  def mul(rdd: RDD[Int]) = {
    val l = Config.lambda  // copy into a local val so the closure captures the value itself
    rdd.map(_ * l)
  }
}
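Why the local copy works: the closure shipped to the executors carries the captured val `l` inside its serialised bytes, whereas a direct reference to `Config.lambda` is re-read from each executor's own (unmodified) singleton at call time. A minimal plain-JVM sketch of this, with no Spark required (the `roundTrip` helper is illustrative and stands in for Spark's closure shipping):

```scala
import java.io._

object Config { var lambda = 0.01 }

object ClosureDemo {
  // Serialise and deserialise an object, mimicking how Spark ships a closure.
  def roundTrip[T](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    new ObjectOutputStream(bos).writeObject(obj.asInstanceOf[AnyRef])
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    Config.lambda = 0.02                              // "driver" sets the value
    val l = Config.lambda                             // local copy captured by the closure
    val withCapture  = roundTrip((x: Int) => x * l)
    val viaSingleton = roundTrip((x: Int) => x * Config.lambda)

    Config.lambda = 0.01                              // simulate the executor's fresh JVM
    println(withCapture(10))                          // 0.2 -- the value travelled inside the closure
    println(viaSingleton(10))                         // 0.1 -- re-reads the executor-local singleton
  }
}
```
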
  2. lazy val for once-only initialisation
import org.apache.spark.{SparkConf, SparkContext}

object SOTest {
  def main(args: Array[String]) {
    lazy val lambda = args(0).toDouble  // evaluated at most once, on first use
    val sc = new SparkContext(new SparkConf().setAppName("StaticVar"))
    val r = sc.parallelize(1 to 10, 3)
    r.map(_ * lambda).collect.foreach(println)
    sc.stop()
  }
}
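For reference, a lazy val's initialiser runs at most once per JVM, on first access; a plain-Scala sketch of that behaviour (no Spark involved, names illustrative):

```scala
object LazyDemo {
  def main(args: Array[String]): Unit = {
    var initialisations = 0
    lazy val lambda = { initialisations += 1; 0.02 }  // body deferred until first use

    println(initialisations)  // 0 -- not yet evaluated
    println(lambda)           // 0.02 -- initialiser runs here
    println(lambda)           // 0.02 -- cached; initialiser does not run again
    println(initialisations)  // 1
  }
}
```

This is also why the lazy-val approach only helps when every executor JVM can compute the value on its own (e.g. from a file inside the jar): each JVM runs the initialiser independently, so a driver-side mutation still does not propagate.
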
  3. Create a broadcast variable with the data
import org.apache.spark.{SparkConf, SparkContext}

object Config { var lambda = 0.01 }
object SOTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("StaticVar"))
    val r = sc.parallelize(1 to 10, 3)

    Config.lambda = 0.04
    val bc = sc.broadcast(Config.lambda)  // broadcast the value, not the Config object
    r.map(_ * bc.value).collect.foreach(println)

    sc.stop()
  }
}

Note: You shouldn't pass the Config object into sc.broadcast() directly. Spark would serialise your Config before transferring it to the executors, but your Config is not serialisable. Another thing worth mentioning: a broadcast variable does not fit your situation well here, because you are only sharing a single value.
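If you ever do want to broadcast a whole configuration rather than a single value, a serialisable holder works. A sketch using a hypothetical `Settings` case class (case classes are `Serializable` out of the box), with a plain-JVM serialisation round trip standing in for the broadcast transfer:

```scala
import java.io._

// Hypothetical holder; the name and field are illustrative.
// Case classes implement Serializable automatically.
case class Settings(lambda: Double)

object SettingsDemo {
  // Serialise and deserialise, approximating what sc.broadcast(settings) would ship.
  def roundTrip[T](obj: T): T = {
    val bos = new ByteArrayOutputStream()
    new ObjectOutputStream(bos).writeObject(obj.asInstanceOf[AnyRef])
    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
      .readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val settings = Settings(lambda = 0.04)
    val onExecutor = roundTrip(settings)
    println(onExecutor.lambda)             // 0.04 -- the field survives the trip
  }
}
```

In Spark this would be `val bc = sc.broadcast(Settings(0.04))` on the driver and `bc.value.lambda` inside the map, avoiding the non-serialisable mutable singleton entirely.
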