1
votes

I have a Java SpringBoot2 application (app1) that sends messages to a Google Cloud PubSub topic (it is the publisher).

Another Java SpringBoot2 application (app2) is subscribed to a subscription to receive those messages. In this case I have more than one instance (k8s auto-scaling is enabled), so more than one pod of this app is consuming messages from Pub/Sub.

Some messages are consumed by a single instance of app2, but many others are delivered to more than one app2 instance, so those messages are processed more than once.

Here is the consumer code (app2):

private final static int ACK_DEAD_LINE_IN_SECONDS = 30;
private static final long POLLING_PERIOD_MS = 250L;
private static final int WINDOW_MAX_SIZE = 1000;
private static final Duration WINDOW_MAX_TIME = Duration.ofSeconds(1L);

@Autowired
private PubSubAdmin pubSubAdmin;

@Bean
public ApplicationRunner runner(PubSubReactiveFactory reactiveFactory) {
    return args -> {
        createSubscription("subscription-id", "topic-id", ACK_DEAD_LINE_IN_SECONDS);
        reactiveFactory.poll(subscription, POLLING_PERIOD_MS) // Poll the PubSub periodically
            .map(msg -> Pair.of(msg, getMessageValue(msg))) // Extract the message as a pair
            .bufferTimeout(WINDOW_MAX_SIZE, WINDOW_MAX_TIME) // Create a buffer of messages to bulk process 
            .flatMap(this::processBuffer) // Process the buffer
            .doOnError(e -> log.error("Error processing event window", e))
            .retry()
            .subscribe();
    };
}

private void createSubscription(String subscriptionName, String topicName, int ackDeadline) {
    pubSubAdmin.createTopic(topicName);
    try {
        pubSubAdmin.createSubscription(subscriptionName, topicName, ackDeadline);
    } catch (AlreadyExistsException e) {
        log.info("Pubsub subscription '{}' already configured for topic '{}': {}", subscriptionName, topicName, e.getMessage());
    }
}

private Flux<Void> processBuffer(List<Pair<AcknowledgeablePubsubMessage, PreparedRecordEvent>> msgsWindow) {
    return Flux.fromStream(
        msgsWindow.stream()
            .collect(Collectors.groupingBy(msg -> msg.getRight().getData())) // Group the messages by same data
            .values()
            .stream()
    )
    .flatMap(this::processDataBuffer);
}

private Mono<Void> processDataBuffer(List<Pair<AcknowledgeablePubsubMessage, PreparedRecordEvent>> dataMsgsWindow) {
    return processData(
        dataMsgsWindow.get(0).getRight().getData(),
        dataMsgsWindow.stream()
            .map(Pair::getRight)
            .map(PreparedRecordEvent::getRecord)
            .collect(Collectors.toSet())
    )
    .doOnSuccess(it ->
        dataMsgsWindow.forEach(msg -> {
            log.info("Mark msg ACK");
            msg.getLeft().ack();
        })
    )
    .doOnError(e -> {
        log.error("Error on PreparedRecordEvent event", e);
        dataMsgsWindow.forEach(msg -> {
            log.error("Mark msg NACK");
            msg.getLeft().nack();
        });
    })
    .retry();
}

private Mono<Void> processData(Data data, Set<Record> records) {
    // For each message, make calculations over the records associated to the data
    final DataQuality calculated = calculatorService.calculateDataQualityFor(data, records); // Arithmetic calculations
    return this.daasClient.updateMetrics(calculated) // Update DB record with a DaaS to wrap DB access
        .flatMap(it -> {
            if (it.getProcessedRows() >= it.getValidRows()) {
                return finish(data);
            }
            return Mono.just(data);
        })
        .then();
}

private Mono<Data> finish(Data data) {
    return dataClient.updateStatus(data.getId(), DataStatus.DONE) // Update DB record with a DaaS to wrap DB access
        .doOnSuccess(updatedData -> pubSubClient.publish(
            new Qa0DonedataEvent(updatedData) // Publish a new event in another topic
        ))
        .doOnError(err -> {
            log.error("Error finishing data");
        })
        .onErrorReturn(data);
}

I need each message to be consumed by one and only one app2 instance. Does anybody know if this is possible? Any ideas on how to achieve it?

Maybe the right way is to create one subscription for each app2 instance and configure the topic to send each message to exactly one subscription instead of to every one. Is that possible?

According to the official documentation, once a message is sent to a subscriber, Pub/Sub tries not to deliver it to any other subscriber on the same subscription (the app2 instances are subscribers of the same subscription):

Once a message is sent to a subscriber, the subscriber should acknowledge the message. A message is considered outstanding once it has been sent out for delivery and before a subscriber acknowledges it. Pub/Sub will repeatedly attempt to deliver any message that has not been acknowledged. While a message is outstanding to a subscriber, however, Pub/Sub tries not to deliver it to any other subscriber on the same subscription. The subscriber has a configurable, limited amount of time -- known as the ackDeadline -- to acknowledge the outstanding message. Once the deadline passes, the message is no longer considered outstanding, and Pub/Sub will attempt to redeliver the message

3
Can you share the piece of code where you connect to PubSub and how you consume/ack the messages in your App2? - guillaume blaquiere
Edited to add the consumer code. It uses a buffer to batch process the messages and then groups them by "data", so that the messages related to the same "data" are processed together. - Alberthoven
Your subscription object isn't clear (even wrong!). I have the feeling that you create a new subscription every time, but I'm not sure. - guillaume blaquiere
I create the topic and the subscription on startup, so if I have N instances (pods) of the same app, they try to create the topic and subscription N times. But they are only created if they do not exist!!! So I think there is nothing wrong there. - Alberthoven

3 Answers

0
votes

In general, Cloud Pub/Sub has at-least-once delivery semantics. That means it is possible for messages that have already been acked to be redelivered, and for multiple subscribers on the same subscription to receive the same message. These two cases should be relatively rare for a well-behaved subscriber, but without keeping track of the IDs of all messages delivered across all subscribers, it is not possible to guarantee that there won't be duplicates.

If it is happening with some frequency, it would be good to check if your messages are getting acknowledged within the ack deadline. You are buffering messages for 1s, which should be relatively small compared to your ack deadline of 30s, but it also depends on how long the messages ultimately take to process. For example, if the buffer is being processed in sequential order, it could be that the later messages in your 1000-message buffer aren't being processed in time. You could look at the subscription/expired_ack_deadlines_count metric in Cloud Monitoring to determine if it is indeed the case that your acks for messages are late. Note that late acks for even a small number of messages could result in more duplicates. See the "Message Redelivery & Duplication Rate" section of the Fine-tuning Pub/Sub performance with batch and flow control settings post.
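To put rough numbers on that, here is a minimal sketch of the per-message time budget. It assumes the worst case described above: the window is processed strictly one message at a time, a message may already have waited the full buffer time, and nothing extends the ack deadline. The class and method names are made up for illustration.

```java
import java.time.Duration;

public class AckBudget {

    // Worst-case time available per message if a full window is processed
    // sequentially and every message must be acked before the deadline.
    static Duration perMessageBudget(Duration ackDeadline, Duration bufferTime, int windowSize) {
        return ackDeadline.minus(bufferTime).dividedBy(windowSize);
    }

    public static void main(String[] args) {
        // Values taken from the question: 30 s ack deadline, 1 s window, 1000 messages.
        Duration budget = perMessageBudget(Duration.ofSeconds(30), Duration.ofSeconds(1), 1000);
        System.out.println(budget.toMillis() + " ms per message"); // prints "29 ms per message"
    }
}
```

If a database update per group takes longer than that on average, the last messages of a full window could miss the deadline under these assumptions. The code in the question groups messages and processes the groups through flatMap, so the real budget is probably more generous than this bound.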

0
votes

OK, after testing, reading the documentation and reviewing the code, I found a "small" error in it. We had a misplaced retry in the processDataBuffer method: when an error happened, the messages in the buffer were marked as NACK, so Pub/Sub redelivered them to another instance. But because of the retry, the same instance processed them again, this time successfully, and then marked them as ACK as well. As a result, some of them were processed twice.

private Mono<Void> processDataBuffer(List<Pair<AcknowledgeablePubsubMessage, PreparedRecordEvent>> dataMsgsWindow) {
    return processData(
        dataMsgsWindow.get(0).getRight().getData(),
        dataMsgsWindow.stream()
            .map(Pair::getRight)
            .map(PreparedRecordEvent::getRecord)
            .collect(Collectors.toSet())
    )
    .doOnSuccess(it ->
        dataMsgsWindow.forEach(msg -> {
            log.info("Mark msg ACK");
            msg.getLeft().ack();
        })
    )
    .doOnError(e -> {
        log.error("Error on PreparedRecordEvent event", e);
        dataMsgsWindow.forEach(msg -> {
            log.error("Mark msg NACK");
            msg.getLeft().nack();
        });
    })
    .retry(); // this retry has been deleted
}
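The effect can be reproduced without Pub/Sub or Reactor. The following is only a simplified model, not the real pipeline: FakeMessage is a hypothetical stand-in for AcknowledgeablePubsubMessage that records the calls it receives, and the loop imitates what doOnError followed by retry() does (run the error side effect, then try again).

```java
import java.util.ArrayList;
import java.util.List;

public class RetryAfterNackDemo {

    /** Hypothetical stand-in for a Pub/Sub message; it only records ack/nack calls. */
    static final class FakeMessage {
        final List<String> calls = new ArrayList<>();
        void ack()  { calls.add("ack"); }
        void nack() { calls.add("nack"); }
    }

    /** Models doOnSuccess(ack) + doOnError(nack) + retry() around some unit of work. */
    static void processWithRetry(List<FakeMessage> window, Runnable work) {
        while (true) {
            try {
                work.run();
                window.forEach(FakeMessage::ack);  // success path: ACK every message
                return;
            } catch (RuntimeException e) {
                window.forEach(FakeMessage::nack); // error path: NACK every message...
                // ...and then loop, which is what the retry amounts to
            }
        }
    }

    static List<String> demo() {
        FakeMessage msg = new FakeMessage();
        int[] attempts = {0};
        // The work fails on the first attempt only (a transient error).
        processWithRetry(List.of(msg), () -> {
            if (attempts[0]++ == 0) {
                throw new IllegalStateException("transient failure");
            }
        });
        return msg.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [nack, ack]
    }
}
```

The same message ends up both NACKed and ACKed. The NACK already told Pub/Sub to redeliver it, possibly to another pod, while the local retry processed it a second time anyway.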

My question is resolved.

0
votes

After fixing the bug mentioned above, I still receive duplicate messages. It is accepted that Google Cloud Pub/Sub does not guarantee exactly-once delivery when you use buffers or windows. That is exactly my scenario, so I have to implement a mechanism that removes duplicates based on the message ID.
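A minimal sketch of such a mechanism, assuming a bounded in-memory set of recently seen IDs is good enough. MessageDeduplicator and firstTimeSeen are names invented here, and the capacity is arbitrary. With several pods the set would have to live in a store shared by all instances (a database table or a cache, for example), so the in-memory version below only illustrates the idea.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MessageDeduplicator {

    // Bounded LRU map used as a set of recently seen message IDs.
    private final Map<String, Boolean> seen;

    public MessageDeduplicator(final int capacity) {
        // accessOrder = true keeps the least recently used ID first, so it is evicted first.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    /** Returns true the first time an ID is offered, false while it is still remembered. */
    public synchronized boolean firstTimeSeen(String messageId) {
        return seen.putIfAbsent(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        MessageDeduplicator dedup = new MessageDeduplicator(1000);
        System.out.println(dedup.firstTimeSeen("msg-1")); // true: process it
        System.out.println(dedup.firstTimeSeen("msg-1")); // false: duplicate, skip processing
    }
}
```

In the consumer, the ID would presumably come from msg.getPubsubMessage().getMessageId(). A duplicate should still be ACKed so that Pub/Sub stops redelivering it, just not processed again.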