I'm trying to use the akka-kryo-serialization library.
I managed to make it work with a String as a test, but when I use the same code with a Map, carefully following the instructions on the website, I keep getting errors.
Error 1: I followed the instructions on the website and wrote:
package entellect.spike.Kryo
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
object KryoSpike extends App {
  val kryo = new Kryo()
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])
  val testin = Map("id" -> "objID", "field1" -> "field1Value")
  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeClassAndObject(output, testin)
  output.flush()
  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[Map[String,String]])
  println(testout.toString)
}
This code does not compile because of the following two lines, taken from the website:
kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaMapSerializer])
kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])
Error 2: I then tried to follow what the tests on the website do. For testing maps, they use:
kryo.setRegistrationRequired(true)
kryo.addDefaultSerializer(classOf[scala.collection.Map[_, _]], classOf[ScalaImmutableMapSerializer])
kryo.register(classOf[scala.collection.immutable.HashMap$HashTrieMap], 40)
The following line does not compile because the compiler cannot find "HashMap$HashTrieMap":
kryo.register(classOf[scala.collection.immutable.HashMap$HashTrieMap], 40)
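On the `HashMap$HashTrieMap` line: the `$` form looks like a JVM binary name for a nested class rather than a name that Scala source can reference through `classOf`, which may be why the compiler rejects it. Below is a minimal sketch, using only the standard library, of getting hold of a map's concrete class at runtime instead. The object name `RuntimeMapClassSketch` is made up for illustration, and whether registering that class is what the serializer actually needs is an assumption I have not verified.

```scala
object RuntimeMapClassSketch {
  // Same test map as in the question.
  val testin: Map[String, String] = Map("id" -> "objID", "field1" -> "field1Value")

  // The static type is just Map; the concrete implementation class
  // is only visible at runtime.
  val concrete: Class[_] = testin.getClass

  // A binary name (which may contain '$') can be resolved reflectively,
  // without writing it inside classOf[...].
  val byName: Class[_] = Class.forName(concrete.getName)

  def main(args: Array[String]): Unit = {
    println(s"runtime class: ${concrete.getName}")
    println(s"same class via Class.forName: ${byName == concrete}")
  }
}
```

A `Class` value obtained this way could presumably be passed to `kryo.register(...)` in place of the `classOf[...]` expression, but the concrete class depends on the Scala version and on the size of the map, so this is only a starting point.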
Finally, my example looks like this:
package entellect.spike.Kryo
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer
object KryoSpike extends App {
  val kryo = new Kryo()
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableMapSerializer])
  val testin = Map("id" -> "objID", "field1" -> "field1Value")
  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeClassAndObject(output, testin)
  output.flush()
  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[Map[String,String]])
  println(testout.toString)
}
This compiles, but at runtime I get the following error:
Exception in thread "main" com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): scala.collection.immutable.Map
    at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1319)
    at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1127)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1136)
    at com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer.read(ScalaMapSerializers.scala:75)
    at com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer.read(ScalaMapSerializers.scala:69)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709)
    at entellect.spike.Kryo.KryoSpike$.delayedEndpoint$entellect$spike$Kryo$KryoSpike$1(KryoSpike.scala:25)
    at entellect.spike.Kryo.KryoSpike$delayedInit$body.apply(KryoSpike.scala:10)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at entellect.spike.Kryo.KryoSpike$.main(KryoSpike.scala:10)
    at entellect.spike.Kryo.KryoSpike.main(KryoSpike.scala)
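One thing I notice in my own code: it writes with `writeClassAndObject` but reads with `readObject(input, classOf[Map[String,String]])`, so the read side is handed the `Map` trait, which is the type named in the exception. In Kryo, `writeClassAndObject` is normally paired with `readClassAndObject`. Below is a minimal sketch of that pairing using a `java.util.HashMap`, which plain Kryo handles without any Scala-specific serializer. The object and method names are made up for illustration, and I am not claiming this pairing alone fixes the Scala `Map` case, since instantiating the concrete Scala class may still need extra configuration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.{HashMap => JHashMap}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

object KryoPairingSketch {
  // Writes a map together with its class, then reads it back with the
  // matching readClassAndObject call.
  def roundTrip(): AnyRef = {
    val kryo = new Kryo()
    // Registered explicitly so the sketch does not depend on whether
    // registration is required by default in the Kryo version in use.
    kryo.register(classOf[JHashMap[_, _]])

    val testin = new JHashMap[String, String]()
    testin.put("id", "objID")

    val outStream = new ByteArrayOutputStream()
    val output = new Output(outStream, 4096)
    kryo.writeClassAndObject(output, testin) // class id + object data
    output.flush()

    val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
    kryo.readClassAndObject(input) // reads the class id, then the object
  }

  def main(args: Array[String]): Unit =
    println(roundTrip())
}
```

The alternative pairing is `writeObject` with `readObject`, where the caller supplies the concrete class on the read side instead of having it stored in the stream.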
EDIT1:
My dependencies:
"org.apache.spark" % "spark-core_2.11" % "2.3.1",
"org.apache.spark" % "spark-sql_2.11" % "2.3.1",
"com.typesafe.akka" %% "akka-stream" % "2.5.16",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.4",
"com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
"com.github.romix.akka" %% "akka-kryo-serialization" % "0.5.0"
Note: I am not using any Akka-specific Kryo features; I am using Kryo as a generic serialization framework. The same goes for Spark: nothing is plugged directly into the Spark or Akka configuration.