1
votes

I'm trying to build the Materialized.as DSL code here: https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/state/Stores.html

But I'm getting the error

incompatible types: org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted to org.apache.kafka.common.serialization.Serde<java.lang.Object>

On the line

.withKeySerde(Serdes.Long())

Does anyone know what might be wrong here?

final StreamsBuilder builder = new StreamsBuilder();

KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("mystore");
KTable<Long,String> dataStore = builder.table(
    "example_stream",
    Materialized.as(storeSupplier)
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));
2
Please add code with topology/state store building. – Bartosz Wardziński
Please add the code. – Nishu Tayal
Apologies, I added the code building the store. – J. Doe
See minimal reproducible example. Add the builder definition and all other code that the builder uses. – OneCricketeer
What is the StreamsBuilder default key and value Serde config? – Nishu Tayal

2 Answers

3
votes

The problem is that Java cannot infer the generic types of Materialized.as(...) once further calls are chained onto it, so they default to <Object,Object>. Later, the Serde types don't match: withKeySerde expects a Serde<Object> but receives a Serde<Long>. You need to specify the types explicitly on the Materialized.as call, like

KTable<Long,String> dataStore = builder.table(
    "example_stream",
    Materialized.<Long,String>as(storeSupplier)
        .withKeySerde(Serdes.Long())
        .withValueSerde(Serdes.String()));
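
The inference behaviour can be reproduced without Kafka. The following is only a minimal sketch: Mat and Serde are hypothetical stand-ins written for this illustration, not the Kafka classes, and they keep only the generic shape of a static factory followed by chained with...() calls. It assumes the chained receiver is typed on its own, without looking at the assignment target, which is why an explicit type witness on the factory call is one way to pin K and V.

```java
public class InferenceSketch {
    // Hypothetical stand-in for a Serde<T>; it only carries a type name.
    static final class Serde<T> {
        final String name;
        Serde(String name) { this.name = name; }
    }

    // Hypothetical stand-in for a Materialized-like builder, reduced to K and V.
    static final class Mat<K, V> {
        Serde<K> keySerde;
        Serde<V> valueSerde;

        // Generic static factory: K and V appear only in the return type.
        static <K, V> Mat<K, V> as(String storeName) { return new Mat<>(); }

        Mat<K, V> withKeySerde(Serde<K> s) { keySerde = s; return this; }
        Mat<K, V> withValueSerde(Serde<V> s) { valueSerde = s; return this; }
    }

    static Serde<Long> longSerde() { return new Serde<>("Long"); }
    static Serde<String> stringSerde() { return new Serde<>("String"); }

    static String describe() {
        // Does NOT compile: the receiver of withKeySerde is inferred as
        // Mat<Object, Object>, so a Serde<Long> is rejected.
        // Mat<Long, String> bad = Mat.as("mystore").withKeySerde(longSerde());

        // Compiles: the explicit type witness fixes K and V before chaining.
        Mat<Long, String> ok = Mat.<Long, String>as("mystore")
                .withKeySerde(longSerde())
                .withValueSerde(stringSerde());
        return ok.keySerde.name + "," + ok.valueSerde.name;
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "Long,String"
    }
}
```

Uncommenting the "bad" line should produce an incompatible-types error of the same shape as the one in the question.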
0
votes

I cannot say for sure without a code sample; however, the error message is very clear. You are telling Kafka that the key is of type Long, but your key is actually some other Java Object. For example, if you had a message with a String key, this code would change to: .withKeySerde(Serdes.String()). Check the type of the key and specify the correct Serde for that type.