I'm digging into Apache Kafka with Spring Cloud Stream and observed some behavior that makes me wonder if I'm doing something wrong or if it is working as intended - which I can hardly believe:
It is possible to lose messages on error!?
My setup is as simple as possible: a single Kafka broker and a topic with only one partition. Broker, topic, producer and consumer all use their default settings (auto-ack is true).
testcase 1
- produce message1
- produce message2
- start a consumer that will throw a RuntimeException on receiving any message
- consuming message1, retry
- consuming message1, retry
- consuming message1, retry
- exception is thrown
- consuming message2, retry
- consuming message2, retry
- consuming message2, retry
- exception is thrown
- stop and restart the consumer
- consuming message1, retry
- consuming message1, retry
- consuming message1, retry
- exception is thrown
- consuming message2, retry
- consuming message2, retry
- consuming message2, retry
- exception is thrown

Works as expected.
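The three consumption attempts per message match what I understand to be the default retry behavior of the binding. For reference, a sketch of the consumer property I assume is responsible, shown with what I believe is its default value (I have not changed it):

spring:
  cloud:
    stream:
      bindings:
        input:
          consumer:
            # total number of processing attempts before the exception is
            # rethrown; I assume the default of 3 explains the three
            # "consuming ... retry" entries per message above
            maxAttempts: 3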
testcase 2
- produce message1
- produce message2
- start a consumer that will throw a RuntimeException on receiving exactly message1
- consuming message1, retry
- consuming message1, retry
- consuming message1, retry
- exception is thrown
- successfully consuming message2
- produce message3
- successfully consuming message3
- stop and restart the consumer
- nothing happens, the consumer waits for new messages to consume

message1 will be skipped because the committed offset has been set to message3. This is what troubles me. I don't want the consumer to continue with later messages as long as prior messages have not been successfully processed.
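For reference, a sketch of the Kafka binder consumer properties that, as far as I can tell from the binder documentation, are related to offset committing. The values below are only what I would try, not something I have verified to prevent the skipping described above:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              # whether the binder commits offsets automatically after processing
              autoCommitOffset: true
              # according to the docs this should suppress commits for messages
              # that fail; it is unclear to me whether it still helps once later
              # messages on the same partition are committed successfully
              autoCommitOnError: false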
Has anyone experienced the same behavior, and/or could maybe guide me on how to change this?
Thanks in advance!
Update: as requested, some code snippets
Create the topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic
Connect a producer
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
Create a Maven project with
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.7.RELEASE</version>
    <relativePath/>
</parent>
...
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>
Add the following application.yml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: test-topic
          contentType: text/plain
          group: test-group
          consumer:
            header-mode: raw
      kafka:
        binder:
          zkNodes: localhost:2181
          brokers: localhost:9092
Add the following Application.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    private static final Logger log = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void consume(Message<String> message) {
        log.info("Received: {}", message.getPayload());
        // fail on message1 to simulate a processing error
        if ("message1".equals(message.getPayload()))
            throw new RuntimeException();
        log.info("Successfully processed message {}", message.getPayload());
    }
}
That should be it. Run the application and use the console-producer to produce messages.
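To reproduce testcase 2 with this setup: produce message1 and message2, start the consumer, and produce message3 while it is running. The console-producer input is simply the payloads, one per line:

message1
message2
message3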