1
votes

i'm trying to use the library akka-kryo-serializer.

I manage to make it work with a string as a test but then when I use the same code to deal with Map, carefully following instruction of the website i keep having the same errors:

Error1: I follow the instruction of the website and write:

package entellect.spike.Kryo

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}


object KryoSpike extends App {

  val kryo = new Kryo()
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])

  val testin = Map("id" -> "objID", "field1" -> "field1Value")

  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeClassAndObject(output, testin)
  output.flush()


  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[Map[String,String]])

  println(testout.toString)

}

This code does not compile because of the following two lines taken from the Website:

kryo.addDefaultSerializer(classOf[scala.collection.Map[,]], classOf[ScalaMapSerializer]) kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])

I try to follow what is in the test of the website. For testing map it uses:

kryo.setRegistrationRequired(true) kryo.addDefaultSerializer(classOf[scala.collection.Map[_, _]], classOf[ScalaImmutableMapSerializer]) kryo.register(classOf[scala.collection.immutable.HashMap$HashTrieMap], 40)

The following line does not compile because the compile does not find "HashMap$HashTrieMap"

classOf[ScalaImmutableMapSerializer])
    kryo.register(classOf[scala.collection.immutable.HashMap$HashTrieMap],

40)

Finally my example look as such:

package entellect.spike.Kryo

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.{Input, Output} import com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer

object KryoSpike extends App {

  val kryo = new Kryo()
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableMapSerializer])

  val testin = Map("id" -> "objID", "field1" -> "field1Value")

  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeClassAndObject(output, testin)
  output.flush()


  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[Map[String,String]])

  println(testout.toString)

}

But then i get the following error:

Exception in thread "main" com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): scala.collection.immutable.Map at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1319) at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1127) at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1136) at com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer.read(ScalaMapSerializers.scala:75) at com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer.read(ScalaMapSerializers.scala:69) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709) at entellect.spike.Kryo.KryoSpike$.delayedEndpoint$entellect$spike$Kryo$KryoSpike$1(KryoSpike.scala:25) at entellect.spike.Kryo.KryoSpike$delayedInit$body.apply(KryoSpike.scala:10) at scala.Function0$class.apply$mcV$sp(Function0.scala:34) at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.App$class.main(App.scala:76) at entellect.spike.Kryo.KryoSpike$.main(KryoSpike.scala:10) at entellect.spike.Kryo.KryoSpike.main(KryoSpike.scala)

EDIT1:

My dependency

  "org.apache.spark" % "spark-core_2.11" % "2.3.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1",
  "com.typesafe.akka" %% "akka-stream" % "2.5.16",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.4",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
  "com.github.romix.akka" %% "akka-kryo-serialization" % "0.5.0"

Note i am not using Kryo specific Akka feature, i am using it as a generic serialization framework. Same thing with spark. No direct plug into spark or akka config.

1

1 Answers

1
votes

Solution use

ScalaImmutableAbstractMapSerializer with Map

and writeObject && readObject method together.

package entellect.spike.Kryo

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.romix.scala.serialization.kryo._

object KryoSpike extends App {


  val kryo = new Kryo()
  kryo.setRegistrationRequired(false)
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])

  val testin = Map("id" -> "objID", "field1" -> "field1Value")

  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeObject(output, testin)
  output.flush()


  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[scala.collection.Map[_,_]])

  println(testout.toString)

}