I'm struggling to create a generic AvroSerde in Scala. I will be using this serde with Flink, so the serde itself must also be serializable. Avro has no native support for Scala, but there are libraries that use shapeless to convert case classes to generic records. Note: this generic serializer will only be instantiated with case classes.

First, I tried to implement this serde using Avro4s. It compiled easily once I context-bound the generic type to FromRecord and RecordFrom. However, neither FromRecord nor RecordFrom is serializable, so I can't use this serde in Flink.
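Flink distributes schema objects to its workers using Java serialization, so every non-transient field of the serde has to be serializable as well. The following is a minimal sketch of how that can be checked locally, using only the standard library. The names isJavaSerializable, PlainCodec, Serde and LazySerde are hypothetical stand-ins, not part of Avro4s or Flink, and the @transient lazy val variant is a common workaround that the post above did not try.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializableCheck {
  // Hypothetical helper: reports whether `value` can be written with Java serialization.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(value); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  // Stand-in for a type class instance that does not extend Serializable.
  class PlainCodec

  // Holds the non-serializable instance as an ordinary field,
  // so writing a Serde fails with NotSerializableException.
  class Serde(val codec: PlainCodec) extends Serializable

  // The field is transient and created lazily, so it is skipped when
  // the object is written and rebuilt on first access after reading.
  class LazySerde extends Serializable {
    @transient lazy val codec: PlainCodec = new PlainCodec
  }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new Serde(new PlainCodec)))
    println(isJavaSerializable(new LazySerde))
  }
}
```

The lazy variant only helps when the instance can be rebuilt without arguments on the worker; an implicit captured from the call site cannot be recreated that way.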

Currently, I'm trying a different library, shapeless-datatype, which also uses shapeless. My current code looks like this:

class Serializer[T : TypeTag : ClassTag] {

  //Get type of the class at run time
  val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  //Get Avro Type
  val avroType = AvroType[T]

  def serialize(value : T) : Array[Byte] = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val out: ByteArrayOutputStream = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    var writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

    val genericRecord = avroType.toGenericRecord(value)

    writer.write(genericRecord, encoder)
    encoder.flush()
    out.close()

    out.toByteArray
  }

  def deserialize(message: Array[Byte]) : T = {
    var schema: Schema = null

    if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType)) {
      schema = inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
    } else {
      schema = ReflectData.get().getSchema(inputClassType)
    }

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(message, null)

    avroType.fromGenericRecord(datumReader.read(null, decoder)).get
  }


}

So basically I create an AvroType[T], which has two methods, fromGenericRecord and toGenericRecord (source). Those methods require some implicits: gen: LabelledGeneric.Aux[A, L], toL: ToAvroRecord[L], tt: TypeTag[A] and fromL: FromAvroRecord[L].

Currently this code fails to compile because those implicits are missing:

Error:(48, 51) could not find implicit value for parameter gen: shapeless.LabelledGeneric.Aux[T,L]
  val genericRecord = avroType.toGenericRecord(value)

Simply redeclaring the implicit parameters of toGenericRecord and fromGenericRecord on my own methods doesn't solve it, because then I would need to parameterize them as serialize[L <: HList] and deserialize[L <: HList], which I can't do because Flink doesn't allow these methods to have type parameters.

I have too little experience with shapeless and implicits to work out which context bounds I need to solve this while also keeping the class serializable.

Hope someone can help or point me to some useful resources.

Thanks, Wouter

EDIT

I can't pass implicits through the methods or give them type parameters, because the serde has to implement Flink's serialization interfaces, which force me to override byte[] serialize(T element) and T deserialize(byte[] message).
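When an interface fixes the method signatures, the implicit evidence can be resolved once at construction time instead. Below is a minimal sketch of that idea using only the standard library. ByteSchema is a hypothetical stand-in for Flink's serialization interfaces, and Codec is a hypothetical type class playing the role of ToAvroRecord and FromAvroRecord. It covers only the simple case where the evidence depends on T alone.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object FixedSignatureSketch {
  // Hypothetical stand-in for an interface whose method signatures cannot change.
  trait ByteSchema[T] extends Serializable {
    def serialize(element: T): Array[Byte]
    def deserialize(message: Array[Byte]): T
  }

  // Hypothetical type class that knows how to encode and decode a T.
  trait Codec[T] extends Serializable {
    def encode(value: T): Array[Byte]
    def decode(bytes: Array[Byte]): T
  }

  implicit val stringCodec: Codec[String] = new Codec[String] {
    def encode(value: String): Array[Byte] = value.getBytes(UTF_8)
    def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }

  // The context bound resolves Codec[T] when the class is instantiated,
  // so the overridden methods need no implicit or type parameters.
  class CodecSchema[T: Codec] extends ByteSchema[T] {
    private val codec: Codec[T] = implicitly[Codec[T]]
    override def serialize(element: T): Array[Byte] = codec.encode(element)
    override def deserialize(message: Array[Byte]): T = codec.decode(message)
  }

  def main(args: Array[String]): Unit = {
    val schema = new CodecSchema[String]
    println(schema.deserialize(schema.serialize("hello")))
  }
}
```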

If I try to pass the implicit to the class itself, I would need to change it to:

class Serializer[T : TypeTag : ClassTag, L <: HList](implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L], fromL: FromAvroRecord[L])

but then if I instantiate it like this:

case class Test(str: String)
val serializer = new Serializer[Test]

I get this compile error:

Error:(29, 26) wrong number of type arguments for shapeless.datatype.avro.Serializer, should be 2
val serializer = new Serializer[Test]
Comments:

Can you add implicit parameters to your methods? def serialize(value : T)(implicit gen: LabelledGeneric.Aux[T, L], toL: ToAvroRecord[L]) : Array[Byte] = ... def deserialize(message: Array[Byte])(implicit gen: LabelledGeneric.Aux[T, L], fromL: FromAvroRecord[L]) : T = ...? If you can't parameterize your methods, can you move L to class Serializer[T : TypeTag : ClassTag] { type L <: HList ... or class Serializer[T : TypeTag : ClassTag, L <: HList] { ... ? Could you provide more details (with code) on why you can't parameterize the methods? - Dmytro Mitin

I added more details @DmytroMitin. Basically, I don't understand what the L generic is or how to instantiate it. - wouterdz

1 Answer

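You should make Serializer a type class. L then becomes a type member that the compiler infers while it resolves the implicit instance, so the caller only writes Serializer[Test]. (By the way, using vars unnecessarily is bad practice.)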

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.ReflectData
import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation
import shapeless.datatype.avro.{AvroType, FromAvroRecord, ToAvroRecord}
import shapeless.{HList, LabelledGeneric}  
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.{ClassTag, classTag}

trait Serializer[T] extends SerializationSchema[T] with DeserializationSchema[T] {
  type L <: HList
}

object Serializer {
  type Aux[T, L0 <: HList] = Serializer[T] { type L = L0 }

  def apply[T](implicit serializer: Serializer[T]): Serializer[T] = serializer

  implicit def mkSerializer[T : ClassTag : TypeTag, L0 <: HList](implicit
    gen: LabelledGeneric.Aux[T, L0],
    toL: ToAvroRecord[L0],
    fromL: FromAvroRecord[L0]): Aux[T, L0] =
    new Serializer[T] {
      type L = L0

      //Get type of the class at run time
      val inputClassType: Class[T] = classTag[T].runtimeClass.asInstanceOf[Class[T]]

      //Get Avro Type
      val avroType = AvroType[T]

      override def serialize(value : T) : Array[Byte] = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)

        val out: ByteArrayOutputStream = new ByteArrayOutputStream()
        val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
        val writer: DatumWriter[GenericRecord] = new GenericDatumWriter[GenericRecord](schema)

        val genericRecord = avroType.toGenericRecord(value)

        writer.write(genericRecord, encoder)
        encoder.flush()
        out.close()

        out.toByteArray
      }

      override def deserialize(message: Array[Byte]) : T = {
        val schema: Schema =
          if (classOf[SpecificRecordBase].isAssignableFrom(inputClassType))
            inputClassType.newInstance.asInstanceOf[SpecificRecordBase].getSchema
          else ReflectData.get().getSchema(inputClassType)

        val datumReader = new GenericDatumReader[GenericRecord](schema)
        val decoder = DecoderFactory.get().binaryDecoder(message, null)

        avroType.fromGenericRecord(datumReader.read(null, decoder)).get
      }

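      // Assumption: the stream is unbounded, so no element marks its end.
      override def isEndOfStream(nextElement: T): Boolean = false

      // Assumption: Flink's class-based type extraction is acceptable for T.
      override def getProducedType: TypeInformation[T] = TypeInformation.of(inputClassType)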
    }
}

case class Test(str: String)    
val serializer = Serializer[Test]
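To see why the caller never has to name L, the same pattern can be sketched without shapeless or Flink. Everything below is a hypothetical stand-in: Repr plays the role of LabelledGeneric, Render plays the role of ToAvroRecord, and Printer plays the role of Serializer. The implicit method has two type parameters, but the caller supplies only T; the compiler fixes O from the Repr.Aux instance it finds and then looks up Render[O].

```scala
object AuxSketch {
  // Hypothetical stand-in for LabelledGeneric: maps T to a representation type Out.
  trait Repr[T] {
    type Out
    def to(value: T): Out
  }
  object Repr {
    type Aux[T, O] = Repr[T] { type Out = O }
  }

  // Hypothetical stand-in for ToAvroRecord: works on the representation type.
  trait Render[O] {
    def render(repr: O): String
  }

  // Type class with a single type parameter, like Serializer[T] above.
  trait Printer[T] {
    def show(value: T): String
  }
  object Printer {
    def apply[T](implicit printer: Printer[T]): Printer[T] = printer

    // O is inferred during implicit search; the caller never writes it.
    implicit def mkPrinter[T, O](implicit repr: Repr.Aux[T, O], render: Render[O]): Printer[T] =
      new Printer[T] {
        def show(value: T): String = render.render(repr.to(value))
      }
  }

  case class Test(str: String)

  // Hand-written instances; with shapeless these would be derived.
  implicit val testRepr: Repr.Aux[Test, (String, String)] = new Repr[Test] {
    type Out = (String, String)
    def to(value: Test): (String, String) = ("str", value.str)
  }
  implicit val pairRender: Render[(String, String)] = new Render[(String, String)] {
    def render(repr: (String, String)): String = s"${repr._1}=${repr._2}"
  }

  // Only T is given here, just like Serializer[Test].
  val printer: Printer[Test] = Printer[Test]
}
```

By contrast, new Serializer[Test] on a class declared with two type parameters fails because Scala requires either all type arguments or none at an instantiation site.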