3
votes

I´m trying to process some Kafka records using Spring Cloud Stream 3.0.3.RELEASE, but I´m having trouble with Serdes configuration getting an error as soon as a record enters the stream pipeline.

This is the stack trace:

30-03-2020 19:28:33 ERROR org.apache.kafka.streams.KafkaStreams              [application-local,,,]: stream-client [joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a] All stream threads have died. The instance will be in error state and should be closed.
Exception in thread "joinPriorityData-applicationId-1302980a-a016-4167-9c0b-750ffb5d107a-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000001, topic=moaii.security.pe.incidence.queue, partition=0, offset=18108, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 35 more

    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:380)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:425)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:912)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: com.vcp.moaii.cep.dto.IncidenceState). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:116)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:363)
    ... 5 more
Caused by: java.lang.ClassCastException: class com.vcp.moaii.cep.dto.IncidenceState cannot be cast to class [B (com.vcp.moaii.cep.dto.IncidenceState is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:163)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:103)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
    ... 35 more

This is my Function:

@Bean
public Function<KTable<String, IncidenceItem>, KStream<String, ?>> joinPriorityData() {
    return incidenceStream -> incidenceStream
            .toStream()
            .filter((key, value) -> filterZoneTypeAndPriority(value))
            .selectKey((key, value) -> value.getsInc())
            .mapValues((readOnlyKey, value) -> stateMapper.toIncidendeState(value, null));
}

and this is my application.yml:

  spring.json.value.default.type: RawAccounting
  spring.cloud.stream:
    function.definition: joinPriorityData
    bindings:
      joinPriorityData-in-0:
        destination: moaii.security.pe.incidence.queue
        consumer.valueSerde: IncidenceItemSerde
      joinPriorityData-out-0:
        destination: moaii.security.pe.incidence.state
        producer.valueSerde: IncidenceItemSerde
    kafka:
      binder:
        configuration:
          auto.commit.interval.ms: 100
          auto.offset.reset: latest
      streams:
        binder:
          applicationId: moaii-cep
          content-type: application/json
          configuration:
            default:
              key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              value.serde:  org.springframework.kafka.support.serializer.JsonSerde
              useNativeDecoding: true

I´ve tryed a lot of different configurations from a lot of places, but any seems to work. As you can see in the stacktrace the default value Serdes are being ignored, and is using ByteArray instead.

I have created some custom serdes too, declaring them as a bean and using in the config file as shown, but same result.

Anyway i have the feeling that other configurations are being ignored too, for example I can see how the Ktable skips some old records with null Key and then fails when it reads the first not null one, even when i have auto.offset.reset: latest

Can´t say if it´s a Kafka or Spring problem, but can´t manage to fix it

Edit: In this logs you can see how the binder gets the correct serde for the Inbound but not for the outbound:

31-03-2020 11:54:24 INFO  o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor  [application-local,,,]: Key Serde used for joinPriorityData-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
31-03-2020 11:54:24 INFO  o.s.c.s.b.k.streams.KafkaStreamsFunctionProcessor  [application-local,,,]: Value Serde used for joinPriorityData-in-0: com.vcp.moaii.cep.broker.serde.MoaiiSerdes$IncidenceItemSerde

31-03-2020 11:54:27 INFO  o.s.c.stream.binder.kafka.streams.KStreamBinder    [application-local,,,]: Key Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$StringSerde    
31-03-2020 11:54:27 INFO  o.s.c.stream.binder.kafka.streams.KStreamBinder    [application-local,,,]: Value Serde used for (outbound) moaii.security.pe.incidence.state: org.apache.kafka.common.serialization.Serdes$ByteArraySerde

in the outbound the binding "joinPriorityData-out-0" is not even mentioned as you can see

2

2 Answers

1
votes

IncidenceState is loaded twice with two different class loaders. Its loaded once with the app (system class loader) and once with the bootstrap class loader.

Its a Kafka issue. Check this https://discuss.kotlinlang.org/t/classloading-error-with-kafka-streams/4547 and this https://youtrack.jetbrains.com/issue/KT-24966

There's a work around there on how to fix it.

1
votes

I wonder if the issue is related to your configuration and the function method. Try the following function and configuration and see if that makes a difference.

@Bean
public Function<KTable<String, IncidenceItem>, KStream<String, IncidenceState>> 
  //same as your original code
}

Note that I added the parameterized type for the outbound KStream. This is necessary for the binder to properly infer the Serde types to use. Assuming that both IncidenceItem and IncidenceState are JSON friendly objects, you can inonore providing any default serdes. However, you still need to provide them if your internal logic needs to rely on those Serdes. The following is the modified config. I removed the unnecessary properties or rearranged them.

spring.json.value.default.type: RawAccounting
  spring.cloud.stream:
    function.definition: joinPriorityData
    bindings:
      joinPriorityData-in-0:
        destination: moaii.security.pe.incidence.queue
      joinPriorityData-out-0:
        destination: moaii.security.pe.incidence.state
    kafka:
      streams:
        binder:
          applicationId: moaii-cep
          configuration:
            auto.commit.interval.ms: 100
            auto.offset.reset: latest
            default:
              key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              value.serde:  org.springframework.kafka.support.serializer.JsonSerde

I am not sure if you need both Kafka and Kafka Streams binders. It doesn't look that way from the setup that you shared. Therefore, moved all the configuration under the Kafka Streams binder configuration.

See if these changes make any difference.