Disclaimer: My experience with KafkaStreams is quite limited.
I do not quite understand why am I getting a org.apache.kafka.common.errors.SerializationException: Error registering Avro schema:
and Schema being registered is incompatible with an earlier schema;
error when all I'm doing is Streaming a Topic into a KTable so I can later use Interactive Queries on that Store.
here's the SerDes config.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
Streams code: Note I do not wish to do any filtering or grouping with the Stream at this point, I just want the data to be available for future querying over a Store.
final KStream<String,GenericRecord> stream = builder.stream("my-topic");
stream.toTable(Materialized.as("my-state-store"));
whenever I streams.start()
I keep getting those Exceptions whilst Serialising.
I've got a schema for this so I've used the SpecificAvroSerdes but the problem is the same.
I guess I am missing some fundamental understanding on why my KTable is attempting to register a new Schema with Confluent.
Edit 1: I now understand the role being played by the schema register here. Using the KStream with a GenericAvroSerde I can consume the data from the Topic however still not being able to Materialize it in the KTable. My questions now are:
- Why do I keep getting the above exception always in the same partition and offset, even thought I am not calling
streams.cleanUp()
. Why isn't it moving on (committing). - This exception seems to be un-recoverable. All the Streams threads die bringing the app down. Is there a way to circumvent this? Note: I'm already using the LogAndContinue exception handlers for Production and Deserialization.
Edit 2:
I was able to overcome that exception. My StateStore contained previous entries with an incompatible schema. After I Purged the Topic and changed the ApplicationId it started to work.
This still doesn't negate the need to trap the Schema being registered is incompatible with an earlier schema;
Exception though. This brings the Streams Application to a halt. I have tried usingstreams.setUncaughtExceptionHandler
where I'm able to log the Error however this doesn't prevent the Streams Threads to die and I can't even start them after that. Surely there's a way to trap this?