
I'm relatively new to Kafka Streams and Spring Cloud Stream, and I'm having difficulty using the windowed aggregation functionality.

What I'm trying to do is:

  1. Take my initial stream of UserInteractionEvents and group them by userProjectId (a String).
  2. Create a session window over these events with a 15-minute inactivity gap.
  3. Aggregate each windowed session into a custom Session object.
  4. Convert these Session objects into another custom UserSession object.
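
Step 2 relies on session-window semantics: events for the same key stay in one session as long as consecutive events are no more than the inactivity gap apart. The stdlib-only sketch below groups timestamps roughly the way SessionWindows with a 15-minute gap would. sessionize is a hypothetical helper, not a Kafka Streams API, and it sidesteps out-of-order records by sorting first, whereas Kafka Streams handles a late record that bridges two sessions by merging them (which is when the Merger is called).

```kotlin
import java.time.Duration

// Hypothetical helper: groups event timestamps (epoch ms) into sessions.
// A new session starts only when the gap to the previous event exceeds `gap`;
// a gap of exactly `gap` still joins the current session.
fun sessionize(timestampsMs: List<Long>, gap: Duration): List<LongRange> {
    val sessions = mutableListOf<LongRange>()
    val sorted = timestampsMs.sorted()
    if (sorted.isEmpty()) return sessions
    var start = sorted.first()
    var last = start
    for (t in sorted.drop(1)) {
        if (t - last > gap.toMillis()) {
            sessions.add(start..last) // close the current session
            start = t
        }
        last = t
    }
    sessions.add(start..last)
    return sessions
}

fun main() {
    val minute = 60_000L
    // Events at 0, 5, 19, 40 and 50 minutes: the 21-minute pause splits them in two.
    val events = listOf(0L, 5 * minute, 19 * minute, 40 * minute, 50 * minute)
    println(sessionize(events, Duration.ofMinutes(15)))
}
```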

My code is just this:

    @EnableBinding(KafkaStreamsProcessor::class)
    inner class SessionProcessorApplication {

        @StreamListener("input")
        @SendTo("output")
        fun process(input: KStream<*, UserInteractionEvent>): KStream<*, UserSession> {
            return input
                .groupBy({ _, v -> v.userProjectId }, Serialized.with(Serdes.String(), UserInteractionEventSerde()))
                // Sessions close after 15 minutes of inactivity (gap given in milliseconds)
                .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(15)))
                .aggregate(
                        Initializer<Session>(::Session),
                        // Assumes Session.interactions is a MutableList: `interactions + x`
                        // builds a new list and discards it, so mutate in place instead
                        Aggregator<String, UserInteractionEvent, Session> { _, event, session -> session.interactions.add(event.interaction); session },
                        Merger<String, Session> { _, session1, session2 -> Session.merge(session1, session2) },
                        Materialized.`as`<String, Session, SessionStore<Bytes, ByteArray>>("windowed-sessions")
                        .withKeySerde(Serdes.String()).withValueSerde(SessionSerde()))
                .toStream()
                // Sessions that get merged away are forwarded with a null value; skip them
                .filter { _, session -> session != null }
                .map { windowed, session ->
                    KeyValue(windowed.key(),
                            UserSession(windowed.key(),
                                    session.interactions,
                                    // Window bounds are epoch milliseconds, not seconds
                                    Instant.ofEpochMilli(windowed.window().start()),
                                    Instant.ofEpochMilli(windowed.window().end()))
                    )
                }
        }
    }
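
Kafka Streams' Window.start() and Window.end() return epoch milliseconds, so they should be converted with Instant.ofEpochMilli; Instant.ofEpochSecond would read the same number as seconds. A quick stdlib-only check of the difference (windowInstant is a hypothetical helper, and the timestamp is illustrative):

```kotlin
import java.time.Instant

// Hypothetical helper: converts a window bound (epoch ms) to an Instant.
fun windowInstant(epochMs: Long): Instant = Instant.ofEpochMilli(epochMs)

fun main() {
    // Stands in for windowed.window().start()
    val windowStartMs = 1_530_000_000_000L

    val correct = windowInstant(windowStartMs)
    val wrong = Instant.ofEpochSecond(windowStartMs) // treats milliseconds as seconds

    println(correct) // 2018-06-26T08:00:00Z
    println(wrong)   // tens of thousands of years in the future
}
```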

I seem to be having issues in the aggregation step: I'm seeing ClassCastExceptions when the windowed-sessions store is flushed, and I'm not sure how to proceed. If someone could point out where I'm going wrong, or point me to documentation on using session windows with custom Serdes, I'd appreciate it!

Thanks a lot!
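
Whatever a custom Session serde does internally, its serializer and deserializer must round-trip a value exactly, because the windowed-sessions store writes and reads Session objects through them. Below is a stdlib-only sketch of such a round trip. The Session class is a hypothetical stand-in (interactions modeled as strings), and the length-prefixed encoding is just one simple choice; in a real application the two functions would sit inside Kafka Serializer<Session> and Deserializer<Session> implementations, which can be combined with Serdes.serdeFrom(serializer, deserializer).

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// Hypothetical stand-in for the question's Session class (the real one is not shown).
data class Session(val interactions: MutableList<String> = mutableListOf())

// Encoding: an element count followed by each interaction as a modified-UTF-8 string.
fun serializeSession(session: Session): ByteArray {
    val bytes = ByteArrayOutputStream()
    DataOutputStream(bytes).use { out ->
        out.writeInt(session.interactions.size)
        session.interactions.forEach { out.writeUTF(it) }
    }
    return bytes.toByteArray()
}

// Decoding mirrors the encoding field by field.
fun deserializeSession(data: ByteArray): Session =
    DataInputStream(ByteArrayInputStream(data)).use { input ->
        val count = input.readInt()
        Session(MutableList(count) { input.readUTF() })
    }

fun main() {
    val original = Session(mutableListOf("click", "scroll"))
    val restored = deserializeSession(serializeSession(original))
    println(restored == original) // true
}
```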

Full stack trace below:

    Exception in thread "default-dc0af3aa-8d8d-4b51-b0de-cdeb2dd83db6-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store windowed-sessions
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
        at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
        at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
        at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
    Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
        at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176)
        at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
        at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
        at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
        at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:165)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
        ... 14 more
    Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:90)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
        ... 45 more

My config:

    spring.cloud.stream.kafka.streams.bindings:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      input:
        consumer:
          valueSerde: com.teckro.analytics.UserInteractionEventSerde
      output:
        producer:
          valueSerde: com.teckro.analytics.UserSessionSerde

    spring.cloud.stream.bindings:
      input:
        destination: test-interaction
        consumer:
          headerMode: raw
      output:
        destination: test-session
        producer:
          headerMode: raw

Answer


I see some problems with your configuration.

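The default Serdes are in the wrong place. default.key.serde and default.value.serde belong under spring.cloud.stream.kafka.streams.binder.configuration, not under spring.cloud.stream.kafka.streams.bindings. Under bindings they are not picked up, so the key falls back to a byte-array serde, which is why the stack trace shows ByteArraySerializer being handed a String key. Change the configuration as follows: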

    spring.cloud.stream.kafka.streams.binder.configuration:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    spring.cloud.stream.kafka.streams.bindings:
      input:
        consumer:
          valueSerde: com.teckro.analytics.UserInteractionEventSerde
      output:
        producer:
          valueSerde: com.teckro.analytics.UserSessionSerde
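
For reference, entries under binder.configuration are passed through to Kafka Streams as ordinary configuration properties, so the two defaults above amount to the plain Properties below. This is a stdlib-only sketch: defaultSerdeProps is a hypothetical helper, and the literal keys are the values of StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, written out so the snippet needs no Kafka dependency.

```kotlin
import java.util.Properties

// Hypothetical helper: plain Kafka Streams equivalent of the two
// binder.configuration entries ("\$" escapes Kotlin string templates).
fun defaultSerdeProps(): Properties = Properties().apply {
    setProperty("default.key.serde", "org.apache.kafka.common.serialization.Serdes\$StringSerde")
    setProperty("default.value.serde", "org.apache.kafka.common.serialization.Serdes\$StringSerde")
}

fun main() {
    defaultSerdeProps().forEach { key, value -> println("$key=$value") }
}
```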

It appears that you are using native Serdes for all serialization and deserialization, so you need to say so in the config: by default, the binder performs the input/output (de)serialization itself. Enable native decoding on the input and native encoding on the output:

    spring.cloud.stream.bindings:
      input:
        destination: test-interaction
        consumer:
          useNativeDecoding: true
      output:
        destination: test-session
        producer:
          useNativeEncoding: true

If the problem persists, please create a minimal sample project on GitHub and share it with us; we will take a look.