2 votes

I need to create a stream from a topic that must be consumed with the KafkaAvroDeserializer, not the standard Kafka deserializers. This is because further down the line it will be sent out to a topic used by the Confluent JDBC Sink connector (which does not support the standard serializers/deserializers). When creating the topic, I used the KafkaAvroSerializer for both key and value.

My original code (before I changed to use the Kafka Avro Serializers for the key) was:

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(Serdes.String(), uploadSerde));

NOTE: Serdes.String() above will not deserialize correctly, since the key was serialized using the KafkaAvroSerializer. So, maybe there is another form of this code that will allow me to build a stream without having to set the key serde (so it defaults to what is in the config), and I can just set the value serde (uploadSerde)?

If not, can someone tell me how to change the "Serdes.String()" standard deserializer to a KafkaAvroDeserializer? e.g.

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(<What can I insert here for the KafkaAvroDeserializer.String???>, uploadSerde));

In my consumer, I am setting the correct default deserializers:

streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);

If I use the following form (and allow the defaults as specified in my consumer config, which are KafkaAvro):

final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC);

I get the following:

[2018-04-08 00:24:53,433] ERROR [fc-demo-client-StreamThread-1] stream-thread [fc-demo-client-StreamThread-1] Failed to process stream task 0_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedTasks)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
....

I am using Java classes generated from the .avsc files, and uploadSerde is initialised with the Java class generated from the Avro schema.
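For reference, uploadSerde is initialised along these lines (a sketch assuming Confluent's SpecificAvroSerde; the schema registry URL is a placeholder):

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import java.util.Collections;
import java.util.Map;

// placeholder for the real schema registry URL
final Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

// DocumentUpload is the Java class generated from the .avsc schema
final SpecificAvroSerde<DocumentUpload> uploadSerde = new SpecificAvroSerde<>();
uploadSerde.configure(serdeConfig, false); // false = configure as a value serde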

Thanks.

Thanks. So I create a wrapper around the KafkaAvroSerializer and KafkaAvroDeserializer that instantiates these, and then use the wrapper for the key parameter in Consumed.with? – Arturo Knight
Is there an example of this? I would have thought creating a stream from a topic with a KafkaAvroSerialize'd String key was a common use case. – Arturo Knight
I'm a bit confused here. I set the correct default deserializers in my StreamsConfig, so I assume that if I call builder.stream without passing in the serdes, it will apply the defaults for the key. It does not seem to be doing that. Perhaps it is on the value part where it is throwing the exception. To summarise: I have produced the keys and values in the topic using the KafkaAvroSerializers, and I simply want to create a stream from that topic in my consumer. Surely it can't be that hard to achieve this? – Arturo Knight
The issue you linked to, Matthias, looks like it is for value objects, not the key. My issue seems to be with the key, not the value. My key has been serialized with the KafkaAvroSerializer. When I try to create a stream using the code above (Consumed.with), it is not obvious to me how I can instruct Consumed.with to use the KafkaAvroDeserializer to deserialize the key, even though I have specified the correct deserializer in the config. Any help would be appreciated. Can't believe it is this difficult. – Arturo Knight

1 Answer

4 votes

The logic is the same for key and value. Thus, you can handle both the same way.

Your confusion is about setting the consumer deserializers in the config. Note that those configs are ignored (for internal reasons). You cannot directly configure the deserializers of the consumer; you always need to use Serdes. Thus, if you want to set a default deserializer for the consumer, you need to specify a default Serde in the config.
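For example, a minimal sketch of the config (assuming Confluent's SpecificAvroSerde; the schema registry URL is a placeholder):

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.streams.StreamsConfig;

// replace the (ignored) deserializer configs with default Serde configs
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
// the Avro Serdes need to know where the schema registry lives
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");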

So I create a wrapper around the KafkaAvroSerializer and KafkaAvroDeserializer that instantiates these, and then use the wrapper for the key parameter in Consumed.with

Exactly. Or you can also set this Serde as default in the config.
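A minimal sketch of such a wrapper, pairing Confluent's serializer and deserializer via Serdes.serdeFrom() (the schema registry URL is again a placeholder):

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.util.Collections;
import java.util.Map;

final Map<String, String> serdeConfig = Collections.singletonMap(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

// wrap the Confluent (de)serializers in a Serde for the Avro-encoded key
final Serde<Object> keyAvroSerde = Serdes.serdeFrom(
        new KafkaAvroSerializer(), new KafkaAvroDeserializer());
keyAvroSerde.configure(serdeConfig, true); // true = configure as a key serde

final KStream<Object, DocumentUpload> uploadStream =
        builder.stream(UPLOADS_TOPIC, Consumed.with(keyAvroSerde, uploadSerde));

Note that KafkaAvroDeserializer is typed to Object, so the stream is a KStream<Object, DocumentUpload>; you can map the key to a String afterwards if needed.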

Would have thought creating a stream from a topic with a KafkaAvroSerialize'd String key was a common use case

Not sure about this. If it's a plain String, I assume people may use the StringDeserializer directly instead of wrapping the String in Avro (not sure). Also note that it's recommended to use a schema registry when dealing with Avro. Confluent's Schema Registry ships with corresponding Avro Serdes: https://github.com/confluentinc/schema-registry/ (Disclaimer: I am an employee at Confluent.)