
I have a Kafka consumer configured with schema polling from the topic. What I would like to do is create another Avro schema on top of the current one and hydrate data using it; basically, I don't need 50% of the information and need to write some logic to change a couple of fields. That's just an example:

    val consumer: KafkaConsumer<String, GenericRecord> = createConsumer()
    while (true) {
        consumer.poll(Duration.ofSeconds(10)).forEach {
            println(it.value())
        }
    }

The event returned from the stream is pretty complex, so I've modelled a smaller CustomObj as a .avsc file and compiled it to Java. When I try to run the code with CustomObj, I get "Error deserializing key/value for partition". All I want to do is consume an event and then deserialize it into a much smaller object with just the selected fields:

    return KafkaConsumer<String, CustomObj>(props)

This didn't work, and I'm not sure how to deserialize the GenericRecord into a CustomObj. Let me just add that I don't have any access to the stream or its config; I can only consume from it.


1 Answer


In Avro, your reader schema needs to be compatible with the writer schema. By giving the smaller object, you're providing a different reader schema.

It's not possible to directly deserialize to a subset of the input data, so you must consume the larger object (as a GenericRecord) and map it to the smaller one yourself; selecting and transforming fields isn't what deserialization does.
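A sketch of that mapping step, with the incoming record stood in by a plain `Map` so the example is self-contained (in your consumer loop you'd read fields with `GenericRecord.get("fieldName")` instead; the field names and the shape of `CustomObj` below are made up for illustration, since your real schema isn't shown):

    // Hypothetical smaller object standing in for the class compiled from your .avsc
    data class CustomObj(val id: String, val amount: Double)

    // Map the full record down to just the fields you care about,
    // applying whatever per-field logic you need along the way.
    // With a real GenericRecord, replace record["..."] with record.get("...").
    fun toCustomObj(record: Map<String, Any?>): CustomObj =
        CustomObj(
            id = record["id"].toString().uppercase(), // example of "changing a field"
            amount = (record["amount"] as Number).toDouble()
        )

Then, in the poll loop, keep the consumer typed as `KafkaConsumer<String, GenericRecord>` and call the mapper on each `it.value()` after it has been deserialized, rather than trying to make the deserializer produce `CustomObj` directly.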