I have a processor that reads JSON strings from a topic as GenericRecord values. I split the stream into two branches. On the first branch, I map each (key, value) pair to two strings, a fixed field name from the JSON and the value of that field, and then group by key. So far so good. Now I need to aggregate the stream into a new user-defined type, and I get an exception.

Here is the code:

The new type:

private class Tuple {

    public int occ;
    public int sum;


    public Tuple (int occ, int sum) {
        this.occ = occ;
        this.sum = sum;
    }

    public void sum (int toAdd) {
        this.sum += toAdd;
        this.occ ++;
    }

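    public int getAverage () {
        // Guard against division by zero for a freshly initialized Tuple (0, 0).
        return this.occ == 0 ? 0 : this.sum / this.occ;
    }

    @Override
    public String toString() {
        return occ + "-> " + sum + ": " + getAverage();
    }
}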

The part of the stream that works:

  StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> source =
          builder.stream(topic);

    KStream<GenericRecord, GenericRecord>[] branches = source.branch(
            (key,value) -> partition(value.toString()),
            (key, value) -> true
    );

    KGroupedStream <String, String> groupedStream = branches[0]
            .mapValues(value -> createJson(value.toString()))
            .map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
            .peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
            .groupByKey();

The problem:

   KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
            () -> new Tuple (0,0), // initializer 
            (aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)));



    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();

This is the exception:

   Exception in thread "streamtest-6173d6a2-4a3a-4d76-b793-774719f8b1f5-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000011, topic=streamtest-KSTREAM-AGGREGATE-STATE-STORE-0000000007-repartition, partition=0, offset=0
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:318)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:409)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:964)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual value type (value type: com.mycompany.maventest.Streamer$Tuple). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:302)
    ... 6 more
Caused by: java.lang.ClassCastException: com.mycompany.maventest.Streamer$Tuple cannot be cast to org.apache.avro.generic.GenericRecord
    at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:191)
    ... 19 more

How can I resolve this?

----- UPDATE ------

The producer writes Avro, so I have these configuration properties:

 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

If I specify a custom serde in aggregate(), this is the result:

 KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
            () -> new Tuple(0, 0), // initializer 
            (aggKey, newValue, aggValue) ->  new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
            Materialized.with(Serdes.String(), new MySerde()));

The Exception:

   Exception in thread "streamtest-17deb5c8-ed07-4fcf-bd59-37b75e44b83f-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:80)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:677)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

--- SOLVED ---- I also had to pass serdes to groupByKey, because map() changes the key and value types to String:

 KGroupedStream <String, String> groupedStream = branches[0]
            .mapValues(value -> createJson(value.toString()))
            .map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
            .peek((key, value) -> System.out.println("key=" + key + ", value=" + value))
            .groupByKey( Serialized.with(
                    Serdes.String(), /* key (note: type was modified) */
                    Serdes.String()));  /* value */
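
For context on the "Unknown magic byte!" error above: the Confluent Avro deserializer expects each payload to begin with a magic byte of 0 followed by a 4-byte schema ID. Bytes written by a plain String serde do not have that header. The sketch below only illustrates that check in plain Java; the class and method names are made up for illustration and are not part of any Confluent API, and reading the error as "String bytes were read with the default Avro serde" is an inference from the trace.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireFormatCheck {

    // Confluent wire format: byte 0 is the magic byte (0), bytes 1-4 hold the schema ID.
    // Hypothetical helper, for illustration only.
    public static boolean looksLikeConfluentAvro(byte[] payload) {
        return payload != null && payload.length >= 5 && payload[0] == 0;
    }

    // Reads the big-endian schema ID that follows the magic byte.
    public static int schemaId(byte[] payload) {
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        // A value as a plain String serde would write it: raw UTF-8 bytes, no header.
        byte[] plain = "42".getBytes(StandardCharsets.UTF_8);
        // A hand-built payload with magic byte 0, schema ID 7, and one dummy body byte.
        byte[] framed = ByteBuffer.allocate(6).put((byte) 0).putInt(7).put((byte) 1).array();

        System.out.println(looksLikeConfluentAvro(plain));   // false
        System.out.println(looksLikeConfluentAvro(framed));  // true
        System.out.println(schemaId(framed));                // 7
    }
}
```

Passing Serdes.String() for both key and value in groupByKey keeps the repartition topic on String serdes instead of the Avro default.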

1 Answer

Kafka Streams uses the default serde unless one is explicitly specified for an operation.

In the aggregate() call, the value type is Tuple, while the default value serde is for GenericRecord, hence the exception. You need to specify the serdes, as below:

 KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
            () -> new Tuple (0,0), // initializer
            (aggKey, newValue, aggValue) ->
                 new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)), // aggregator
            Materialized.with(keySerde, tupleSerde)); // explicit serdes for the state store

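The aggregation then uses tupleSerde for its values instead of the default. Here keySerde would be Serdes.String() and tupleSerde a Serde<Tuple> that you provide. You can find an example here: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating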
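
The answer does not show what a serde for Tuple does internally, so here is a minimal sketch of the byte encoding it could use, written in plain Java with no Kafka dependency. TupleCodec and its nested Tuple stand-in are hypothetical names, and the fixed 8-byte layout is just one possible choice. In a real application these two methods would sit inside implementations of Kafka's Serializer<Tuple> and Deserializer<Tuple>, which can be combined with Serdes.serdeFrom(serializer, deserializer).

```java
import java.nio.ByteBuffer;

public class TupleCodec {

    // Stand-in for the Tuple class from the question: occurrence count and running sum.
    public static final class Tuple {
        public final int occ;
        public final int sum;

        public Tuple(int occ, int sum) {
            this.occ = occ;
            this.sum = sum;
        }
    }

    // What a Serializer<Tuple> could return: two big-endian ints, 8 bytes in total.
    public static byte[] serialize(Tuple t) {
        if (t == null) {
            return null; // null values (tombstones) stay null
        }
        return ByteBuffer.allocate(2 * Integer.BYTES).putInt(t.occ).putInt(t.sum).array();
    }

    // The matching Deserializer<Tuple> logic: read the two ints back in the same order.
    public static Tuple deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return new Tuple(buf.getInt(), buf.getInt());
    }

    public static void main(String[] args) {
        Tuple restored = deserialize(serialize(new Tuple(3, 42)));
        System.out.println(restored.occ + " " + restored.sum); // prints "3 42"
    }
}
```

A fixed binary layout is compact but brittle if fields are added later; a JSON or Avro based serde is a common alternative when the aggregate type is expected to evolve.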