I am trying to change my Flink runner code so that it reads data from several Kafka topics and writes each topic to its own HDFS folder, without joining the streams. The main process method contains many Java and Scala generic methods, generic object instantiations and reflection. It works correctly with one Avro schema, but when I try to support an arbitrary number of Avro schemas I run into problems with the generics and the reflection constructs.

How can I resolve this? Which design pattern could help?

The model (the Avro schemas) is defined in Java classes.

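    public enum Types implements MessageType {
        RECORD_1("record1", "01", Record1.getClassSchema(), Record1.class),
        RECORD_2("record2", "02", Record2.getClassSchema(), Record2.class);

        private final String topicName;
        private final String dataType;
        private final Schema schema;
        private final Class<? extends SpecificRecordBase> clazz;

        Types(String topicName, String dataType, Schema schema, Class<? extends SpecificRecordBase> clazz) {
            this.topicName = topicName; this.dataType = dataType; this.schema = schema; this.clazz = clazz;
        }

        public Class<? extends SpecificRecordBase> getClassType() { return clazz; }
    }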



public class Record1 extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord 
{
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("???");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
... }

public class Record2 ...

The trait with the main process method:

import org.apache.avro.specific.SpecificRecordBase
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import tests.{Record1, Record2, Types}
import scala.reflect.ClassTag

trait Converter[T] extends Serializable {
  def convertToModel(message: KafkaSourceType): T
}

trait FlinkRunner extends Serializable {

  val kafkaTopicToModelMapping: Map[String, Class[_ <: SpecificRecordBase]] =
    Map(
      "record_1" -> Types.RECORD_1.getClassType,
      "record_2" -> Types.RECORD_2.getClassType
    )

  def buildAvroSink1(path: String, writer1: Writer[Record1]): BucketingSink[Record1] = ???
  def buildAvroSink2(path: String, writer2: Writer[Record2]): BucketingSink[Record2] = ???

  def process(topicList: List[String], env: StreamExecutionEnvironment): Unit = {
    // producer kafka source building
    val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
    val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]
    // How can I build a list from these? val clazzes: List[Class[???]] = ???

    val avroTypeInfo1: TypeInformation[Record1] = TypeInformation.of(clazz1)
    val avroTypeInfo2: TypeInformation[Record2] = TypeInformation.of(clazz2)
    // How can I build a list from these as well? val avroTypeInfos = ???

    val stream: DataStream[KafkaSourceType] = ???

    // build the consumer-side Avro converters
    val converter1: Converter[Record1] = new Converter[Record1] {
      override def convertToModel(message: KafkaSourceType): Record1 = deserializeAvro[Record1](message.value)
    }
    val converter2: Converter[Record2] = new Converter[Record2] {
      override def convertToModel(message: KafkaSourceType): Record2 = deserializeAvro[Record2](message.value)
    }
    // How can I build a list of converters from these?

    val outputResultStream1 = stream
      .filter(_.topic == topicList.head)
      .map(record => converter1.convertToModel(record))(avroTypeInfo1)

    val outputResultStream2 = stream
      .filter(_.topic == topicList.tail.head)
      .map(record => converter2.convertToModel(record))(avroTypeInfo2)

    val writer1 = new AvroSinkWriter[Record1](???)
    val writer2 = new AvroSinkWriter[Record2](???)

    // add sink and start process
  }
}

AS IS: There are several different topics in Kafka. The Kafka version is 10.2, without Confluent. Each Kafka topic carries exactly one Avro schema class, written in Java. A single Flink job (written in Scala) reads one topic, converts the messages with one Avro schema and writes the data to one folder in HDFS. The name, path and output folder name come from the config. For example, there are 3 job flows with these parameters:

First Job Flow

--brokersAdress … 
--topic record1
--folderName folder1
--avroClassName Record1
--output C:/….
--jobName SingleTopic1 
--number_of_parallel 2
--number_of_task 1
--mainClass Runner 
….

Second Job Flow

--brokersAdress … 
--topic record1
--folderName folder1
--avroClassName Record1
--output C:/….
--jobName SingleTopic2 
--number_of_parallel 2
--number_of_task 1
--mainClass Runner 
….

Third Job Flow

TO BE: A single Flink job can read more than one Kafka topic, convert each with its own Avro schema and write the data to different folders, without joining. For example, I could start only one job flow that does the same work:

--brokersAdress … 
--topic record1, record2, record3
--folderName folder1, folder2,
--avroClassName Record1, Record2
--output C:/….
--jobName MultipleTopics 
--number_of_parallel 3
--number_of_task 3
--mainClass Runner
...
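
To make the comma-separated parameters above concrete, here is a minimal sketch in plain Java of how they might be paired up per topic. `TopicConfig` and `TopicConfigParser` are hypothetical names introduced only for this illustration; they are not part of the existing job. The sketch assumes the i-th topic belongs to the i-th folder and the i-th class name, and it rejects lists of different lengths.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical holder for one topic's settings; not part of the original job.
final class TopicConfig {
    final String topic;
    final String folderName;
    final String avroClassName;

    TopicConfig(String topic, String folderName, String avroClassName) {
        this.topic = topic;
        this.folderName = folderName;
        this.avroClassName = avroClassName;
    }
}

final class TopicConfigParser {
    // Splits "a, b, c" into ["a", "b", "c"], dropping blanks such as a trailing comma.
    static List<String> splitList(String raw) {
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Pairs the i-th topic with the i-th folder and class name; rejects uneven lists.
    static List<TopicConfig> parse(String topics, String folders, String classNames) {
        List<String> t = splitList(topics);
        List<String> f = splitList(folders);
        List<String> c = splitList(classNames);
        if (t.size() != f.size() || t.size() != c.size()) {
            throw new IllegalArgumentException(
                    "Expected equally long lists, got " + t.size() + " topics, "
                            + f.size() + " folders, " + c.size() + " class names");
        }
        List<TopicConfig> result = new ArrayList<>();
        for (int i = 0; i < t.size(); i++) {
            result.add(new TopicConfig(t.get(i), f.get(i), c.get(i)));
        }
        return result;
    }
}
```

With the values exactly as written above (three topics, two folders, two class names) `parse` would throw, so each list would need one entry per topic.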

OK, thank you. I have several questions about code organization: 1) How can I generalize the variables and the parameters of the method (called process) so that it can initialize a list of several classes that inherit from SpecificRecordBase, if that is possible at all?

val clazz1: Class[Record1] = ClassTag(kafkaTopicToModelMapping(topicList.head)).runtimeClass.asInstanceOf[Class[Record1]]
val clazz2: Class[Record2] = ClassTag(kafkaTopicToModelMapping(topicList.tail.head)).runtimeClass.asInstanceOf[Class[Record2]]

2) The same question applies to avroTypeInfo1, avroTypeInfo2, ..., converter1, converter2, ..., buildAvroSink1, buildAvroSink2, ...

I also have questions about the architecture. I ran this code, and Flink worked correctly with different topics and their Avro schema classes. Which Flink tools can help me route different Avro schema classes to several output streams and add sinks for them? Do you have code examples for this?

Also, what could I use instead of Flink to generate several Avro files from different Kafka topics? Perhaps Confluent?

Have you considered using a schema registry? - Dominik Wosiński
No, I haven't. Could you please post some links with examples of using a schema registry in similar cases? - Vadim
Could you say a little more about the use case? It seems it would be a better idea to write AvroParquet files to HDFS than pure Avro. - Dominik Wosiński
I've just updated the question. - Vadim
I'd still try to subdivide the problem into smaller chunks and then create independent topologies in the same process, as sketched in my answer. If that somehow does not work for you, could you add your issues (depending on the length of the answer, either as a comment or as an update to the question)? - Arvid Heise

1 Answer

I'm a bit lost on your motivation. The general idea is that if you want a generic approach, go with GenericRecord. If you have specific code for the different types, go with SpecificRecord, but then don't wrap generic code around it.

Further, unless you need to, do your best not to mix different events in the same topic/topology. Rather, spawn a separate topology in the same main for each subtype.

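// Assumes env, properties and folder are in scope, as in the rest of this sketch.
def createTopology[T <: SpecificRecordBase](topic: String, clazz: Class[T]): Unit = {
  val stream: DataStream[T] =
    env.addSource(new FlinkKafkaConsumer[T](topic, AvroDeserializationSchema.forSpecific(clazz), properties))(
      TypeInformation.of(clazz))
  stream.addSink(StreamingFileSink.forBulkFormat(
    Path.fromLocalFile(folder),
    ParquetAvroWriters.forSpecificRecord(clazz)).build())
}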
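
On the generics part of the question, here is a minimal sketch in plain Java of how a `createTopology`-style method could be called once per entry of a topic-to-class map. `RecordBase`, `RecordA` and `RecordB` are stand-ins for `SpecificRecordBase` and the generated Avro classes, and `Topologies` is a hypothetical name. None of this is Flink API; the method body only reports the wiring instead of building a source and sink.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-ins for SpecificRecordBase and the generated Avro classes,
// so that the sketch compiles without Avro or Flink on the classpath.
abstract class RecordBase {}
final class RecordA extends RecordBase {}
final class RecordB extends RecordBase {}

final class Topologies {
    // Inside this method T is one concrete type, so a type information object,
    // a converter and a sink for T would all agree with each other.
    static <T extends RecordBase> String createTopology(String topic, Class<T> clazz) {
        // A real job would build source -> map -> sink here; the sketch only reports the wiring.
        return topic + " -> " + clazz.getSimpleName();
    }

    // The map values are Class<? extends RecordBase>. Passing each value to the
    // generic method lets the compiler capture the wildcard as T, without casts.
    static List<String> createAll(Map<String, Class<? extends RecordBase>> topicToClass) {
        List<String> wired = new ArrayList<>();
        for (Map.Entry<String, Class<? extends RecordBase>> e : topicToClass.entrySet()) {
            wired.add(createTopology(e.getKey(), e.getValue()));
        }
        return wired;
    }

    // Example registry mirroring kafkaTopicToModelMapping from the question.
    static Map<String, Class<? extends RecordBase>> exampleMapping() {
        Map<String, Class<? extends RecordBase>> m = new LinkedHashMap<>();
        m.put("record_1", RecordA.class);
        m.put("record_2", RecordB.class);
        return m;
    }
}
```

Calling `Topologies.createAll(Topologies.exampleMapping())` wires one pipeline per topic. The point of the design is that no `List[Class[???]]` with a single element type is needed: the heterogeneous map stays wildcard-typed, and each entry becomes concrete only inside the generic method. The same shape should carry over to a Scala method with a `T <: SpecificRecordBase` bound, though that is not shown here.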