0
votes

Using confluent 5.4.1

We happened to encounter an issue in the new KTable foreign key join while forwarding the joined stream from the joined KTable to another topic.

Schema registry and Avro serialiser related artifacts: confluent version 5.4.1 kafkastreams client: 5.4.1-ccs

After foreign key joins across Ktables, while consuming the stream from ktable using “for each”, it works fine but if the joined stream is forwarded using “to” to another topic, there is a schema registry error that is thrown.

Compared the schema mentioned in the error with the schema that is registered on schema registry, it is exactly the same… It seems there was a similar issue already from Kafka 2.4.0 : https://issues.apache.org/jira/browse/KAFKA-9390 and the issue continues to exist while forwarding it to another topic

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
io.confluent.kafka.schemaregistry.rest.exceptions.RestIncompatibleAvroSchemaException: Schema being registered is incompatible with an earlier schema
    at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.incompatibleSchemaException(Errors.java:64)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:236)
    at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:679)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:392)
    at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:385)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:560)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:501)
    at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:438)
    at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1591)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:542)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
    at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:494)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
    at java.lang.Thread.run(Thread.java:748)
Caused by: io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException: New schema is incompatible with an earlier schema.
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:432)
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:481)
    at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:222)
    ... 57 more
; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:272) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:331) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:431) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:423) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:409) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:140) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:196) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:172) ~[kafka-schema-registry-client-5.4.1.jar:na]
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:71) ~[kafka-avro-serializer-5.4.1.jar:na]
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.4.1.jar:na]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) ~[kafka-streams-avro-serde-5.4.1.jar:na]
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38) ~[kafka-streams-avro-serde-5.4.1.jar:na]
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) ~[kafka-clients-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:166) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:106) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:124) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier$1.process(SubscriptionResolverJoinProcessorSupplier.java:107) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier$1.process(SubscriptionResolverJoinProcessorSupplier.java:60) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:432) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474) ~[kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:536) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:792) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698) [kafka-streams-5.4.1-ccs.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671) [kafka-streams-5.4.1-ccs.jar:na]
1
It would be great if you could show your Avro scehmas currently in the registry and those you've tried to produceOneCricketeer

1 Answers

0
votes

My bad, the issue was because of a difference in schema that got registered earlier for the same topic through another compiled module... The error wasn't obvious enough to mention the topic from "to", it instead was mentioning an internal topic, which lead to the confusion. This is no longer an issue.