3
votes

I'm digging into Apache Kafka with Spring Cloud Stream and have observed some behavior that makes me wonder whether I'm doing something wrong or whether it is working as intended - which I strongly doubt:

It is possible to lose messages on error!?

My setup is as simple as possible. A single Kafka broker and a topic with only 1 partition. Broker, topic, producer and consumer with default settings (auto-ack is true).

testcase 1

  • produce message1
  • produce message2
  • start a consumer that will throw a RuntimeException on receiving any message
  • consuming message1, retry
  • consuming message1, retry
  • consuming message1, retry
  • exception is thrown
  • consuming message2, retry
  • consuming message2, retry
  • consuming message2, retry
  • exception is thrown
  • stop and restart the consumer
  • consuming message1, retry
  • consuming message1, retry
  • consuming message1, retry
  • exception is thrown
  • consuming message2, retry
  • consuming message2, retry
  • consuming message2, retry
  • exception is thrown

Works as expected.

testcase 2

  • produce message1
  • produce message2
  • start a consumer that will throw a RuntimeException on receiving exactly message1
  • consuming message1, retry
  • consuming message1, retry
  • consuming message1, retry
  • exception is thrown
  • successfully consuming message2
  • produce message3
  • successfully consuming message3
  • stop and restart the consumer
  • nothing happens, the consumer waits for new messages to consume

message1 will be skipped because the committed offset has been set to message3. This is what troubles me: I don't want the consumer to continue with later messages as long as prior messages have not been successfully processed.
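For what it's worth, the committed offset of the group can be inspected with the consumer groups tool that ships with Kafka (depending on the Kafka version you may need to add --new-consumer); after testcase 2 it shows no lag, even though message1 was never processed:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group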

Has anyone experienced the same behavior and/or could maybe guide me on how to change this?

Thanks in advance!


Update: as requested, some code snippets

Create the topic

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

Connect a producer

kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

Create a maven project with

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.7.RELEASE</version>
    <relativePath/>
</parent>

...

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>


<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

Add the following application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-topic
          contentType: text/plain
          group: test-group
          consumer:
            header-mode: raw
      kafka:
        binder:
          zkNodes: localhost:2181
          brokers: localhost:9092

Add the following Application.java

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void consume(Message<String> message) {
        log.info("Received: {}", message.getPayload());
        if ("message1".equals(message.getPayload())) {
            throw new RuntimeException();
        }
        log.info("Successfully processed message {}", message.getPayload());
    }
}

That should be it. Run the application and use the console-producer to produce messages.

3
if there is only one partition, why would the consumer consume message2 when it failed to consume message1 in testcase2? - herokingsley
@herokingsley I've no idea but that is what is happening. If it wouldn't consume message2 after the failed attempts with message1 then I would be satisfied. - stphngrtz
maybe showing us some code or logs would be helpful - herokingsley
@herokingsley I've added some code snippets to my question. - stphngrtz
And what does the code for testcase1 look like? - Arek

3 Answers

0
votes

In Kafka, each message comes with an offset. Your consumer application can keep track of the offsets it has processed, and if any offset has been skipped or missed, you can use the consumer.seek method to go back and fetch the specific message that is missing instead of consuming the next one.

Offsets are incremental in nature and sequential.

And in your case, use manual offset commits.

I would suggest the following steps:

  1. After the poll method returns, first check the previously committed offset and request the next offset value.

  2. Once a message is consumed and processed successfully, save the offset value of the successfully processed message in some internal memory or table. During the next poll, compare that saved value with the committed offset and seek back if a message was skipped.

The link below does not cover your exact use case, but it should give you a fair idea.

Refer Example
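A minimal sketch of this approach, using the plain KafkaConsumer API rather than the Spring Cloud Stream binder; topic and group names are taken from the question's setup, and the process method is just a placeholder for your business logic:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "false"); // commit offsets by hand
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                    try {
                        process(record); // placeholder for the actual processing, may throw
                        // commit exactly this record's offset + 1, i.e. "everything up to here is done"
                        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(record.offset() + 1)));
                    } catch (RuntimeException e) {
                        // rewind to the failed record so the next poll re-reads it,
                        // and stop here so later records are not processed in the meantime
                        // (straightforward with a single partition, as in the question)
                        consumer.seek(tp, record.offset());
                        break;
                    }
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // business logic goes here
    }
}

The key points are enable.auto.commit=false, committing only the offset of the record that was actually processed, and seeking back to the failed record so it is retried instead of being skipped.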

0
votes

You should configure a DLQ for such cases. If your message could not be consumed after 3 retries, it is most likely that it won't be consumed at all or that it needs special treatment. Set up a DLQ where the poison message can land, and you won't lose messages.
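With the Kafka binder used in the question, this is a consumer property on the binding; a sketch on top of the application.yml above (enableDlq is the binder property, and by default the dead-letter topic is named error.<destination>.<group>):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              enableDlq: true  # publish messages that exhausted their retries to a dead-letter topic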

0
votes

Kafka gives you a runtime, but you have the power of choice. In some scenarios messages can be lost or skipped, in others they cannot - you need to prepare the configuration according to your needs. IMO you should investigate some of the Spring Cloud Stream settings further. You can also play around with disabling auto commit and committing offsets "by hand".
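In the Kafka binder, disabling auto commit means setting spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset to false; the binder then puts an Acknowledgment into the kafka_acknowledgment header, and the offset is only committed when you call acknowledge(). A sketch of how the consume method from the question could look with that setting - the processing itself is only illustrative:

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;

@StreamListener(Sink.INPUT)
public void consume(Message<String> message) {
    // the header is only present when autoCommitOffset is false on the binding
    Acknowledgment acknowledgment =
            message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

    log.info("Received: {}", message.getPayload());
    // ... process the payload; if this throws, acknowledge() is never reached
    //     and the offset stays uncommitted

    acknowledgment.acknowledge(); // commit the offset only after successful processing
}

Note that an unacknowledged offset is only re-read after a restart or rebalance; within the running consumer the in-memory position still moves on, so for strict "stop at the first bad message" semantics you would additionally have to seek back, as sketched in the first answer.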