1
votes

I have created a Kafka topic with a single partition.

kafka-topics --create --topic files.write --if-not-exists --zookeeper zookeeper:32181 --partitions 1 --replication-factor 1

Many messages may be pushed to this topic.

But I would like a single consumer (for a given group) to process these messages one by one.

spring:
  application:
    name: file-consumer
  cloud:
    stream:
      kafka:
        binder:
          type: kafka
          brokers: localhost
          defaultBrokerPort: 29092
          defaultZkPort: 32181
          configuration:
            max.request.size: 300000
            max.message.bytes: 300000
        bindings:
          fileWriteBindingInput:
            consumer:
              autoCommitOffset: false
      bindings:
        fileWriteBindingInput:
          binder: kafka
          destination: files.write
          group: ${spring.application.name}
          contentType: 'text/plain'

And the Java sample code:

@StreamListener(FileBindingProcessor.INPUT_FILE_WRITE)
public void onInputMessage(Message<String> message, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

    // I would like to process the messages here one at a time.
    // But if many messages are pushed to this topic (single partition), they are processed asynchronously, even if I have not yet acknowledged the current message.

    acknowledgment.acknowledge();
}

What is missing in my configuration?

I thought that, while a message is not acknowledged (the offset is not advanced), no other message would be consumed from the same partition.

3
This link might be helpful for you: github.com/spring-cloud/spring-cloud-stream/issues/575 - Nishu Tayal

3 Answers

2
votes

You can set the consumer config max.poll.records to 1; the default is 500.

max.poll.records

The maximum number of records returned in a single call to poll().
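
As a minimal sketch of what this setting looks like in a plain Kafka consumer configuration: the class name SingleRecordPollConfig and the helper consumerProps are hypothetical, and the broker address and group id are simply copied from the question. Only the property keys are standard Kafka consumer settings. In the Spring Cloud Stream setup from the question, the same key could presumably go into the binder's configuration map, next to max.request.size, but that placement is an assumption to check against the binder documentation.

```java
import java.util.Properties;

class SingleRecordPollConfig {

    // Builds consumer properties that cap each poll() call at one record.
    // Broker address and group id are taken from the question (assumptions).
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:29092");
        props.setProperty("group.id", "file-consumer");
        // Offsets are committed manually, as in the question.
        props.setProperty("enable.auto.commit", "false");
        // The Kafka default is 500; 1 means one record per poll().
        props.setProperty("max.poll.records", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("max.poll.records"));
    }
}
```

Note that this only limits how many records one poll() returns; it does not by itself make processing wait for an acknowledgment.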

2
votes

If autoCommitOffset is enabled (which is the default), then the binder will already acknowledge each record. So by the time it gets to your StreamListener, the record is already acknowledged.

Correction: The above statement about StreamListener is not quite true. The auto-ack is done when the listener exits.

Since you only have a single partition, you will get the messages in the same order as they were sent to that topic partition. You can disable autoCommitOffset, in which case you can use manual acknowledgment.

1
votes

Not acknowledging a message has nothing to do with stopping the next message from being delivered.

You cannot hand off a message to another thread and acknowledge it later; if you want single-threaded processing, you must do all the processing on the listener thread.
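
To illustrate the point, here is a minimal stand-alone simulation; it is not Spring or Kafka code. A BlockingQueue stands in for the single partition and a plain loop stands in for the listener thread. All names (ListenerThreadDemo, drain, handle) are hypothetical. Because handle runs on the same thread that takes records from the queue, the next record cannot be taken until the current one has been fully processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ListenerThreadDemo {

    // Events in the order they happen, to show that records do not interleave.
    static final List<String> events = new ArrayList<>();

    // Stand-in for the listener method: all work is done on the calling thread.
    static void handle(String record) {
        events.add("start " + record);
        // ... process the record synchronously here, with no hand-off to another thread ...
        events.add("end " + record);
    }

    // Stand-in for the poll loop: takes one record at a time and only
    // moves on to the next one after handle() has returned.
    static List<String> drain(BlockingQueue<String> partition) {
        events.clear();
        String record;
        while ((record = partition.poll()) != null) {
            handle(record);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        BlockingQueue<String> partition =
                new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
        System.out.println(drain(partition));
    }
}
```

If handle instead submitted the work to an executor and returned immediately, the loop would move on to the next record while the previous one was still being processed, which is the behavior described in the question.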