
We are trying to port our Spring Cloud Stream application, which consumes messages from Kafka, to AWS Kinesis. We require manual acknowledgement to handle certain timeout conditions.

For Kafka we set the consumer property `autoCommitOffset` to `false` and use the `ACKNOWLEDGMENT` header to acknowledge manually.
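For reference, the Kafka-side manual acknowledgement being ported looks roughly like this (a sketch; the binding name `consume-in-0` and the `String` payload type are assumptions):

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

public class KafkaManualAckConfig {

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            // With spring.cloud.stream.kafka.bindings.consume-in-0.consumer.autoCommitOffset=false,
            // the Kafka binder adds an Acknowledgment object to the message headers.
            Acknowledgment acknowledgment =
                    message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            if (acknowledgment != null) {
                acknowledgment.acknowledge(); // commit the offset manually after processing
            }
        };
    }
}
```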

I went through the Spring Cloud Stream documentation, including the following: https://dataflow.spring.io/docs/recipes/kinesis/simple-producer-consumer/ https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc

But could not find any solutions. Any pointers would be very helpful.

1 Answer


After some more searching, I found the solution:

In Kinesis, a shard is the equivalent of a Kafka partition, and a checkpoint is the equivalent of an offset.

In application.yml:

spring:
  cloud:
    stream:
      kinesis:
        bindings:
          consume-in-0:
            consumer:
              checkpointMode: manual

Sample code for checkpointing (the binding name must match the function bean name, here `consume`):

@Bean
public Consumer<Message<String>> consume() {
    return message -> {
        System.out.println("message received: " + message.getPayload());
        System.out.println("message headers: " + message.getHeaders());
        // With checkpointMode: manual, the Kinesis binder puts a Checkpointer in the headers.
        Checkpointer checkpointer =
                message.getHeaders().get(AwsHeaders.CHECKPOINTER, Checkpointer.class);
        // Advance the shard checkpoint, the Kinesis equivalent of committing the offset.
        checkpointer.checkpoint();
    };
}
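Since the original question mentions timeout conditions, a minimal sketch of checkpointing only after successful processing (the `process` method is a hypothetical placeholder for the business logic):

```java
import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.aws.support.AwsHeaders;
import org.springframework.integration.support.management.metrics.Checkpointer; // placeholder; actual class is org.springframework.integration.aws.support.Checkpointer-style, check your binder version
import org.springframework.messaging.Message;

public class KinesisConditionalCheckpointConfig {

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer =
                    message.getHeaders().get(AwsHeaders.CHECKPOINTER, Checkpointer.class);
            try {
                process(message.getPayload()); // hypothetical business logic that may time out
                checkpointer.checkpoint();     // checkpoint only on success
            } catch (Exception e) {
                // No checkpoint: the record is not marked as consumed,
                // so it can be picked up again from the last checkpoint.
            }
        };
    }

    private void process(String payload) {
        // hypothetical processing step
    }
}
```

Note the `Checkpointer` import above is an assumption; verify the exact package for your binder version.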

Reference: Kinesis Consumer Binder Properties