0
votes

I am new to kafka. I am trying to leftJoin a kafka stream (named as inputStream) to kafka-table(named as detailTable) where the kafka-stream is built as:

//The consumer to consume the input topic
Consumed<String, NotifyRecipient> inputNotificationEventConsumed = Consumed
        .with(Constants.CONSUMER_KEY_SERDE, Constants.CONSUMER_VALUE_SERDE);


//Now create the stream that is directly reading from the topic
KStream<NotifyKey, NotifyVal> initialInputStream =
        streamsBuilder.stream(properties.getInputTopic(), inputNotificationEventConsumed);


//Now re-key the above stream for the purpose of left join
KStream<String, NotifyVal> inputStream = initialInputStream
          .map((notifyKey,notifyVal) ->
              KeyValue.pair(notifyVal.getId(),notifyVal)
          );

And the kafka-table is created this way:

//The consumer for the table
Consumed<String, Detail> notifyDetailConsumed =
          Consumed.with(Serdes.String(), Constants.DET_CONSUMER_VALUE_SERDE);

//Now consume from the topic into ktable
KTable<String, Detail> detailTable = streamsBuilder
          .table(properties.getDetailTopic(), notifyDetailConsumed);

Now I am trying to join the inputStream to the detailTable as:

  //Now join
  KStream<String,Pair<Long, SendCmd>> joinedStream = inputStream
          .leftJoin(detailTable, valJoiner)
          .filter((key,value)->value!=null);

I am getting an error from which it seems that during the join, the key and value of the inputStream were tried to cast into the default key-serde and default value-serde and getting a class cast exception.

Not sure how to fix this and need help there. Let me know if I should provide more info.

1

1 Answers

2
votes

Because you use a map(), key and value type might have changes and thus you need to specify the correct Serdes via Joined.with(...) as third parameter to .leftJoin().