1
votes

I have created a Java Pub/Sub consumer by following the Pub/Sub documentation.

public static void main(String... args) throws Exception {
    TopicName topic = TopicName.create(pubSubProjectName, pubSubTopic);
    SubscriptionName subscription = SubscriptionName.create(pubSubProjectName, "ssvp-sub");

    SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();
    subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);

    MessageReceiver receiver =
            new MessageReceiver() {
                @Override
                public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                    System.out.println("Got message: " + message.getData().toStringUtf8());
                    consumer.ack();
                }
            };
    Subscriber subscriber = null;
    try {
        subscriber = Subscriber.defaultBuilder(subscription, receiver).build();
        subscriber.addListener(
                new Subscriber.Listener() {
                    @Override
                    public void failed(Subscriber.State from, Throwable failure) {
                        // Handle failure. This is called when the Subscriber encountered a fatal error and is shutting down.
                        System.err.println(failure);
                    }
                },
                MoreExecutors.directExecutor());
        subscriber.startAsync().awaitRunning();

        Thread.sleep(60000);
    } finally {
        if (subscriber != null) {
            subscriber.stopAsync();
        }
    }
}

It works well on the first run, but every later run throws a StatusRuntimeException unless I pick a new subscription name:

io.grpc.StatusRuntimeException: ALREADY_EXISTS: Resource already exists in the project (resource=ssvp-sub).

(See the SubscriptionName.create(pubSubProjectName, "ssvp-sub") line in my code snippet.)

I found out that in the Node.js client we can pass the "reuseExisting: true" option to reuse an existing subscription:

topic.subscribe('maybe-subscription-name', { reuseExisting: true }, function(err, subscription) {
  // subscription was "get-or-create"-ed
});

What option should I pass if I use the official Java Pub/Sub client?

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.13.0-alpha</version>
</dependency>

1 Answer

5
votes

The Java library has no option that lets createSubscription succeed silently when the subscription already exists; the call always throws in that case. You have a couple of options, both of which involve a try/catch block. The choice depends on whether or not you want to be optimistic about the existence of the subscription.

Pessimistic call:

try {
  subscriptionAdminClient.createSubscription(subscription, 
                                             topic,
                                             PushConfig.getDefaultInstance(),
                                             0);
} catch (ApiException e) {
  if (e.getStatusCode() != Status.Code.ALREADY_EXISTS) {
    throw e;
  }
}

// You know the subscription exists and can create a Subscriber.

Optimistic call:

try {
  subscriptionAdminClient.getSubscription(subscription);
} catch (ApiException e) {
  if (e.getStatusCode() == Status.Code.NOT_FOUND) {
    // Create the subscription
  } else {
    throw e;
  }
}

// You know the subscription exists and can create a Subscriber.
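
The two variants can also be combined into a single "get-or-create" helper. The following is only a minimal sketch under stated assumptions: SubscriptionAdmin, NotFoundException, AlreadyExistsException, InMemoryAdmin and ensureSubscription are hypothetical stand-ins invented for illustration, not part of google-cloud-pubsub. With the real client you would call getSubscription and createSubscription on SubscriptionAdminClient and inspect the ApiException status code as shown above. The sketch also covers the case where another process creates the subscription between the get and the create.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class GetOrCreateSketch {

    /** Hypothetical stand-in for the two admin calls; NOT the real Pub/Sub client. */
    interface SubscriptionAdmin {
        void getSubscription(String name);    // throws NotFoundException if missing
        void createSubscription(String name); // throws AlreadyExistsException if present
    }

    /** Hypothetical exception playing the role of a NOT_FOUND status. */
    static class NotFoundException extends RuntimeException {
        NotFoundException(String name) { super("NOT_FOUND: " + name); }
    }

    /** Hypothetical exception playing the role of an ALREADY_EXISTS status. */
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String name) { super("ALREADY_EXISTS: " + name); }
    }

    /** In-memory fake so that the sketch runs without any Google Cloud dependency. */
    static class InMemoryAdmin implements SubscriptionAdmin {
        private final Set<String> names = ConcurrentHashMap.newKeySet();

        @Override
        public void getSubscription(String name) {
            if (!names.contains(name)) {
                throw new NotFoundException(name);
            }
        }

        @Override
        public void createSubscription(String name) {
            if (!names.add(name)) {
                throw new AlreadyExistsException(name);
            }
        }
    }

    /** Returns true if this call created the subscription, false if it already existed. */
    static boolean ensureSubscription(SubscriptionAdmin admin, String name) {
        try {
            admin.getSubscription(name);
            return false; // already there, reuse it
        } catch (NotFoundException notFound) {
            try {
                admin.createSubscription(name);
                return true;
            } catch (AlreadyExistsException raced) {
                // Someone else created it between our get and our create; still fine.
                return false;
            }
        }
    }

    public static void main(String[] args) {
        SubscriptionAdmin admin = new InMemoryAdmin();
        System.out.println(ensureSubscription(admin, "ssvp-sub")); // prints "true" (created)
        System.out.println(ensureSubscription(admin, "ssvp-sub")); // prints "false" (reused)
    }
}
```

Any other failure is deliberately left uncaught so that it propagates to the caller, matching the "throw e" branches in the snippets above.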

In general, one would often create the subscription before starting the subscriber itself (via the Cloud Console or the gcloud CLI), so you might even want to make only the getSubscription() call and rethrow any exception, including NOT_FOUND. If a subscription got deleted, you probably want to draw attention to that and handle it explicitly, because it has implications (for example, messages are no longer being stored for delivery to that subscription).

However, if you are doing something like building a cache server that just needs to get updates transiently while it is up and running, then creating the subscription on startup could make sense.