2 votes

I am trying to achieve exactly-once delivery using spring-cloud-stream-binder-kafka in a Spring Boot application. The versions I am using are:

  • spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE
  • spring-cloud-stream-binder-kafka-1.2.1.RELEASE
  • spring-cloud-stream-codec-1.2.2.RELEASE
  • spring-kafka-1.1.6.RELEASE
  • spring-integration-kafka-2.1.0.RELEASE
  • spring-integration-core-4.3.10.RELEASE
  • zookeeper-3.4.8
  • Kafka version: 0.10.1.1

This is my configuration (cloud-config):

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events

I have two main classes. The FeedSink interface:

    package au.com.xyz.proxy.interfaces;

    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.messaging.MessageChannel;

    public interface FeedSink {

        String FEED_PLATFORM_EVENTS_INPUT = "feed_platform_events_input";

        @Input(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
        MessageChannel feedPlatformEventsInput();
    }

And the EventConsumer:

    package au.com.xyz.proxy.consumer;

    import com.google.gson.Gson;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.kafka.support.KafkaHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.util.ObjectUtils;

    import au.com.xyz.proxy.interfaces.FeedSink;
    // project-specific types (EventNotification, SysMessage, CapPointService,
    // ClientResponse, Fault) come from elsewhere in the application

    @Slf4j
    @EnableBinding(FeedSink.class)
    public class EventConsumer {

        public static final String SUCCESS_MESSAGE =
                "SEND-SUCCESS : Successfully sent message to platform.";
        public static final String FAULT_MESSAGE = "SOAP-FAULT Code: {}, Description: {}";
        public static final String CONNECT_ERROR_MESSAGE = "CONNECT-ERROR Error Details: {}";
        public static final String EMPTY_NOTIFICATION_ERROR_MESSAGE =
                "EMPTY-NOTIFICATION-ERROR Empty Event Received from platform";

        @Autowired
        private CapPointService service;

        /**
         * Method associated with the stream to process each message.
         */
        @StreamListener(FeedSink.FEED_PLATFORM_EVENTS_INPUT)
        public void message(final @Payload EventNotification eventNotification,
                            final @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {

            String caseMilestone = "UNKNOWN";
            if (!ObjectUtils.isEmpty(eventNotification)) {
                SysMessage sysMessage = processPayload(eventNotification);
                caseMilestone = sysMessage.getCaseMilestone();
                try {
                    ClientResponse response = service.sendPayload(sysMessage);
                    if (response.hasFault()) {
                        Fault faultDetails = response.getFaultDetails();
                        log.error(FAULT_MESSAGE, faultDetails.getCode(), faultDetails.getDescription());
                    } else {
                        log.info(SUCCESS_MESSAGE);
                    }
                    // commit the offset only after a successful send
                    acknowledgment.acknowledge();
                } catch (Exception e) {
                    log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
                }
            } else {
                log.error(EMPTY_NOTIFICATION_ERROR_MESSAGE);
                acknowledgment.acknowledge();
            }
        }

        private SysMessage processPayload(final EventNotification eventNotification) {
            Gson gson = new Gson();
            String jsonString = gson.toJson(eventNotification.getData());
            log.info("Consumed message for platform events with payload : {} ", jsonString);
            return gson.fromJson(jsonString, SysMessage.class);
        }
    }

I have set the auto-commit property to false for both the Kafka client and the Spring container. As you can see in the EventConsumer class, I call acknowledge() only when service.sendPayload succeeds and no exception is thrown, and I want the container to move the offset and poll for the next records. What I have observed is:

  • Scenario 1 - An exception is thrown and no new messages are published to Kafka. There is no retry to process the message and there appears to be no activity, even after the underlying issue (downstream server unavailability) is resolved. Is there a way to retry the processing n times and then give up? Note that this means retrying the processing, i.e. re-polling from the last committed offset; it is not about the Kafka instance being unavailable. If I restart the service (the EC2 instance), processing resumes from the offset of the last successful acknowledge.

  • Scenario 2 - An exception is thrown and a subsequent message is then pushed to Kafka. I see the new message being processed and the offset moving on, which means I lost the message that was never acknowledged. So the question is: given that I handle the acknowledgment myself, how do I make the consumer read again from the last commit rather than just the latest message? I assume a poll is happening internally that did not take into account (or did not know) that the last message was not acknowledged. I don't think multiple threads are reading from Kafka; I don't know how the @Input and @StreamListener annotations are controlled, but I assume the thread count is controlled by the consumer.concurrency property, which defaults to 1 (see the sketch after this list).
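
For reference, that property sits on the binding's consumer section; a sketch based on the configuration above (1 is indeed the default):

    spring:
      cloud:
        stream:
          bindings:
            feed_platform_events_input:
              consumer:
                concurrency: 1   # number of consumer threads for this binding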

I have done some research and found a lot of links, but unfortunately none of them answers my specific questions. I looked at https://github.com/spring-cloud/spring-cloud-stream/issues/575, which has a comment from Marius (https://stackoverflow.com/users/809122/marius-bogoevici):

Do note that Kafka does not provide individual message acking, which means that acknowledgment translates into updating the latest consumed offset to the offset of the acked message (per topic/partition). That means that if you're acking messages from the same topic partition out of order, a message can 'ack' all the messages before it.

I am not sure whether ordering is the issue here, given there is only one thread.

Apologies for the long post, but I wanted to provide enough information. The main thing is that I am trying to avoid losing messages when consuming from Kafka, and I am trying to see whether spring-cloud-stream-binder-kafka can do the job or whether I have to look at alternatives.

Update 6th July 2018

I saw the post https://github.com/spring-projects/spring-kafka/issues/431. Is this a better approach to my problem? I can try the latest version of spring-kafka:

    @KafkaListener(id = "qux", topics = "annotated4", containerFactory = "kafkaManualAckListenerContainerFactory",
                   containerGroup = "quxGroup")
    public void listen4(@Payload String foo, Acknowledgment ack, Consumer<?, ?> consumer) {
        // ...
    }

  • Will this help in controlling the offset so it is set back to the last successfully processed record? How can I do that from the listen method? consumer.seekToEnd(); — and then how will the listen method reset to get that record?
  • Does putting the Consumer in the signature provide support for getting a handle to the consumer? Or do I need to do anything more?
  • Should I use Acknowledgment or consumer.commitSync()?
  • What is the significance of containerFactory? Do I have to define it as a bean?
  • Do I need @EnableKafka and @Configuration for the above approach to work? Bearing in mind the application is a Spring Boot application.
  • By adding Consumer to the listen method, do I avoid having to implement the ConsumerAware interface?

Last but not least, is it possible to provide an example of the above approach, if it is feasible?
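
For reference, here is a minimal sketch of what such a setup might look like, assuming Spring Kafka 2.1.x (names not shown in the snippet above, such as ManualAckConfig, are illustrative):

    package au.com.xyz.proxy.consumer;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer;
    import org.springframework.kafka.support.Acknowledgment;

    @EnableKafka
    @Configuration
    public class ManualAckConfig {

        // referenced by name from @KafkaListener(containerFactory = "...")
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // offsets are committed only when Acknowledgment.acknowledge() is called
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
            return factory;
        }

        @KafkaListener(id = "qux", topics = "annotated4",
                containerFactory = "kafkaManualAckListenerContainerFactory")
        public void listen4(String payload, Acknowledgment ack, Consumer<?, ?> consumer) {
            // process the record; acknowledge (mark the offset for commit) only on success
            ack.acknowledge();
        }
    }

As far as I understand, with Spring Boot auto-configuration the ConsumerFactory bean comes from the spring.kafka.* properties and @EnableKafka is applied automatically, and having Consumer in the method signature gives you the consumer handle without implementing any extra interface.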

Update 12 July 2018

Thanks to Gary (https://stackoverflow.com/users/1240763/gary-russell) for providing the tip of using maxAttempts. I have used that approach, and I am able to achieve exactly-once delivery and preserve the order of messages.

My updated cloud-config:

    spring:
      autoconfigure:
        exclude: org.springframework.cloud.netflix.metrics.servo.ServoMetricsAutoConfiguration
      kafka:
        consumer:
          enable-auto-commit: false
      cloud:
        stream:
          kafka:
            binder:
              brokers: "${BROKER_HOST:xyz-aws.local:9092}"
              headers:
                - X-B3-TraceId
                - X-B3-SpanId
                - X-B3-Sampled
                - X-B3-ParentSpanId
                - X-Span-Name
                - X-Process-Id
              zkNodes: "${ZOOKEEPER_HOST:120.211.316.261:2181,120.211.317.252:2181}"
            bindings:
              feed_platform_events_input:
                consumer:
                  autoCommitOffset: false
          binders:
            xyzkafka:
              type: kafka
          bindings:
            feed_platform_events_input:
              binder: xyzkafka
              destination: platform-events
              group: br-platform-events
              consumer:
                maxAttempts: 2147483647
                backOffInitialInterval: 1000
                backOffMaxInterval: 300000
                backOffMultiplier: 2.0

The event consumer remains the same as my initial implementation, except for rethrowing the error so the container knows the processing has failed. If you just catch the exception, the container has no way of knowing that message processing failed. By doing acknowledgement.acknowledge() you are only controlling the offset commit; in order for a retry to happen you must rethrow the exception. Don't forget to set the Kafka client auto-commit property and the Spring (container level) autoCommitOffset property to false. That's it.
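
For illustration, the change in the listener boils down to the catch block rethrowing (a sketch; wrapping in RuntimeException is one way to rethrow a checked exception from a method that does not declare it):

    try {
        ClientResponse response = service.sendPayload(sysMessage);
        // ... fault / success logging as before ...
        acknowledgment.acknowledge();
    } catch (Exception e) {
        log.error(CONNECT_ERROR_MESSAGE, e.getMessage());
        // rethrow so the binder's retry template (maxAttempts / backOff*) sees the
        // failure; if the exception is swallowed, the container assumes success
        throw new RuntimeException(e);
    }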


1 Answer

1 vote

As explained by Marius, Kafka only maintains an offset in the log. If you process the next message and update the offset, the failed message is lost.

You can send the failed message to a dead-letter topic (set enableDlq to true).
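
In the question's configuration, that consumer property would sit under the Kafka-specific binding section, along these lines (a sketch inferred from the config above; by default the binder names the DLQ topic error.<destination>.<group>):

    spring:
      cloud:
        stream:
          kafka:
            bindings:
              feed_platform_events_input:
                consumer:
                  enableDlq: true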

Recent versions of Spring Kafka (2.1.x) have special error handlers: ContainerStoppingErrorHandler, which stops the container when an exception occurs, and SeekToCurrentErrorHandler, which will cause the failed message to be redelivered.
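
For plain @KafkaListener setups, a rough sketch of wiring the latter into the container (Spring Kafka 2.1.x API assumed; the factory name is illustrative):

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

    @Configuration
    public class ErrorHandlingConfig {

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> retryingContainerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // on an exception, seek back so the failed record (and the rest of the
            // records from that poll) are redelivered instead of being skipped
            factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
            return factory;
        }
    }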