4
votes

I'm seeing weird behavior when using broadcast variables. Each time I use a broadcast variable, its content gets copied once per core and is never reused.

Here is an example in spark-shell --master local[32] (granted, it's useless and stupid code, but it does show the behavior):

case class Test(a:String)
val test = Test("123")
val bc = sc.broadcast(test)

// On my 32-core machine, I get 33 copies of Test (expected)
// The YourKit profiler shows 33 instances of my object (32 are unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count

// Doing it again, the existing Test copies are not reused and the value is deserialized again (now 65 copies, 64 are unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count

In my case, the variable I broadcast is several hundred megabytes and is composed of millions of small objects (a few hashmaps and vectors).

Each time I run an operation on an RDD that uses it, several gigabytes of memory are wasted and the garbage collector becomes more and more of a bottleneck!

Is it by design that broadcast variables are re-materialized for each execution of a new closure, or is it a bug and my copies should be reused?

Why are they unreachable immediately after use?

Is it particular to spark-shell in local mode?

Note: I'm using spark-1.3.1-hadoop2.6

Update1: According to this post: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-td11048.html singleton objects no longer work on Spark 1.2.x+, so this kind of workaround would not work either:

val bcModel = sc.broadcast(bigModel)
// Cache the deserialized model once per JVM behind a singleton
object ModelCache {
  @transient private lazy val localModel = bcModel.value
  def getModel = localModel
}
sc.parallelize((1 to 100)).map(x => ModelCache.getModel.someValue)

Update2: I've also tried the accumulator pattern, without success:

// No-op merge functions: the accumulator is only used as a per-node holder for the model
class VoidAccumulatorParams extends AccumulatorParam[BigModel] {
  override def addInPlace(r1: BigModel, r2: BigModel): BigModel = r1
  override def zero(initialValue: BigModel): BigModel = initialValue
}
val acc = sc.accumulator(bigModel, "bigModel")(new VoidAccumulatorParams())
sc.parallelize((1 to 100)).map(x => acc.localValue.someValue)

Update3: Looks like the singleton object works when running a job with spark-submit instead of the Scala shell.
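
For reference, here is roughly the shape of the compiled job (just a sketch, not my actual code: BigModel, someValue and the model loading are placeholders, and I pass the broadcast handle into the cache explicitly instead of closing over it, since a top-level object can't see a val defined in main). The idea is that bc.value is only hit once per executor JVM instead of once per task:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Placeholder for the real model type
case class BigModel(someValue: String)

// Per-JVM cache: the broadcast is deserialized at most once per executor
// and the resulting object is reused by every task running in that JVM
object ModelCache {
  private var localModel: BigModel = _
  def getModel(bc: Broadcast[BigModel]): BigModel = synchronized {
    if (localModel == null) localModel = bc.value
    localModel
  }
}

object BroadcastJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-singleton"))
    val bigModel = BigModel("...")        // load the real model here
    val bcModel = sc.broadcast(bigModel)
    sc.parallelize(1 to 100).map(_ => ModelCache.getModel(bcModel).someValue).count()
    sc.stop()
  }
}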


1 Answer

1
vote

Take a look at the broadcast tests (BroadcastSuite.scala) to see what to expect.

When you run the first job, your object is serialized, cut into chunks, and the chunks are sent to the executors (through the BlockManager mechanism). The executors deserialize the object from the chunks and use it while processing the tasks. When they finish, they discard the deserialized object, but the BlockManager keeps the serialized chunks cached.

For the second job the object does not need to be serialized and transmitted again; it's just deserialized from the cached chunks and used.
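
If you want to check whether the deserialized object itself gets reused, here is a quick, unscientific probe you could run in your shell against the bc from your example; it only compares identity hash codes:

// Each task records the identity hash of the object it sees through bc.value
val firstRun = sc.parallelize(1 to 8, 8).map(_ => System.identityHashCode(bc.value)).collect().toSet
val secondRun = sc.parallelize(1 to 8, 8).map(_ => System.identityHashCode(bc.value)).collect().toSet

// A single element per set would mean the deserialized object is shared and reused;
// many distinct values would mean each task deserializes its own copy.
println(s"first run: $firstRun, second run: $secondRun")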


Caveats: for one, this does not help you avoid the excessive GC. Also, I tried to validate the above theory by using mutable state (class Test(var a: String) extends Serializable) and mutating it between the two runs. To my surprise, the second run saw the mutated state! So I'm either completely wrong, or only wrong about local mode; I hope someone can tell which. (I'll try to test this further myself if I remember tomorrow.) A rough sketch of the probe I used is below.
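
For completeness, this is roughly the probe, run in a fresh spark-shell session so the mutable Test doesn't clash with the case class above (the comments describe what I observed in local mode, not what I claim in general):

// Mutable variant of the example class
class Test(var a: String) extends Serializable
val test = new Test("123")
val bc = sc.broadcast(test)

// First run sees the original value
sc.parallelize(1 to 100).map(_ => bc.value.a).collect().distinct   // "123"

// Mutate the driver-side object between the two jobs
test.a = "456"

// If the executors only rebuilt the object from the cached serialized chunks,
// this should still return "123"; in local mode I saw the mutated "456".
sc.parallelize(1 to 100).map(_ => bc.value.a).collect().distinct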