I'm trying to make a Spark broadcast BiMap that is composed of two Maps. Since the mapping is unique in both directions, only a single Map needs to be serialized, in fact only a Seq[(K, V)] of the underlying forward Map's elements. On deserialization we can recreate the inverse Map and any indexes.

Here is the proposed design:

class BiMap[K, V] (
    private val m: Map[K, V],
    // if this is serialized we allow i to be discarded and recalculated when deserialized
    @transient private var i: Option[BiMap[V, K]] = None
  ) extends Serializable {

  // NOTE: make inverse's inverse point back to current BiMap
  // if this is serialized we allow inverse to be discarded and recalculated
  // when first invoked from "val size_" in the constructor
  @transient lazy val inverse: BiMap[V, K] = {
    if( i == null.asInstanceOf[Option[BiMap[V, K]]] )
      i = None
    i.getOrElse {
      val rev = m.map(_.swap)
      require(rev.size == m.size, "Failed to create reversed map. Cannot have duplicate values.")
      new BiMap(rev, Some(this))
    }
  }

  // forces inverse to be calculated in the constructor when deserialized
  // not when first used
  @transient val size_ = inverse.size
  ...
}

While this seems to work, I can't figure out why I have to check i for null, yet it is null after deserialization. Originally i was a val with a default initialization of None.

Only m should be serialized, so the inverse is a @transient lazy val, and there is another @transient val size_ = inverse.size that is meant to force the inverse to be evaluated when the object is deserialized (instead of when a task first invokes inverse). This last bit is to make sure the inverse is shared rather than recreated by each task.
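
For context, this is roughly how I intend to broadcast and use it (sketch only; it assumes a live SparkContext sc and that the elided "..." part of BiMap provides a get lookup method):

val bimap = new BiMap(Map("a" -> 1, "b" -> 2))
val bc = sc.broadcast(bimap)

sc.parallelize(Seq(1, 2))
  .map(id => bc.value.inverse.get(id)) // lookup through the inverse; each executor should reuse one deserialized BiMap
  .collect()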

While this seems to work, it's a bit ugly, and I'm still not sure of a few things:

  1. Is all storage for the instance allocated in the broadcast variable, not in task heap space?
  2. Why does i need to be a var and checked for null when it should never be null?
  3. Most importantly, does this cause the inverse to be discarded at broadcast time and recreated on deserialization?

I understand that I need to register this with Kryo and ultimately implement KryoSerializable to finely control the serialization.

1 Answer

I can answer only a few of your questions.

First of all, setting Kryo as the default serializer is not enough. Your data is being serialized with the Java serialization framework, which is far less efficient than Kryo. If you want Kryo to be used for your classes, you also need to set the "spark.kryo.registrator" property to the fully qualified class name of your registrator class, which must implement KryoRegistrator. This means providing a method which you can easily implement like this (this is Java code, but I believe you'll have no issue porting it to Scala):

public void registerClasses(Kryo k) {
    k.register(BiMap.class);
}
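
If it helps, a rough Scala equivalent might look like the following (package and class names here are placeholders; adjust them to your project):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

class BiMapRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[BiMap[_, _]])
  }
}

// and the matching configuration:
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.BiMapRegistrator") // fully qualified name of the class above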

If you want, with Kryo you can manage the serialization yourself by making your class implement KryoSerializable. For further details, please read the documentation.
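
As a rough sketch of that idea applied to your case (names and structure are assumptions based on your question, not a drop-in implementation), the write/read pair could serialize only the forward entries and rebuild the inverse lazily:

import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
import com.esotericsoftware.kryo.io.{Input, Output}

class BiMap[K, V](private var m: Map[K, V]) extends KryoSerializable {

  // no-arg constructor so Kryo's default instantiation can create an empty instance before read()
  def this() = this(Map.empty[K, V])

  // recomputed lazily after deserialization, never written to the wire
  @transient lazy val inverse: Map[V, K] = m.map(_.swap)

  override def write(kryo: Kryo, output: Output): Unit = {
    // only the forward entries are serialized
    kryo.writeClassAndObject(output, m.toSeq)
  }

  override def read(kryo: Kryo, input: Input): Unit = {
    m = kryo.readClassAndObject(input).asInstanceOf[Seq[(K, V)]].toMap
  }
}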

If you want to check what is being sent over the network, you may write the result of your serialization to a file...
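
For instance, something along these lines (plain Kryo API, file name arbitrary) lets you inspect the size of what gets serialized:

import java.io.FileOutputStream
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output

val kryo = new Kryo()
kryo.register(classOf[BiMap[_, _]])

val out = new Output(new FileOutputStream("bimap.bin"))
kryo.writeClassAndObject(out, new BiMap(Map("a" -> 1, "b" -> 2)))
out.close()
// the size of bimap.bin approximates what would travel over the network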