I'm trying to make a Spark broadcast BiMap composed of two Maps. Since the mappings are unique in either direction, only a single Map needs to be serialized; in fact, only a Seq[(K, V)] of the forward Map's elements does. On deserialization we can recreate the inverse Map and any indexes.
Here is the proposed design:
```scala
class BiMap[K, V](
    private val m: Map[K, V],
    // if this is serialized, i may be discarded and recalculated on deserialization
    @transient private var i: Option[BiMap[V, K]] = None
) extends Serializable {

  // NOTE: makes the inverse's inverse point back to the current BiMap.
  // If this is serialized, inverse may be discarded and recalculated
  // when first invoked from "val size_" in the constructor.
  @transient lazy val inverse: BiMap[V, K] = {
    if (i == null)
      i = None
    i.getOrElse {
      val rev = m.map(_.swap)
      require(rev.size == m.size, "Failed to create reversed map. Cannot have duplicated values.")
      new BiMap(rev, Some(this))
    }
  }

  // forces inverse to be calculated in the constructor when deserialized,
  // not when first used
  @transient val size_ = inverse.size
  ...
}
```
This seems to work, but I can't figure out why I have to check `i` for null; it is null after deserialization. Originally it was a `val` with a default initialization of `= None`.
Only `m` should be serialized, so `inverse` is a `@transient lazy val`, and there is another `@transient val size_ = inverse.size` that is meant to force the inverse to be evaluated at deserialization time (instead of when a task first invokes `inverse`). This last bit is meant to ensure the inverse is shared rather than recreated by each task.
Even so, it's a bit ugly, and I'm still not sure about a few things:
- Is all storage for the instance allocated in the broadcast variable, not in task heap space?
- Why does `i` need to be a `var` and checked for null when it should never be null?
- Most importantly, does this cause the inverse to be discarded at broadcast time and recreated on deserialization?
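One way to probe the last two questions empirically is a plain Java-serialization round trip outside Spark. This is a minimal sketch: `Probe`, `SerDe.roundTrip` and `ProbeDemo` are hypothetical names, not part of the design above, chosen to mimic the shape of `BiMap` (a `@transient` constructor `var` with a `= None` default, a `@transient val` computed in the constructor, and a `@transient lazy val`). Java serialization does not re-run the constructor of a `Serializable` class when reading it back, so transient fields keep their JVM defaults (`null`, `0`). It only exercises Java serialization (Spark's default `spark.serializer`); with Kryo configured, the behavior should be checked separately.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical class shaped like BiMap: one serialized field plus three transient ones.
class Probe(val data: Map[String, Int],
            @transient var cached: Option[String] = None) extends Serializable {
  // Computed in the constructor; the constructor does not run again on deserialization.
  @transient val computed: Int = data.size
  // Transient lazy val: its field and init flag are not serialized, so it is rebuilt on first access.
  @transient lazy val lazyInverse: Map[Int, String] = data.map(_.swap)
}

object SerDe {
  // Round-trips a value through Java serialization entirely in memory.
  def roundTrip[T](t: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(t)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

object ProbeDemo {
  def main(args: Array[String]): Unit = {
    val copy = SerDe.roundTrip(new Probe(Map("a" -> 1, "b" -> 2)))
    println(copy.cached)      // expected: null (the "= None" default is not re-applied)
    println(copy.computed)    // expected: 0 (the constructor body did not run)
    println(copy.lazyInverse) // rebuilt lazily on this first access
  }
}
```

If the output matches the comments, a `@transient val size_` would simply read as `0` on the deserialized copy rather than forcing `inverse`, and the `@transient lazy val` would be recomputed by whichever code touches it first.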
I understand that I need to register this class with Kryo and, ultimately, implement `KryoSerializable` to control the serialization precisely.
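For comparison, the idea from the opening paragraph (serialize only the forward pairs, rebuild the inverse on read) can be sketched with Java serialization's `writeObject`/`readObject` hooks. This is a minimal sketch under assumptions: `PairBiMap` is a hypothetical, simplified stand-in that holds two plain `Map`s instead of the self-referential `BiMap` above, and it only covers Java serialization. As far as I know, Kryo's default `FieldSerializer` does not call these hooks, so with Kryo enabled the `KryoSerializable` route would still be needed.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical simplified bimap: both maps are transient; only the forward pairs are written.
final class PairBiMap[K, V] private (
    @transient private var fwd: Map[K, V],
    @transient private var inv: Map[V, K]
) extends Serializable {

  def forward: Map[K, V] = fwd
  def inverse: Map[V, K] = inv
  def size: Int = fwd.size

  // Java serialization hook: write the forward elements as a List[(K, V)].
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject() // writes no fields here (all transient) but keeps the standard stream layout
    out.writeObject(fwd.toList)
  }

  // Java serialization hook: rebuild both maps eagerly, once per deserialized copy.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val pairs = in.readObject().asInstanceOf[List[(K, V)]]
    fwd = pairs.toMap
    inv = PairBiMap.invert(fwd)
  }
}

object PairBiMap {
  def apply[K, V](m: Map[K, V]): PairBiMap[K, V] = new PairBiMap(m, invert(m))

  // Fails fast if two keys share a value, mirroring the require(...) in BiMap.
  private def invert[K, V](m: Map[K, V]): Map[V, K] = {
    val rev = m.map(_.swap)
    require(rev.size == m.size, "Failed to create reversed map. Cannot have duplicated values.")
    rev
  }
}
```

Because the inverse is assigned inside `readObject`, it is built once when each copy is deserialized rather than lazily by whichever task first touches it, which is the effect `size_` was meant to have.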