
I am building a Spring Cloud microservice that consumes data from a Kafka topic. In the consumer, I bind the topic to a KStream. Incoming messages don't contain a timestamp, because the Kafka version is lower than 0.10. Parsing the incoming values works fine; however, when I group them by a key, the "default.timestamp.extractor" (set to org.apache.kafka.streams.processor.WallclockTimestampExtractor) is not used.

I have tested this service against a different Kafka version (0.10 or higher), and it worked fine.

Here is my config:

    spring:
      cloud:
        stream:
          kafka:
            streams:
              binder:
                brokers: ${KAFKA_BROKERS}
                applicationId: email-messages-stream
                configuration:
                  default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                  default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                  commit.interval.ms: 1000
                  default.timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor
                  poll.ms: 60000 # blocking time waiting for more messages
                  buffered.records.per.partition: 2000

Part of my code:

    stream
        .mapValues(this::mapMessage)
        .groupBy(this::buildGroup, Serialized.with(new JsonSerde<>(Group.class), new JsonSerde<>(EmailMessage.class)))
        .windowedBy(TimeWindows.of(WINDOW_TIME))
        .aggregate(ArrayList::new, this::aggregate, Materialized.with(new JsonSerde<>(Group.class), new MessageListSerialization()))
        .toStream()
        .process(() -> new MailMessagesProcessor(emailService));

It is throwing this error:

    org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = .....) Use a different TimestampExtractor to process this data.
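For context, this error appears when a record reaches Kafka Streams without a valid timestamp. A wall-clock extractor sidesteps it by ignoring the record's own timestamp and returning the current time. Below is a minimal sketch of that fallback decision in plain Java; the class and method names are illustrative, and the actual `org.apache.kafka.streams.processor.TimestampExtractor` wiring (which needs the kafka-streams dependency) is shown only in comments:

```java
// Sketch of the timestamp-fallback logic a custom extractor could use.
// Assumption: with pre-0.10 brokers the record timestamp arrives as -1
// (no timestamp), so wall-clock time is substituted instead.
public class TimestampFallback {

    // Pure helper: keep a valid broker timestamp, otherwise use "now".
    static long resolveTimestamp(long recordTimestampMs, long wallClockMs) {
        return recordTimestampMs >= 0 ? recordTimestampMs : wallClockMs;
    }

    // In a real project this would be called from a class implementing
    // org.apache.kafka.streams.processor.TimestampExtractor, e.g.:
    //
    //   public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
    //       return resolveTimestamp(record.timestamp(), System.currentTimeMillis());
    //   }

    public static void main(String[] args) {
        System.out.println(resolveTimestamp(1500000000000L, 1600000000000L)); // valid timestamp kept
        System.out.println(resolveTimestamp(-1L, 1600000000000L));            // fallback to wall clock
    }
}
```

Note that an extractor only helps on the client side; as the answer below explains, it cannot compensate for a broker that is too old to deliver timestamps at all.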

1 Answer


Kafka Streams requires brokers 0.10.0 or newer; it is not compatible with older brokers.

  • Kafka Streams 0.10.0 is only compatible with 0.10.0 (or newer) brokers.

  • Kafka Streams 0.10.1 and newer is backward compatible with 0.10.1 brokers (but not older ones) and compatible with newer brokers.

  • Furthermore, since Kafka Streams 1.0, message format 0.10 (or higher) is required. Hence, even if you upgrade your brokers to 0.10.0 (or higher), it won't work unless the message format is upgraded too.

  • The "exactly-once" feature requires broker version 0.11.0 (or higher).
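If the brokers are already on 0.10.0 or newer but the stored message format is older, the format is controlled by broker-side settings. A sketch of the relevant `server.properties` entries (the version values below are examples, not a recommendation for any specific cluster):

```properties
# server.properties (broker-side) -- example values
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0
```

Bumping `log.message.format.version` affects how new messages are written; consult the broker upgrade guide before changing it on a live cluster.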

For more details see: https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility