I'm struggling to create a generic Avro serde in Scala. I will be using this serde with Flink, so the serde itself must also be serializable. Avro has no native support for Scala, but there are some libraries that convert case classes to generic records using shapeless. Note: this generic serializer will only ever be instantiated with case classes.
First, I tried to implement this serde using Avro4s. It compiled easily once I context-bound the generic type to `FromRecord` and `ToRecord`. However, neither `FromRecord` nor `ToRecord` is serializable, so I can't use this serde in Flink.
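Flink ships user functions and (de)serialization schemas to its workers using Java serialization. Any type class instance that a serde keeps as a field must therefore also be `java.io.Serializable`, and that includes context-bound evidence used inside its methods. Below is a minimal sketch of checking this locally before submitting a job. `Encoder`, `PlainEncoder`, `SerializableEncoder` and `Serde` are hypothetical stand-ins for a derived instance such as avro4s' `ToRecord`, not real library types.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical type class standing in for a derived instance (e.g. something like ToRecord[T]).
trait Encoder[T] { def encode(t: T): String }

// An instance that is NOT java.io.Serializable.
class PlainEncoder extends Encoder[Int] { def encode(t: Int): String = t.toString }

// The same instance, marked Serializable.
class SerializableEncoder extends Encoder[Int] with Serializable {
  def encode(t: Int): String = t.toString
}

// A serde that stores its implicit evidence as a field, as a context bound used in a method does.
class Serde[T](implicit val enc: Encoder[T]) extends Serializable {
  def serialize(t: T): Array[Byte] = enc.encode(t).getBytes("UTF-8")
}

object SerializabilityCheck {
  /** True if `obj` (and everything it references) can be written with Java serialization. */
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }
}
```

The serde class itself extends `Serializable` in both cases. Only the captured instance decides whether the round trip succeeds.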
Now I'm trying a different library, shapeless-datatype, which also uses shapeless. My current code looks like this:
```scala
import java.io.ByteArrayOutputStream

import scala.reflect.{ClassTag, classTag}
import scala.reflect.runtime.universe.TypeTag

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import shapeless.datatype.avro.AvroType

class Serializer[T : TypeTag : ClassTag] {
  // Get the runtime class of T
  val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]
  // Get the shapeless-datatype Avro type for T
  val avroType = AvroType[T]

  // Use the generated schema for Avro specific records, otherwise derive one via reflection
  private def schema: Schema =
    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
      inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    else
      ReflectData.get().getSchema(inputClassType)

  def serialize(value: T): Array[Byte] = {
    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)
    // Currently fails: could not find implicit LabelledGeneric.Aux[T, L]
    val genericRecord = avroType.toGenericRecord(value)
    writer.write(genericRecord, encoder)
    encoder.flush()
    out.close()
    out.toByteArray
  }

  def deserialize(message: Array[Byte]): T = {
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)
    avroType.fromGenericRecord(datumReader.read(null, decoder)).get
  }
}
```
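The schema depends only on `T`, yet the code above rebuilds it for every record. A common Scala pattern for Flink serdes is a `@transient lazy val`. It is built once per instance, left out of the serialized state, and rebuilt on first use after the instance is deserialized on a worker. Below is a minimal sketch of that pattern. `ExpensiveSchema` is a hypothetical stand-in for Avro's `Schema`, and the sketch assumes it is not serializable, since that depends on the Avro version. `CachingSerde` and `RoundTrip` are also illustrative names.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for something like an Avro Schema: costly to build, assumed not Serializable.
final class ExpensiveSchema(val name: String)

object ExpensiveSchema {
  var builds = 0 // counts constructions for the demo (not thread-safe)
  def build(name: String): ExpensiveSchema = { builds += 1; new ExpensiveSchema(name) }
}

class CachingSerde(recordName: String) extends Serializable {
  // Not written during serialization; rebuilt lazily on first access after deserialization.
  @transient private lazy val schema: ExpensiveSchema = ExpensiveSchema.build(recordName)

  def serialize(value: String): Array[Byte] = s"${schema.name}:$value".getBytes("UTF-8")
}

object RoundTrip {
  /** Java-serializes and deserializes `obj`, roughly what happens when Flink ships it to a task. */
  def apply[A <: AnyRef](obj: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Only `recordName` (a `String`) is serialized here, so the non-serializable schema object never has to cross the wire.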
So basically I create an `AvroType[T]`, which has two methods, `fromGenericRecord` and `toGenericRecord` (source). Those methods require some implicits: `LabelledGeneric.Aux[A, L]`, `ToAvroRecord[L]`, `tt: TypeTag[A]` and `fromL: FromAvroRecord[L]`.
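Signatures like these follow shapeless' `Aux` pattern. The compiler first resolves `gen`, which fixes `L`, and then looks up `toL` for that `L`. That only works where `A` is a concrete case class, because shapeless can only materialize a `LabelledGeneric` for a concrete type. Below is a minimal sketch of the mechanism. `Repr` stands in for `LabelledGeneric` and `ReprEncoder` for `ToAvroRecord`; both are hypothetical, not the real shapeless types.

```scala
// Hypothetical stand-in for LabelledGeneric: maps T to a representation type Out.
trait Repr[T] {
  type Out
  def to(t: T): Out
}

object Repr {
  // Plays the role of LabelledGeneric.Aux[T, L]: exposes the type member Out as the parameter L.
  type Aux[T, L] = Repr[T] { type Out = L }
}

// Hypothetical stand-in for ToAvroRecord[L]: encodes a representation of type L.
trait ReprEncoder[L] {
  def encode(l: L): String
}

object ReprEncoder {
  implicit val field: ReprEncoder[(String, String)] = new ReprEncoder[(String, String)] {
    def encode(l: (String, String)): String = s"{${l._1}: ${l._2}}"
  }
}

final case class Test(str: String)

object Test {
  // Written by hand here; for a real case class, shapeless derives the equivalent with a macro.
  implicit val repr: Repr.Aux[Test, (String, String)] = new Repr[Test] {
    type Out = (String, String)
    def to(t: Test): (String, String) = ("str", t.str)
  }
}

object Encoding {
  // Same shape as AvroType#toGenericRecord: resolving `gen` fixes L, which is then used to find `toL`.
  def toRecord[A, L](a: A)(implicit gen: Repr.Aux[A, L], toL: ReprEncoder[L]): String =
    toL.encode(gen.to(a))

  // Does not compile if uncommented: for an abstract A there is no Repr.Aux[A, L] to find.
  // def generic[A](a: A): String = toRecord(a)
}
```

For example, `Encoding.toRecord(Test("hi"))` returns `"{str: hi}"`.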
Currently this code fails to compile because those implicits are missing:
```
Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
val genericRecord = avroType.toGenericRecord(value)
```
Simply copying the implicit parameters of `toGenericRecord` and `fromGenericRecord` onto my own methods doesn't solve it, because then I need to parameterize `serialize[L <: HList]` and `deserialize[L <: HList]`, which I can't do because Flink doesn't allow these methods to have type parameters.
I don't have enough experience with shapeless or implicits to work out which context bounds I need to solve this while also keeping the class serializable.
I hope someone can help or point me to some useful resources.
Thanks, Wouter
EDIT
I can't pass the implicits through the methods or make the methods parameterized, since I need to base the serde on Flink's serialization interfaces, which force me to override `byte[] serialize(T element)` and `T deserialize(byte[] message)`.
If I try to pass the implicits to the class itself, I have to change it to:
```scala
class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])
```
But then, if I instantiate it like this:
```scala
case class Test(str: String)
val serializer = new Serializer[Test]
```
I get this compile error:
```
Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]
```
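Scala cannot partially apply type parameters, but a companion factory can split them. The caller writes only `T`, and `L` is inferred from the `Aux` implicit. This is the "partially applied type parameters" technique. The implicits are resolved once, where `T` is concrete, and stored in the instance, so `serialize` and `deserialize` need no type parameters. The sketch below uses hypothetical stand-ins (`Repr` for `LabelledGeneric`, `ReprEncoder`/`ReprDecoder` for `ToAvroRecord`/`FromAvroRecord`) and a hypothetical `Serializer.of` factory. It is an illustration of the technique, not shapeless-datatype's API.

```scala
// Hypothetical stand-ins: Repr ~ LabelledGeneric, ReprEncoder/ReprDecoder ~ ToAvroRecord/FromAvroRecord.
trait Repr[T] extends Serializable {
  type Out
  def to(t: T): Out
  def from(o: Out): T
}
object Repr { type Aux[T, L] = Repr[T] { type Out = L } }

trait ReprEncoder[L] extends Serializable { def encode(l: L): String }
trait ReprDecoder[L] extends Serializable { def decode(s: String): L }

object ReprEncoder {
  implicit val string: ReprEncoder[String] = new ReprEncoder[String] { def encode(l: String): String = l }
}
object ReprDecoder {
  implicit val string: ReprDecoder[String] = new ReprDecoder[String] { def decode(s: String): String = s }
}

// Implicits are captured at construction time, where T is concrete.
class Serializer[T, L](implicit gen: Repr.Aux[T, L], toL: ReprEncoder[L], fromL: ReprDecoder[L])
    extends Serializable {
  // No type parameters, so these can implement fixed signatures like serialize(T) / deserialize(bytes).
  def serialize(value: T): Array[Byte] = toL.encode(gen.to(value)).getBytes("UTF-8")
  def deserialize(message: Array[Byte]): T = gen.from(fromL.decode(new String(message, "UTF-8")))
}

object Serializer {
  // The caller supplies only T; PartiallyApplied infers L from the Repr.Aux instance.
  def of[T]: PartiallyApplied[T] = new PartiallyApplied[T]

  final class PartiallyApplied[T] {
    def apply[L]()(implicit gen: Repr.Aux[T, L], toL: ReprEncoder[L], fromL: ReprDecoder[L]): Serializer[T, L] =
      new Serializer[T, L]
  }
}

final case class Test(str: String)

object Test {
  // Hand-written here; shapeless would derive the LabelledGeneric equivalent.
  implicit val repr: Repr.Aux[Test, String] = new Repr[Test] {
    type Out = String
    def to(t: Test): String = t.str
    def from(o: String): Test = Test(o)
  }
}
```

In this sketch, `Serializer.of[Test]()` compiles and has type `Serializer[Test, String]`. With shapeless-datatype, the second type argument would be the inferred `HList`. The sketch does not check whether the real `LabelledGeneric`, `ToAvroRecord` and `FromAvroRecord` instances are Java-serializable.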
`def serialize(value : T)(implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L]) : Array[Byte] = ...`
`def deserialize(message: Array[Byte])(implicit gen: LabelledGeneric.Aux[T, L], fromL: FromAvroRecord[L]) : T = ...`? If you can't parameterize your methods, can you move `L` to `class Serializer[T : TypeTag : ClassTag] { type L <: HList ...` or `class Serializer[T : TypeTag : ClassTag, L <: HList] { ...`? Could you provide more details (with code) on why you can't parameterize the methods? – Dmytro Mitin
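The comment's `type L <: HList` idea can be pushed one step further. Bundle the instances into a single type class that hides `L` in a type member. The serde then needs only `T`, and `new Serializer[Test]` compiles as written. The sketch below uses hypothetical stand-ins: `Repr` for `LabelledGeneric`, `ReprCodec` for `ToAvroRecord` and `FromAvroRecord` combined, and a new `SerdeSupport` type class. It shows the technique only and is not shapeless-datatype's API.

```scala
// Hypothetical stand-ins: Repr ~ LabelledGeneric, ReprCodec ~ ToAvroRecord + FromAvroRecord.
trait Repr[T] extends Serializable {
  type Out
  def to(t: T): Out
  def from(o: Out): T
}
object Repr { type Aux[T, L] = Repr[T] { type Out = L } }

trait ReprCodec[L] extends Serializable {
  def encode(l: L): String
  def decode(s: String): L
}
object ReprCodec {
  implicit val string: ReprCodec[String] = new ReprCodec[String] {
    def encode(l: String): String = l
    def decode(s: String): String = s
  }
}

// Bundles what the serde needs for T; L lives in a type member, so only T is a type parameter.
trait SerdeSupport[T] extends Serializable {
  type L
  val gen: Repr.Aux[T, L]
  val codec: ReprCodec[L]
}

object SerdeSupport {
  // L0 is inferred during implicit search from the Aux instance; callers never write it.
  implicit def derive[T, L0](implicit g: Repr.Aux[T, L0], c: ReprCodec[L0]): SerdeSupport[T] =
    new SerdeSupport[T] {
      type L = L0
      val gen: Repr.Aux[T, L] = g
      val codec: ReprCodec[L] = c
    }
}

class Serializer[T](implicit support: SerdeSupport[T]) extends Serializable {
  def serialize(value: T): Array[Byte] =
    support.codec.encode(support.gen.to(value)).getBytes("UTF-8")
  def deserialize(message: Array[Byte]): T =
    support.gen.from(support.codec.decode(new String(message, "UTF-8")))
}

final case class Test(str: String)

object Test {
  // Hand-written here; shapeless would derive the LabelledGeneric equivalent.
  implicit val repr: Repr.Aux[Test, String] = new Repr[Test] {
    type Out = String
    def to(t: Test): String = t.str
    def from(o: String): Test = Test(o)
  }
}
```

Compared with an explicit `L` type parameter, this keeps the public signature `Serializer[T]`. The cost is one extra type class whose derivation does the `Aux` bookkeeping.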