0
votes

I've used the subscriber example from the google documentation for Google PubSub the only modification I've made is commenting out the acknowledgement of the messages.

The subscriber doesn't add messages to the queue anymore while messages should be resent according to the interval set in the google cloud console.

Why is this happening or am I missing something?

public class SubscriberExample {

use the default project id
private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

static class MessageReceiverExample implements MessageReceiver {



    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        messages.offer(message);

        //consumer.ack();
    }
}
/** Receive messages over a subscription. */
public static void main(String[] args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
            PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
        // create a subscriber bound to the asynchronous message receiver
        subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
        subscriber.startAsync().awaitRunning();
        // Continue to listen to messages
        while (true) {
            PubsubMessage message = messages.take();
            System.out.println("Message Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
        }
    } finally {
       if (subscriber != null) {
            subscriber.stopAsync();
        }
    }
}
}
1

1 Answers

1
votes

When you do not acknowledge a messages, the Java client library calls modifyAckDeadline on the message until maxAckExtensionPeriod passes. By default, this value is one hour. Therefore, if you don't ack/nack the message or change this value, it is likely the message will not be redelivered for an hour. If you want to change the max ack extension period, set it on the builder:

subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
    .setMaxAckExtensionPeriod(Duration.ofSeconds(60))
    .build();               

It is also worth noting that when you don't ack or nack messages, then flow control may prevent the delivery of more messages. By default, the Java client library allows up to 1000 messages to be outstanding, i.e., waiting for ack or nack or for the max ack extension period to pass.