3
votes

Problem: After setting up a Kafka pipeline that pulls data in using a Kafka Connect JDBC source with Avro serializers and deserializers, once I try to read that data into a KStream using a Kafka Streams Java app, I get the following error.

org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

I've tried to follow the existing examples as closely as possible but there are some things that just aren't making sense. I'll include all the code / additional information below, but here's a couple of questions I have...

  1. One of the biggest gaps in understanding I currently have is what is being used for the "KEY" for the Avro record? The line that is erroring on me (at run time) has to do with the fact that I tell the KStream that the key is a LONG, yet when the Avro record is retrieved, the length is less than 8 (expected length of a LONG type).
    When I set up my JDBC Source, there's nothing there that identifies what the key is - and I've seen nothing in the documentation that would lead me to believe I can specify the key, although I've tried to:

    curl -X POST \
      -H "Content-Type: application/json" \
      --data 'see next code block for formatted data'  \
    http://localhost:8083/connectors
    
    // This is the data chunk used above but in a string - broke it apart for readability here
    {
        "name": "source-jdbc-ldw_applications",
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": 1,
            "connection.url": "jdbc:sqlserver://dbserver;databaseName=dbname;user=kafkareader;password=kafkareader;",
            "mode": "incrementing",
            "incrementing.column.name": "ApplicationID",
            "topic.prefix": "source-jdbc-",
            "poll.interval.ms": 30000,
            "table.whitelist": "LDW_Applications",
            "transforms": "setSchema",
            "transforms.setSchema.type": "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
            "transforms.setSchema.schema.name": "com.mycompany.avro.Application",
            "transforms.setSchema.schema.version": "1"
        }
    }
    

With the above, I get the following schema as reported by running:

curl http://localhost:8081/subjects/source-jdbc-LDW_Applications-value/versions/1 |jq

Here's the output of that:

{
    "subject": "source-jdbc-LDW_Applications-value",
    "version": 1,
    "id": 9,
    "schema": "{\"type\":\"record\",\"name\":\"Application\",\"namespace\":\"com.baydynamics.avro\",\"fields\":[{\"name\":\"ApplicationID\",\"type\":\"long\"},{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Description\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"Group\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"OwnerUserID\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"RiskScore\",\"type\":[\"null\",{\"type\":\"int\",\"connect.type\":\"int16\"}],\"default\":null},{\"name\":\"RiskRating\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ServiceLevelTierID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"LossPotentialID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ConfidentialityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"IntegrityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"AvailabilityRequirementID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"ApplicationCategoryID\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.version\":1,\"connect.name\":\"com.baydynamics.avro.Application\"}"
}

To see that schema a little prettier:

{
"type":"record",
"name":"Application",
"namespace":"com.baydynamics.avro",
"fields":[
    {
        "name":"ApplicationID",
        "type":"long"
    },
    {
        "name":"Name",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Description",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"Group",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"OwnerUserID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    },
    {
        "name":"RiskScore",
        "type":[
            "null",
            {
            "type":"int",
            "connect.type":"int16"
            }
        ],
        "default":null
    },
    {
        "name":"RiskRating",
        "type":[
            "null",
            "string"
        ],
        "default":null
    },
    {
        "name":"ServiceLevelTierID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"LossPotentialID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ConfidentialityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"IntegrityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"AvailabilityRequirementID",
        "type":[
            "null",
            "int"
        ],
        "default":null
    },
    {
        "name":"ApplicationCategoryID",
        "type":[
            "null",
            "long"
        ],
        "default":null
    }
],
"connect.version":1,
"connect.name":"com.baydynamics.avro.Application"
}

So again, I don't see anything there that would indicate any particular field up there would be the key to the record.

So then I go into Kafka Streams, and I try to bring that data into a KStream...and it bombs...

final KStream<Long, Application> applicationStream = builder.stream(Serdes.Long(), applicationSerde, VULNERABILITY_TOPIC);

So, here's the thing, because I know the data stored behind the scenes is a BIGINT in SQL Server and that maps to a LONG in Java, I make the KStream's key type a Long and then I use the Serdes.Long() deserializer for the argument to the KStream builder.

When debugging, I see that the raw record has a length of 7 and that's why it throws the error. Apparently Avro serializes things in a way to compress better? I don't know. Regardless, the thing is I don't even know what key it thinks it's using actually is?! So who knows - maybe my assumption of Long is incorrect because it's not actually using the ApplicationID as the key? Why would I even assume it was?!

Any help on this would be appreciated. I know there's a lot of information up there, but in a nutshell..

  1. Using JDBC Kafka connect to push data into a topic
  2. Data is making it into the topic - I can see it via the console
  3. Trying to push that data into a stream so I can do some awesome things with the data and it blows up trying to fill the stream because the Serdes isn't compatible with the Avro Record

UPDATE 1: Per the advice of Randall below, I went and tried out the SMT (Single Message Transform) and now I have a key per record, which is an excellent step in the right direction, but for some reason it doesn't appear that the forced cast to a Long (INT64) has any real effect. I've taken some screenshots of the connector config with the SMT's, the resulting record (which now has a key!) and the same error I'm seeing in the Kafka stream: Screenshots mentioned above

1

1 Answers

3
votes

The Confluent JDBC source connector does not generate records with keys. A feature request to add this support has already been logged.

In the meantime, you could use a single message transform to extract some fields from the value to essentially create the key. The built-in ValueToKey transform does exactly that. This blog post has an example of that SMT.