0
votes

Im trying to implement this joins according this kafka documentation.

I've no idea why this joins does not work...

first try

First I passed all values.

without parameter Here it try to force generics to be of type Object, this is very wrong.

Without Joined with serialization options I receive this runtime exception:

Exception in thread "StreamAPP-stream-event-b3dc5fff-abee-4fa0-92f9-e1690f8fd152-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: ClassCastException while producing data to topic StreamAPP-stream-event-KSTREAM-KEY-SELECT-0000000025-repartition. A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: br.com.calebebrim.kafka.entities.stream.sharing.registry.StreamRegistryKey / value type: br.com.calebebrim.kafka.entities.stream.sharing.stream.Event). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, #to(String topic, Produced<K, V> produced) with Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class)))

Can anyone help me?

Thanks!

1

1 Answers

0
votes

Solved,

I found out that the join operation can not transform data.

So, I just applyed mapValues before like:

stream map values before join