I am building a Spring Cloud microservice that consumes data from a Kafka topic. In the consumer, I bind the topic to a KStream. The incoming messages don't contain a timestamp, because the Kafka version is lower than 0.10. When I just parse the incoming values, everything works fine. However, when I group them by a key, Kafka Streams does not use the "default.timestamp.extractor" (which I have set to org.apache.kafka.streams.processor.WallclockTimestampExtractor).
I have tested the same service against a different Kafka version (0.10 or higher), and there it works fine.
Here is my config:
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            brokers: ${KAFKA_BROKERS}
            applicationId: email-messages-stream
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              commit.interval.ms: 1000
              default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
              poll.ms: 60000 # blocking time waiting for more messages
              buffered.records.per.partition: 2000
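For completeness, later versions of the Kafka Streams binder also allow pointing a single binding at a TimestampExtractor bean instead of relying on the binder-level default. This is only a sketch, assuming a binding named input and a bean named wallclockExtractor (both names are illustrative; check whether your binder version supports this property):

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            input:
              consumer:
                # name of a TimestampExtractor bean in the application context
                timestampExtractorBeanName: wallclockExtractor
```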
Some part of my code:
stream
    .mapValues(this::mapMessage)
    .groupBy(this::buildGroup, Serialized.with(new JsonSerde<>(Group.class), new JsonSerde<>(EmailMessage.class)))
    .windowedBy(TimeWindows.of(WINDOW_TIME))
    .aggregate(ArrayList::new, this::aggregate, Materialized.with(new JsonSerde<>(Group.class), new MessageListSerialization()))
    .toStream()
    .process(() -> new MailMessagesProcessor(emailService));
It throws the following error: org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = .....) Use a different TimestampExtractor to process this data.
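For context on why the extractor matters here: records written by pre-0.10 producers arrive with timestamp = -1, and a wall-clock extractor replaces that with the current time. A minimal, self-contained sketch of that fallback rule (the real implementation would live inside a class implementing org.apache.kafka.streams.processor.TimestampExtractor and be called from its extract(ConsumerRecord, long) method; the class and method names below are illustrative):

```java
public class TimestampFallback {
    // Pre-0.10 records carry timestamp = -1 (ConsumerRecord.NO_TIMESTAMP),
    // which windowed operations reject unless an extractor replaces it.
    public static long resolve(long recordTimestamp, long wallClockMillis) {
        // Keep a valid embedded timestamp; otherwise fall back to wall-clock time,
        // which is what WallclockTimestampExtractor does unconditionally.
        return recordTimestamp >= 0 ? recordTimestamp : wallClockMillis;
    }

    public static void main(String[] args) {
        System.out.println(resolve(-1L, 1000L)); // prints 1000 (fallback used)
        System.out.println(resolve(42L, 1000L)); // prints 42 (embedded timestamp kept)
    }
}
```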