
I am creating a Kafka Streams application, and my topic data comes in as Protobuf. We are able to generate Java code bindings for it. However, I am struggling to use the correct serde to consume the data from the topic. Can someone please point out what I am doing wrong here?

Below is the properties definition I am using:

Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id-config");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "my-broker:my-port");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
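
(Side note: since KafkaProtobufSerde is also registered above as the default value serde, my understanding is that Kafka Streams configures that default instance from these same properties, so the schema registry URL would presumably need to be supplied here as well, roughly like this:)

// Assumption: the default value serde instance is configured from the streams
// properties, so it would also need to know where the schema registry lives.
properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");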

My Serde class:

public class AppSerdes extends Serdes {

    public static KafkaProtobufSerde<ProtobufClass1> createConfiguredSerde1() {
        KafkaProtobufSerde<ProtobufClass1> serde = new KafkaProtobufSerde<ProtobufClass1>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    public static KafkaProtobufSerde<ProtobufClass2> createConfiguredSerde2() {
        KafkaProtobufSerde<ProtobufClass2> serde = new KafkaProtobufSerde<ProtobufClass2>();
        Map<String, Object> serdeConfig = getSerdeConfig();
        serde.configure(serdeConfig, false);
        return serde;
    }

    private static Map<String, Object> getSerdeConfig() {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

        return serdeConfig;
    }
}

And this is how I am creating KStream and KTable instances:

StreamsBuilder streamBuilder = new StreamsBuilder();
KTable<String, ProtobufClass1> table = streamBuilder.table("topic1",
        Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde1()));
KStream<String, ProtobufClass2> stream = streamBuilder.stream("topic2",
        Consumed.with(AppSerdes.String(), AppSerdes.createConfiguredSerde2()));
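
For completeness, this is roughly how the topology and the properties above are wired together and started (a minimal sketch; the names come from the snippets in this question):

// Minimal sketch: build the topology from the StreamsBuilder above and start
// the application with the Properties defined earlier.
KafkaStreams streams = new KafkaStreams(streamBuilder.build(), properties);
streams.start();
// Close the client cleanly on JVM shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));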

However, I am getting the error below:

org.apache.kafka.streams.errors.StreamsException: ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.google.protobuf.DynamicMessage. Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
Caused by: java.lang.ClassCastException: com.google.protobuf.DynamicMessage cannot be cast to iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity
    at org.apache.kafka.streams.kstream.internals.KStreamImpl.lambda$internalSelectKey$0(KStreamImpl.java:234)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
    ... 11 more

I've never used Protobuf, but it says com.google.protobuf.DynamicMessage cannot be cast to iit.datahub.party.system_crm.v1.CustomerAddressBase$CustomerAddressBaseEntity. I would try to find out what kind of class com.google.protobuf.DynamicMessage is. – Erik
I have the same error: DynamicMessage cannot be cast to … – giaosudau

1 Answer


I was able to resolve it by changing this from

Map<String, Object> serdeConfig = getSerdeConfig();

To

Map<String, String> serdeConfig = getSerdeConfig();

As key & Value for this map are both strings.