
We have a Kafka cluster running with Avro schemas stored in Confluent's Schema Registry. On a recent re-deploy of (one of) our streams applications, we started seeing incompatible-schema errors on a single topic (EmailSent). This is the only failing topic, and we receive the error any time a new EmailSent event is committed to it.

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"EmailSent","namespace":"com.company_name.communications.schemas","fields":[{"name":"customerId","type":"long","doc":"Customer's ID in the customers service"},{"name":"messageId","type":"long","doc":"The message id of the sent email"},{"name":"sentTime","type":{"type":"string","avro.java.string":"String"},"doc":"The campaign sent time in format 'yyyy-MM-dd HH:mm:ss.SSS'"},{"name":"campaignId","type":"long","doc":"The id of the campaign in the marketing suite"},{"name":"appId","type":["null","long"],"doc":"The app id associated with the sent email, if the email was related to a specific application","default":null}],"version":1}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:238)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:230)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:54)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:91)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:35)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:79)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:232)
    at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:245)
    at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:153)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:193)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:188)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:35)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:63)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:222)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:409)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:308)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:939)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)

This schema has gone unchanged since June of 2018, and we've successfully processed EmailSent events up until this point.

The PR associated with this deployment of our Streams app changes neither the schema, nor the Streams processor throwing the errors, nor any of the application's dependencies. My suspicion lies with the Schema Registry itself. Does anybody have experience with something similar, or insights into what could be causing the failure? I couldn't find any information on error code 409; does this ring any bells for anybody?
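For reference, this is roughly how I've been comparing what the registry holds against what the serializer is trying to register (a sketch; it assumes the default TopicNameStrategy, so the subject is EmailSent-value, and a registry reachable at schema-registry:8081):

# fetch the latest registered schema for the subject
curl http://schema-registry:8081/subjects/EmailSent-value/versions/latest

# ask the registry whether a candidate schema is compatible with the latest version;
# schema.json wraps the Avro schema as {"schema": "<the schema as an escaped JSON string>"}
curl -X POST http://schema-registry:8081/compatibility/subjects/EmailSent-value/versions/latest \
  -d @schema.json \
  -H "Content-Type: application/vnd.schemaregistry.v1+json"

The compatibility endpoint returns {"is_compatible": true} or false, which should reproduce the 409 decision outside of the Streams app.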

Thanks in advance.

Hard to help without knowing what schema is already there... Could you please upload both of the schemas? 409 is just what it says in the stack trace: github.com/confluentinc/schema-registry/blob/5.1.0-post/core/… – OneCricketeer
Were you ever able to figure out what is up here? I'm seeing a very similar error. I can take the schema printed in the logs as part of the error and manually post it to the registry without error - it's exactly the same as what's already registered. Setting compatibility mode to NONE changes nothing... – salty

1 Answer


I don't think the server is lying. You haven't shown the two schemas for us to compare (the one in the registry vs. the one in your error message).

One way around the problem would be to set the subject's compatibility level to NONE:

export KAFKA_TOPIC=logEvents
# relax compatibility checking for the value subject of this topic
curl -X PUT http://schema-registry:8081/config/${KAFKA_TOPIC}-value \
  -d '{"compatibility": "NONE"}' \
  -H "Content-Type: application/json"

(do the same for the ${KAFKA_TOPIC}-key if you need to)

then upload your new schema.
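For example (a sketch, assuming the same subject naming as above, and that schema.json wraps the Avro schema as {"schema": "<escaped schema string>"}):

# register the new schema under the value subject
curl -X POST http://schema-registry:8081/subjects/${KAFKA_TOPIC}-value/versions \
  -d @schema.json \
  -H "Content-Type: application/vnd.schemaregistry.v1+json"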

But

  1. Set it back to BACKWARD compatibility (or whatever the original config was) once you are done; see the command after this list.
  2. This can potentially break Avro consumers that need to read a mix of messages written with the old schema and with this new, incompatible one.
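For point 1, restoring the level might look like this (a sketch; BACKWARD is the Schema Registry default, so substitute your original setting if it differed):

# restore the compatibility level after the new schema is registered
curl -X PUT http://schema-registry:8081/config/${KAFKA_TOPIC}-value \
  -d '{"compatibility": "BACKWARD"}' \
  -H "Content-Type: application/json"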