I need to create a stream from a topic that is consumed with the KafkaAvroDeserializer rather than the standard Kafka deserializers. This is because further down the line the data will be sent out to a topic used by the Confluent JDBC Sink connector (which does not support the standard serializers/deserializers). When producing to the topic I used the KafkaAvroSerializer for both the key and the value.
My original code (before I changed to use the Kafka Avro Serializers for the key) was:
final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(Serdes.String(), uploadSerde));
NOTE: Serdes.String() above will not deserialize correctly, since the key was serialized using the KafkaAvroSerializer. So, is there perhaps another form of this call that lets me build the stream without setting the key serde (so it defaults to what is in the config), and just set the value serde (uploadSerde)?
If not, can someone tell me how to change the "Serdes.String()" standard deserializer to a KafkaAvroDeserializer? e.g.
final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC, Consumed.with(<What can I insert here for the KafkaAvroDeserializer.String???>, uploadSerde));
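The only thing I can think of is wrapping the raw Confluent serializer/deserializer in a serde myself, roughly as sketched below, but I don't know whether this is the intended approach (the schema registry URL is a placeholder, and the key type would then become Object rather than String):

// Sketch only: wrap KafkaAvroSerializer/KafkaAvroDeserializer in a Serde for the key.
// "http://localhost:8081" is a placeholder for my real schema registry URL.
final Map<String, Object> serdeConfig =
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

final Serializer<Object> keySerializer = new KafkaAvroSerializer();
keySerializer.configure(serdeConfig, true);   // isKey = true
final Deserializer<Object> keyDeserializer = new KafkaAvroDeserializer();
keyDeserializer.configure(serdeConfig, true); // isKey = true
final Serde<Object> keyAvroSerde = Serdes.serdeFrom(keySerializer, keyDeserializer);

final KStream<Object, DocumentUpload> uploadStream =
        builder.stream(UPLOADS_TOPIC, Consumed.with(keyAvroSerde, uploadSerde));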
In my consumer, I am setting the correct default deserializers:
streamsConfiguration.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
streamsConfiguration.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
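For context, the rest of the streams configuration is roughly this (the application id, bootstrap servers and schema registry URL below are placeholders, not my real values):

final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");          // placeholder
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // placeholder
// ...plus the two default deserializer entries shown above.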
If I use the form below (and allow the defaults as specified in my consumer config, which are KafkaAvro):
final KStream<String, DocumentUpload> uploadStream = builder.stream(UPLOADS_TOPIC);
I get the following:
[2018-04-08 00:24:53,433] ERROR [fc-demo-client-StreamThread-1] stream-thread [fc-demo-client-StreamThread-1] Failed to process stream task 0_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedTasks)
java.lang.ClassCastException: [B cannot be cast to java.lang.String
at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
....
I am using Java classes generated from the .avsc files, and uploadSerde is initialised with the generated Java class for the Avro schema.
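For reference, uploadSerde is created roughly along these lines (a sketch, assuming Confluent's SpecificAvroSerde; the schema registry URL is again a placeholder):

// Serde for the Avro-generated DocumentUpload class, used for the record value (isKey = false).
final Serde<DocumentUpload> uploadSerde = new SpecificAvroSerde<>();
uploadSerde.configure(
        Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"),
        false);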
Thanks.