6
votes

I'm not able to find a way to read messages from pub/sub using java.

I'm using this maven dependency in my pom

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.17.2-alpha</version>
</dependency>

I implemented this main method to create a new topic:

public static void main(String... args) throws Exception {

        // Your Google Cloud Platform project ID
        String projectId = ServiceOptions.getDefaultProjectId();

        // Your topic ID
        String topicId = "my-new-topic-1";
        // Create a new topic
        TopicName topic = TopicName.create(projectId, topicId);
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            topicAdminClient.createTopic(topic); 
        }
}

The above code works well and, indeed, I can see the new topic I created using the google cloud console.

I implemented the following main method to write a message to my topic:

public static void main(String a[]) throws InterruptedException, ExecutionException{
        String projectId = ServiceOptions.getDefaultProjectId(); 
        String topicId = "my-new-topic-1";

        String payload = "Hellooooo!!!";
        PubsubMessage pubsubMessage =
                  PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

        TopicName topic = TopicName.create(projectId, topicId);

        Publisher publisher;
        try {
            publisher = Publisher.defaultBuilder(
                    topic)
                    .build();
            publisher.publish(pubsubMessage);

            System.out.println("Sent!");
        } catch (IOException e) {
            System.out.println("Not Sended!");
            e.printStackTrace();
        }
}

Now I'm not able to verify if this message was really sent. I would like to implement a message reader using a subscription to my topic. Could someone show me a correct and working java example about reading messages from a topic?

Anyone can help me? Thanks in advance!

4
Welcome to Stack Overflow! Questions asking us to recommend or find a book, tool, software library, tutorial or other off-site resource are off-topic for Stack Overflow as they tend to attract opinionated answers and spam. Instead, describe the problem and what has been done so far to solve it.Joe C
Maybe I was not clear. I'm not looking for a tutorial/book or an external resource. I'm looking for some lines of java code representing an example of how to read message from pubsub via java. I'll update my question.Andrea Zonzin
this is a good link: cloud.google.com/pubsub/docs/… shows the Receiver part.Jeryl Cook
I got stuck on topicAdminClient.createTopic(topic);, I see this in the console: com.google.auth.oauth2.DefaultCredentialsProvider warnAboutProblematicCredentials WARNING: Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see cloud.google.com/docs/authentication. Any idea??CCC

4 Answers

4
votes

Here is the version using the google cloud client libraries.


package com.techm.data.client;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;

/**
 * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull
 * subscription and asynchronously pull messages from it.
 */
public class CreateSubscriptionAndConsumeMessages {

    private static String projectId = "projectId";
    private static String topicId = "topicName";
    private static String subscriptionId = "subscriptionName";

    public static void createSubscription() throws Exception {
        ProjectTopicName topic = ProjectTopicName.of(projectId, topicId);
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);

        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
        }
    }

    public static void main(String... args) throws Exception {
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);       

        createSubscription();


        MessageReceiver receiver = new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                System.out.println("Received message: " + message.getData().toStringUtf8());
                consumer.ack();
            }
        };
        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(subscription, receiver).build();
            subscriber.addListener(new Subscriber.Listener() {
                @Override
                public void failed(Subscriber.State from, Throwable failure) {
                    // Handle failure. This is called when the Subscriber encountered a fatal error
                    // and is
                    // shutting down.
                    System.err.println(failure);
                }
            }, MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();         

            // In this example, we will pull messages for one minute (60,000ms) then stop.
            // In a real application, this sleep-then-stop is not necessary.
            // Simply call stopAsync().awaitTerminated() when the server is shutting down,
            // etc.
            Thread.sleep(60000);
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync().awaitTerminated();
            }
        }
    }
}

This is working fine for me.

1
votes

The Cloud Pub/Sub Pull Subscriber Guide has sample code for reading messages from a topic.

1
votes

I haven't used google cloud client libraries but used the api client libraries. Here is how I created a subscription.

package com.techm.datapipeline.client;

import java.io.IOException;
import java.security.GeneralSecurityException;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Create;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
import com.google.api.services.pubsub.Pubsub.Projects.Topics;
import com.google.api.services.pubsub.model.ExpirationPolicy;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
import com.techm.datapipeline.factory.PubsubFactory;

public class CreatePullSubscriberClient {

    private final static String PROJECT_NAME = "yourProjectId";
    private final static String TOPIC_NAME = "yourTopicName";
    private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";

    public static void main(String[] args) throws IOException, GeneralSecurityException {
        Pubsub pubSub = PubsubFactory.getService();

        String topicName = String.format("projects/%s/topics/%s", PROJECT_NAME, TOPIC_NAME);
        String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);

        Topics.Get listReq = pubSub.projects().topics().get(topicName);
        Topic topic = listReq.execute();

        if (topic == null) {
            System.err.println("Topic doesn't exist...run CreateTopicClient...to create the topic");
            System.exit(0);
        }

        Subscription subscription = null;
        try {
            Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
            subscription = getReq.execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                System.out.println("Subscription " + subscriptionName + " does not exist...will create it");
            }
        }

        if (subscription != null) {
            System.out.println("Subscription already exists ==> " + subscription.toPrettyString());
            System.exit(0);
        }

        subscription = new Subscription();

        subscription.setTopic(topicName);
        subscription.setPushConfig(null); // indicating a pull

        ExpirationPolicy expirationPolicy = new ExpirationPolicy();
        expirationPolicy.setTtl(null); // never expires;
        subscription.setExpirationPolicy(expirationPolicy);

        subscription.setAckDeadlineSeconds(null); // so defaults to 10 sec

        subscription.setRetainAckedMessages(true);

        Long _week = 7L * 24 * 60 * 60;
        subscription.setMessageRetentionDuration(String.valueOf(_week)+"s");

        subscription.setName(subscriptionName);

        Create createReq = pubSub.projects().subscriptions().create(subscriptionName, subscription);
        Subscription createdSubscription = createReq.execute();

        System.out.println("Subscription created ==> " + createdSubscription.toPrettyString());
    }

}

And once you create the subscription (pull type)...this is how you pull the messages from the topic.

package com.techm.datapipeline.client;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.util.Base64;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Acknowledge;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Pull;
import com.google.api.services.pubsub.model.AcknowledgeRequest;
import com.google.api.services.pubsub.model.Empty;
import com.google.api.services.pubsub.model.PullRequest;
import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
import com.techm.datapipeline.factory.PubsubFactory;

public class PullSubscriptionsClient {

    private final static String PROJECT_NAME = "yourProjectId";
    private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";

    private final static String SUBSCRIPTION_NYC_NAME = "test";


    public static void main(String[] args) throws IOException, GeneralSecurityException {
        Pubsub pubSub = PubsubFactory.getService();

        String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);
        //String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NYC_NAME);

        try {
            Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
            getReq.execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                System.out.println("Subscription " + subscriptionName
                        + " does not exist...run CreatePullSubscriberClient to create");
            }
        }

        PullRequest pullRequest = new PullRequest();
        pullRequest.setReturnImmediately(false); // wait until you get a message
        pullRequest.setMaxMessages(1000);

        Pull pullReq = pubSub.projects().subscriptions().pull(subscriptionName, pullRequest);
        PullResponse pullResponse = pullReq.execute();

        List<ReceivedMessage> msgs = pullResponse.getReceivedMessages();
        List<String> ackIds = new ArrayList<String>();
        int i = 0;
        if (msgs != null) {
            for (ReceivedMessage msg : msgs) {
                ackIds.add(msg.getAckId());
                //System.out.println(i++ + ":===:" + msg.getAckId());
                String object = new String(Base64.decodeBase64(msg.getMessage().getData()));
                System.out.println("Decoded object String ==> " + object );
            }

            //acknowledge all the received messages
            AcknowledgeRequest content = new AcknowledgeRequest();
            content.setAckIds(ackIds);
            Acknowledge ackReq = pubSub.projects().subscriptions().acknowledge(subscriptionName, content);
            Empty empty = ackReq.execute();
        }

    }

}

Note: This client only waits until it receives at least one message and terminates if it's receives one (up to a max of value - set in MaxMessages) at once.

Let me know if this helps. I'm going to try the cloud client libraries soon and will post an update once I get my hands on them.

And here's the missing factory class ...if you plan to run it...

package com.techm.datapipeline.factory;


import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

public class PubsubFactory {

    private static Pubsub instance = null;
    private static final Logger logger = Logger.getLogger(PubsubFactory.class.getName());

    public static synchronized Pubsub getService() throws IOException, GeneralSecurityException {
        if (instance == null) {
            instance = buildService();
        }
        return instance;
    }

    private static Pubsub buildService() throws IOException, GeneralSecurityException {
        logger.log(Level.FINER, "Start of buildService");
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = new JacksonFactory();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);

        // Depending on the environment that provides the default credentials (for
        // example: Compute Engine, App Engine), the credentials may require us to
        // specify the scopes we need explicitly. 
        if (credential.createScopedRequired()) {
            Collection<String> scopes = new ArrayList<>();
            scopes.add(PubsubScopes.PUBSUB);
            credential = credential.createScoped(scopes);
        }

        logger.log(Level.FINER, "End of buildService");

        // TODO - Get the application name from outside.
        return new Pubsub.Builder(transport, jsonFactory, credential).setApplicationName("Your Application Name/Version")
                .build();
    }

}
0
votes

The message reader is injected on the subscriber. This part of the code will handle the messages:

MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };