I'm relatively new to Kafka Streams and Spring Cloud Stream, and I'm having difficulty with the windowed aggregation functionality.
What I'm trying to do is:
- take my initial stream of UserInteractionEvents and group them by userProjectId (a String)
- create session windows over these events with a 15-minute inactivity gap
- aggregate these windowed-sessions into a custom Session object
- then convert these Session objects into another custom UserSession object
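To make the intended semantics concrete, here is a plain-Kotlin sketch (no Kafka involved) of the sessionization I'm after. The names `Event`, `SessionSketch` and `sessionize` are hypothetical stand-ins, not my real classes, and the sketch assumes that a gap of exactly 15 minutes still counts as the same session. It processes a complete list offline, whereas Kafka Streams does this incrementally and uses the merger for out-of-order events.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical stand-ins for the real event and session types.
data class Event(val userProjectId: String, val interaction: String, val timestamp: Instant)

data class SessionSketch(
    val userProjectId: String,
    val interactions: List<String>,
    val start: Instant,
    val end: Instant
)

// Groups events by key, then starts a new session whenever the time since
// the previous event of that key exceeds the inactivity gap.
fun sessionize(events: List<Event>, gap: Duration = Duration.ofMinutes(15)): List<SessionSketch> =
    events.groupBy { it.userProjectId }.flatMap { (key, group) ->
        val sessions = mutableListOf<SessionSketch>()
        for (e in group.sortedBy { it.timestamp }) {
            val last = sessions.lastOrNull()
            if (last != null && Duration.between(last.end, e.timestamp) <= gap) {
                // Within the gap: extend the current session.
                sessions[sessions.lastIndex] =
                    last.copy(interactions = last.interactions + e.interaction, end = e.timestamp)
            } else {
                // First event for this key, or the gap was exceeded: open a new session.
                sessions += SessionSketch(key, listOf(e.interaction), e.timestamp, e.timestamp)
            }
        }
        sessions
    }
```

For example, two events for the same user 10 minutes apart end up in one session, while a third event more than 15 minutes after the second opens a new one.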
My code is just this:
@EnableBinding(KafkaStreamsProcessor::class)
inner class SessionProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    fun process(input: KStream<*, UserInteractionEvent>): KStream<*, UserSession> {
        return input
            // Re-key by userProjectId so that each user's events are grouped together.
            .groupBy({ _, v -> v.userProjectId }, Serialized.with(Serdes.String(), UserInteractionEventSerde()))
            // Session windows with a 15-minute inactivity gap.
            .windowedBy(SessionWindows.with(TimeUnit.MINUTES.toMillis(15)))
            .aggregate(
                Initializer<Session>(::Session),
                Aggregator<String, UserInteractionEvent, Session> { _, event, session -> session.interactions + event.interaction; session },
                Merger<String, Session> { _, session1, session2 -> Session.merge(session1, session2) },
                Materialized.`as`<String, Session, SessionStore<Bytes, ByteArray>>("windowed-sessions")
                    .withKeySerde(Serdes.String()).withValueSerde(SessionSerde()))
            .toStream()
            .map { windowed, session ->
                KeyValue(windowed.key(),
                    UserSession(windowed.key(),
                        session.interactions,
                        // Window start/end are epoch milliseconds, not seconds.
                        Instant.ofEpochMilli(windowed.window().start()),
                        Instant.ofEpochMilli(windowed.window().end()))
                )
            }
    }
}
The problem seems to be in the aggregation step: I'm seeing a ClassCastException when the windowed-sessions store is flushed, and I'm fairly puzzled about how to proceed from here. If someone could point out where I'm going wrong, or point me to documentation that covers using session windows with custom serdes, I'd appreciate it!
Thanks a lot!
Full stack trace below:
Exception in thread "default-dc0af3aa-8d8d-4b51-b0de-cdeb2dd83db6-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store windowed-sessions
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer / value: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: [B). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:91)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:176)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
    at org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196)
    at org.apache.kafka.streams.state.internals.MeteredSessionStore.flush(MeteredSessionStore.java:165)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    ... 14 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
    at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:90)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
    ... 45 more
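My reading of the innermost cause is that a serializer typed for byte arrays is being handed a String key at the sink. Below is a minimal sketch of that mechanism on the JVM, using a hypothetical `SimpleSerializer` interface as a stand-in (it is not Kafka's `Serializer`, and `send` is not Kafka's `RecordCollectorImpl.send`). Because generics are erased at runtime, the mismatch only surfaces as a ClassCastException when `serialize` is actually called.

```kotlin
// Hypothetical stand-in for a serializer interface; NOT Kafka's actual API.
interface SimpleSerializer<T> {
    fun serialize(data: T): ByteArray
}

// Plays the role of a byte-array serializer: expects data that is already bytes.
class PassThroughBytes : SimpleSerializer<ByteArray> {
    override fun serialize(data: ByteArray): ByteArray = data
}

// Plays the role of a String serializer.
class Utf8Strings : SimpleSerializer<String> {
    override fun serialize(data: String): ByteArray = data.toByteArray(Charsets.UTF_8)
}

// Mimics a sink that only learns the serializer at runtime, so the compiler
// cannot check that the key type matches the serializer's type parameter.
@Suppress("UNCHECKED_CAST")
fun send(key: Any, serializer: SimpleSerializer<*>): ByteArray =
    (serializer as SimpleSerializer<Any>).serialize(key)

// Returns true if passing a String key to the byte-array serializer fails
// with a ClassCastException.
fun failsWithClassCast(): Boolean =
    try {
        send("user-1", PassThroughBytes())
        false
    } catch (e: ClassCastException) {
        true
    }
```

With the String serializer, `send("user-1", Utf8Strings())` succeeds, which is why I suspect the output key serde is not the one I think I configured.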
My config:
spring.cloud.stream.kafka.streams.bindings:
  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  input:
    consumer:
      valueSerde: com.teckro.analytics.UserInteractionEventSerde
  output:
    producer:
      valueSerde: com.teckro.analytics.UserSessionSerde
spring.cloud.stream.bindings:
  input:
    destination: test-interaction
    consumer:
      headerMode: raw
  output:
    destination: test-session
    producer:
      headerMode: raw