0
votes

there's something weird happening when we are creating AVRO messages through KSQL and try to consume them by using Kafka Connect. A bit of context:

Source data A 3rd party provider is producing data on one of our Kafka clusters as JSON (so far, so good). We actually see the data coming in.

Data Transformation As our internal systems require data to be encoded in AVRO, we created a KSQL cluster that transforms the incoming data into AVRO by creating the following stream in KSQL:

{
    "ksql": "
        CREATE STREAM src_stream (browser_name VARCHAR)
        WITH (KAFKA_TOPIC='json_topic', VALUE_FORMAT='JSON');

        CREATE STREAM sink_stream WITH (KAFKA_TOPIC='avro_topic',VALUE_FORMAT='AVRO',  PARTITIONS=1, REPLICAS=3) AS
        SELECT * FROM src_stream;
    ",
    "streamsProperties": {
        "ksql.streams.auto.offset.reset": "earliest"
    }
}

(so far, so good)

We see the data being produced from the JSON topic onto the AVRO topic, as the offset increases.

We then create a Kafka connector in a (new) Kafka Connect cluster. As some context, we are using multiple Kafka Connect clusters (with the same properties for those clusters), and as such we have a Kafka Connect cluster running for this data, but an exact copy of the cluster for other AVRO data (1 is for analytics, 1 for our business data).

The sink for this connector is BigQuery, we're using the Wepay BigQuery Sink Connector 1.2.0. Again, so far, so good. Our business cluster is running fine with this connector and the AVRO topics on the business cluster are streaming into BigQuery.

When we try to consume the AVRO topic created by our KSQL statement earlier however, we see an exception being thrown :/

The exception is the following:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: dpt_video_event-created_v2
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 0
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
 at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:209)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:235)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:415)
 at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:408)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:123)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:190)
 at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:169)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:121)
 at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:243)
 at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:134)
 at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:85)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)

Which, to us, indicates that Kafka Connect is reading the message, decodes the AVRO and tries to fetch the schema with ID 0 from the schema registry. Obviously, schema IDs in the schema registry are always > 0.

We're currently stuck in trying to identify the issue here. It looks like KSQL is encoding the message with schema ID 0, but we're unable to find the cause for that :/

Any help is appreciated!

BR, Patrick

UPDATE: We have implemented a basic consumer for the AVRO messages and that consumer is correctly identifying the schema in the AVRO messages (ID: 3), so it seems to be rekated to Kafka Connect, instead of the actual KSQL / AVRO messages.

1
Is key.converter set to AvroConverter, by chance?OneCricketeer
Yes :( That solved it! For our other clusters we were using AVRO for keys as well, but as KSQL outputs them as String, it was trying to decode the key and fetch schema with ID 0.Patrick Plaatje

1 Answers

1
votes

Obviously, schema IDs in the schema registry are always > 0... It looks like KSQL is encoding the message with schema ID 0, but we're unable to find the cause for that

The AvroConverter does a "dumb check" that only looks that the consumed bytes start with a magic byte of 0x0. The next 4 bytes are the ID.

If you are using key.converter=AvroConverter and your keys start like 0x00000 in hex, then the ID would be shown as 0 in the logs, and the lookup would fail.

Last I checked, KSQL doesn't output keys in Avro format, so you will want to check the properties of your connector.