I'm seeing odd behavior when using broadcast variables. Each time I use a broadcast variable, its content gets copied once per task and is never reused.
Here is an example in spark-shell --master local[32] (granted, the code is useless and contrived, but it does show the behavior):
case class Test(a:String)
val test = Test("123")
val bc = sc.broadcast(test)
// On my 32-core machine, I get 33 copies of Test (expected)
// The YourKit profiler shows 33 instances of my object (32 are unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count
// Doing it again, the Test copies are not reused and are serialized again (now 65 copies, 64 unreachable)
sc.parallelize((1 to 100)).map(x => bc.value.a).count
In my case, the variable I broadcast is several hundred megabytes in size and is composed of millions of small objects (a few hashmaps and vectors).
Each time I run an operation on an RDD that uses it, several gigabytes of memory are wasted and the garbage collector becomes more and more of a bottleneck!
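For context, the real job looks roughly like the sketch below; BigModel and its fields are hypothetical stand-ins for the actual model, which is a few hundred megabytes of hashmaps and vectors:

import scala.collection.mutable

// Hypothetical stand-in for the real model: millions of small objects
// spread over a few hashmaps and vectors
case class BigModel(someValue: Double, weights: mutable.HashMap[String, Vector[Double]])

val bigModel = BigModel(1.0, mutable.HashMap("feature" -> Vector(0.1, 0.2, 0.3)))
val bcBigModel = sc.broadcast(bigModel)

// Every action of this shape seems to deserialize a fresh copy of the
// model per task instead of reusing the copy already on the node
sc.parallelize((1 to 100)).map(x => bcBigModel.value.someValue).count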
Is it by design that variables are re-broadcast for each execution of a new closure, or is it a bug and the copies should be reused?
Why are they unreachable immediately after use?
Is this particular to spark-shell in local mode?
Note: I'm using spark-1.3.1-hadoop2.6
Update 1: According to this post: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-td11048.html singleton objects no longer work on Spark 1.2.x+, so this kind of workaround would not work either:
val bcModel = sc.broadcast(bigModel)

object ModelCache {
  // Deserialize the broadcast value once per JVM and cache it
  @transient private lazy val localModel = bcModel.value
  def getModel = localModel
}

sc.parallelize((1 to 100)).map(x => ModelCache.getModel.someValue).count
Update 2: I've also tried the accumulator pattern, without success:
import org.apache.spark.AccumulatorParam

// Accumulator that never merges anything; it just carries the model around
class VoidAccumulatorParams extends AccumulatorParam[BigModel] {
  override def addInPlace(r1: BigModel, r2: BigModel): BigModel = r1
  override def zero(initialValue: BigModel): BigModel = initialValue
}

val acc = sc.accumulator(bigModel, "bigModel")(new VoidAccumulatorParams())
sc.parallelize((1 to 100)).map(x => acc.localValue.someValue).count
Update 3: It looks like the singleton object works when running a job with spark-submit instead of the Scala shell.
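For reference, the compiled job that behaves as expected with spark-submit looks roughly like the sketch below. The object names and the explicit passing of the broadcast handle into the cache are mine, not the exact code I run; BigModel again stands in for the real model:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Simplified stand-in for the real model
case class BigModel(someValue: Double)

// One deserialized copy per executor JVM: the first call pays the
// deserialization cost, later calls reuse the cached instance
object ModelCache {
  @volatile private var localModel: BigModel = _
  def getModel(bc: Broadcast[BigModel]): BigModel = {
    if (localModel == null) localModel = bc.value
    localModel
  }
}

object BroadcastTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-test"))
    val bcModel = sc.broadcast(BigModel(1.0))
    sc.parallelize(1 to 100).map(x => ModelCache.getModel(bcModel).someValue).count()
    sc.stop()
  }
}

The only real difference from the shell version above is that the broadcast handle is passed in explicitly instead of being captured from the enclosing scope.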