I am working with Kafka and Apache Flink, and I am trying to consume Avro-encoded records from a Kafka topic in Flink. Below is the code I am using.

I am using a custom deserialiser to deserialise the Avro records from the topic.

The Avro schema for the data I am sending to the topic "test-topic" is as follows.

{
  "namespace": "com.example.flink.avro",
  "type": "record",
  "name": "UserInfo",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}

The custom deserialiser I am using is as follows.

public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private final Class<T> avroType;

    private transient DatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }


    public T deserialize(byte[] message) {
        ensureInitialized();
        try {
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            T t = reader.read(null, decoder);
            return t;
        } catch (Exception ex) {
            throw new RuntimeException(ex);
        }
    }

    private void ensureInitialized() {
        if (reader == null) {
            if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                reader = new SpecificDatumReader<T>(avroType);
            } else {
                reader = new ReflectDatumReader<T>(avroType);
            }
        }
    }


    public boolean isEndOfStream(T nextElement) {
        return false;
    }


    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}

And this is how my Flink app is written.

public class FlinkKafkaApp {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("bootstrap.servers", "localhost:9092");
        kafkaProperties.put("group.id", "test");

        AvroDeserializationSchema<UserInfo> schema = new AvroDeserializationSchema<UserInfo>(UserInfo.class);

        FlinkKafkaConsumer011<UserInfo> consumer = new FlinkKafkaConsumer011<UserInfo>("test-topic", schema, kafkaProperties);

        DataStreamSource<UserInfo> userStream = env.addSource(consumer);

        userStream.map(new MapFunction<UserInfo, UserInfo>() {

            @Override
            public UserInfo map(UserInfo userInfo) {
                return userInfo;
            }
        }).print();

        env.execute("Test Kafka");

    }
}

I am trying to print the record sent to the topic, which is as follows: {"name" :"sumit"}

Output:

The output I am getting is {"name":""}

Can anyone help me figure out what the issue is and why I am not getting {"name" : "sumit"} as output?

Could you also post the code for UserInfo? Does it extend SpecificRecord? - Dawid Wysakowicz
UserInfo is generated using the Avro tools plugin. public class UserInfo extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"UserInfo\",\"namespace\":\"com.example.flink.avro\",\"fields\":[{\"name\":\"name\",\"type\":\"int\"}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } - Sumit Nekar
If the class is a SpecificRecord, then it seems strange that your code does not work. I can see nothing wrong with it. I would suggest checking whether the deserialize method works on its own. - Dawid Wysakowicz
I tried to debug the deserialize method. I guess the decoder has an issue, but I am not able to find what it is. - Sumit Nekar
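
Following the suggestion above to check deserialization in isolation, it can help to know what bytes a correctly encoded message should contain. The sketch below uses only the JDK (no Avro, Kafka, or Flink) and hand-encodes the single name field the way the Avro specification describes a string: a zig-zag variable-length integer holding the UTF-8 byte length, followed by the UTF-8 bytes. The class and method names (AvroStringLayout, encodeString, toHex) are made up for illustration, and the sketch assumes the name field is a string, as in the schema shown in the question.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Avro or Flink: shows the expected binary layout only.
public class AvroStringLayout {

    // Avro writes int/long values as zig-zag encoded variable-length integers.
    public static void writeZigZagVarLong(long value, ByteArrayOutputStream out) {
        long n = (value << 1) ^ (value >> 63); // zig-zag: small magnitudes become small unsigned values
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80)); // 7 payload bits, high bit set = more bytes follow
            n >>>= 7;
        }
        out.write((int) n);
    }

    // An Avro string is its UTF-8 byte length (as a long) followed by the UTF-8 bytes.
    public static byte[] encodeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeZigZagVarLong(utf8.length, out);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Formats bytes as space-separated lowercase hex for easy comparison in a debugger.
    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x ", b & 0xFF));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // A record is encoded as its fields in order, so a record with one
        // string field is just the encoded string.
        System.out.println(toHex(encodeString("sumit")));
    }
}
```

For the record in the question this prints 0a 73 75 6d 69 74: 0a is the length 5 in zig-zag form, followed by the letters of "sumit". If the bytes arriving in deserialize(byte[] message) look different (for example, if they start with 7b, the character '{', which would suggest JSON text rather than Avro binary), the problem may lie in how the message was produced rather than in the decoder.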

1 Answer


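The Flink documentation says: "Flink’s Kafka consumer is called FlinkKafkaConsumer08 (or 09 for Kafka 0.9.0.x versions, etc. or just FlinkKafkaConsumer for Kafka >= 1.0.0 versions). It provides access to one or more Kafka topics."

There is no need to write a custom deserializer to consume Avro messages from Kafka. Flink provides its own AvroDeserializationSchema (org.apache.flink.formats.avro.AvroDeserializationSchema, in the flink-avro module; not to be confused with the custom class of the same name in the question), which can be used as follows.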

To read SpecificRecords:

// Topic name matches the "test-topic" used in the question.
DataStreamSource<UserInfo> stream = streamExecutionEnvironment.addSource(
        new FlinkKafkaConsumer<>("test-topic", AvroDeserializationSchema.forSpecific(UserInfo.class), properties)
                .setStartFromEarliest());

To read GenericRecords:

// The inner quotes of the schema JSON must be escaped inside a Java string literal.
Schema schema = new Schema.Parser().parse("{\"namespace\": \"com.example.flink.avro\",\"type\": \"record\",\"name\": \"UserInfo\",\"fields\": [{\"name\": \"name\", \"type\": \"string\"}]}");
DataStreamSource<GenericRecord> stream = streamExecutionEnvironment.addSource(
        new FlinkKafkaConsumer<>("test-topic", AvroDeserializationSchema.forGeneric(schema), properties)
                .setStartFromEarliest());

For more details: https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumer