
The Kafka JDBC Sink Connector throws a NullPointerException for a message whose schema has an optional field, here 'parentId'. Have I missed anything? I am using the out-of-the-box JsonConverter and JDBC Sink Connector.

A message on the Kafka topic looks like this:

{
"schema":{
  "type":"struct",
  "fields":[
     {
        "field":"id",
        "type":"string"
     },
     {
        "field":"type",
        "type":"string"
     },
     {
        "field":"eventId",
        "type":"string"
     },
     {
        "field":"parentId",
        "type":"string",
        "optional":true
     },
     {
        "field":"created",
        "type":"int64",
        "name":"org.apache.kafka.connect.data.Timestamp",
        "version":1
     }
  ]
 },
 "payload":{
   "id":"asset-1",
   "type":"parcel",
   "eventId":"evt-1",
   "created":1501834166000
 }
}

The connector is configured with these properties:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=admin
topics=asset-topic
tasks.max=1
batch.size=1
auto.evolve=true
connection.user=admin
auto.create=true
connection.url=jdbc:postgresql://postgres-db:5432/fabricdb
value.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_value
pk.fields=id

But the JDBC Sink Connector fails on the optional field parentId with:

        org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:698)
        at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
        at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:742)
        at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:361)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more
What is the version of Kafka Connect? (comment by Bartosz Wardziński)

1 Answer


According to the source code of JsonConverter, a field that is marked as optional must still have a value in the message payload.

You can see this in the JsonConverter class, in the method that converts a JSON value to a Connect object:

    private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
        final Schema.Type schemaType;
        if (schema != null) {
            schemaType = schema.type();
            // If the field is missing from the payload, jsonValue is a Java null
            // (not a NullNode), so this call is where the NullPointerException
            // in the stack trace appears to come from.
            if (jsonValue.isNull()) {
                if (schema.defaultValue() != null)
                    return schema.defaultValue(); // any logical type conversions should already have been applied
                if (schema.isOptional())
                    return null;
                throw new DataException("Invalid null value for required " + schemaType + " field");
            }
        }
        // ... rest of the method omitted

In summary, the schema in your case declares parentId as optional:

 {
    "field":"parentId",
    "type":"string",
    "optional":true
 }

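The value must still be present in the message payload. It can be null, but the key has to be there. If the key is missing entirely, the converter gets a Java null instead of a JSON null node, and the jsonValue.isNull() check fails with the NullPointerException.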
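Applied to the message in the question, the payload object would presumably need to look something like the sketch below, with parentId written out as an explicit null. The values are the sample ones from the question. This is an illustration of the idea, not a message I have run through the connector.

```json
{
  "id": "asset-1",
  "type": "parcel",
  "eventId": "evt-1",
  "parentId": null,
  "created": 1501834166000
}
```

This object is the value of the "payload" key; the "schema" part of the message stays as it is.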

Now look at the other side, the code that is responsible for serialization. It writes a NullNode for a null reference:

    private static JsonNode convertToJson(Schema schema, Object logicalValue) {
        if (logicalValue == null) {
            if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
                return null;
            if (schema.defaultValue() != null)
                return convertToJson(schema, schema.defaultValue());
            if (schema.isOptional())
                return JsonNodeFactory.instance.nullNode();
            throw new DataException("Conversion error: null value for field that is required and has no default value");
        }
        // ... rest of the method omitted