We have a kafka cluster running with Avro schemas stored in Confluent's Schema-registry. On a recent re-deploy of (one of) our streams applications we starting seeing incompatible schema errors on a single topic (EmailSent). This is the only failing topic, and we receive the error any time a new EmailSent event is committed to the topic.
Caused by:org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"EmailSent","namespace":"com.company_name.communications.schemas","fields":[{"name":"customerId","type":"long","doc":"Customer's ID in the customers service"},{"name":"messageId","type":"long","doc":"The message id of the sent email"},{"name":"sentTime","type":{"type":"string","avro.java.string":"String"},"doc":"The campaign sent time in format 'yyyy-MM-dd HH:mm:ss.SSS'"},{"name":"campaignId","type":"long","doc":"The id of the campaign in the marketing suite"},{"name":"appId","type":["null","long"],"doc":"The app id associated with the sent email, if the email was related to a specific application","default":null}],"version":1} Caused by:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409; error code: 409 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170) at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230) at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59) at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91) at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72) at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:91) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78) at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:35) at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:79) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:232) at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:245) at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:193) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:188) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:35) at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199) at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121) at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:63) at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124) at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85) at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80) at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:222) at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:409) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:308) at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:939) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:819) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)
This schema has gone unchanged since June of 2018, and we've successfully processed EmailSent events up until this point.
The PR associated with the deployment of our Streams App does not change the schema, the Streams Processor throwing the errors, nor any of the streams application's dependencies. My suspicion lies in schema-registry, does anybody have any experience with something similar or insights on what could be causing the failure? I couldn't find any information on error code 409, does this ring any bells for anybody?
Thanks in advance.