Google Pub/Sub Java examples

I can't find a way to read messages from Pub/Sub using Java.

I'm using this Maven dependency in my pom.xml:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>0.17.2-alpha</version>
</dependency>

I implemented this main method to create a new topic:

public static void main(String... args) throws Exception {

        // Your Google Cloud Platform project ID
        String projectId = ServiceOptions.getDefaultProjectId();

        // Your topic ID
        String topicId = "my-new-topic-1";
        // Create a new topic
        TopicName topic = TopicName.create(projectId, topicId);
        try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
            topicAdminClient.createTopic(topic); 
        }
}

The above code works, and I can see the new topic in the Google Cloud Console.

I implemented the following main method to write a message to my topic:

public static void main(String a[]) throws InterruptedException, ExecutionException{
        String projectId = ServiceOptions.getDefaultProjectId(); 
        String topicId = "my-new-topic-1";

        String payload = "Hellooooo!!!";
        PubsubMessage pubsubMessage =
                  PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();

        TopicName topic = TopicName.create(projectId, topicId);

        Publisher publisher;
        try {
            publisher = Publisher.defaultBuilder(
                    topic)
                    .build();
            publisher.publish(pubsubMessage);

            System.out.println("Sent!");
        } catch (IOException e) {
            System.out.println("Not sent!");
            e.printStackTrace();
        }
}

Now I can't verify whether this message was really sent. I would like to implement a message reader using a subscription to my topic. Could someone show me a correct, working Java example of reading messages from a topic?

Can anyone help me? Thanks in advance!
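
One reason the "Sent!" line in the question proves little: publish is asynchronous and returns a future (an ApiFuture<String> in current releases of the client library) that completes with the server-assigned message ID or fails with the publish error. The sketch below uses the JDK's CompletableFuture as a stand-in for that future so it runs without Google Cloud credentials. fakePublish and the IDs it produces are hypothetical, not library calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PublishConfirmation {

    // Hypothetical stand-in for publisher.publish(message): completes
    // asynchronously with a message ID, or exceptionally for an empty payload.
    static CompletableFuture<String> fakePublish(String payload) {
        return CompletableFuture.supplyAsync(() -> {
            if (payload.isEmpty()) {
                throw new IllegalArgumentException("empty payload");
            }
            return "id-" + payload.length();
        });
    }

    // Blocks until the publish future completes and reports the outcome.
    static String confirm(CompletableFuture<String> future) {
        try {
            return "Sent, message ID " + future.get(10, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return "Not sent: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "Not sent: interrupted";
        } catch (TimeoutException e) {
            return "Not sent: timed out";
        }
    }

    public static void main(String[] args) {
        System.out.println(confirm(fakePublish("Hellooooo!!!")));
        System.out.println(confirm(fakePublish("")));
    }
}
```

With the real Publisher, the same idea would be to keep the future returned by publish and wait on it (or attach a callback) before reporting success.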

The Cloud Pub/Sub Pull Subscriber Guide has sample code for reading messages from a topic.

Here is a version using the Google Cloud client libraries.

package com.techm.data.client;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;

/**
 * A snippet for Google Cloud Pub/Sub showing how to create a Pub/Sub pull
 * subscription and asynchronously pull messages from it.
 */
public class CreateSubscriptionAndConsumeMessages {

    private static String projectId = "projectId";
    private static String topicId = "topicName";
    private static String subscriptionId = "subscriptionName";

    public static void createSubscription() throws Exception {
        ProjectTopicName topic = ProjectTopicName.of(projectId, topicId);
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);

        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
            subscriptionAdminClient.createSubscription(subscription, topic, PushConfig.getDefaultInstance(), 0);
        }
    }

    public static void main(String... args) throws Exception {
        ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subscriptionId);       

        createSubscription();


        MessageReceiver receiver = new MessageReceiver() {
            @Override
            public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
                System.out.println("Received message: " + message.getData().toStringUtf8());
                consumer.ack();
            }
        };
        Subscriber subscriber = null;
        try {
            subscriber = Subscriber.newBuilder(subscription, receiver).build();
            subscriber.addListener(new Subscriber.Listener() {
                @Override
                public void failed(Subscriber.State from, Throwable failure) {
                    // Handle failure. This is called when the Subscriber encountered a
                    // fatal error and is shutting down.
                    System.err.println(failure);
                }
            }, MoreExecutors.directExecutor());
            subscriber.startAsync().awaitRunning();         

            // In this example, we will pull messages for one minute (60,000ms) then stop.
            // In a real application, this sleep-then-stop is not necessary.
            // Simply call stopAsync().awaitTerminated() when the server is shutting down,
            // etc.
            Thread.sleep(60000);
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync().awaitTerminated();
            }
        }
    }
}

This is working fine for me.

The message receiver is passed to the subscriber when it is built. This part of the code handles the messages:

MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };
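
The "ack/nack" in the comment above can be made concrete: acknowledge only after processing succeeds, and nack on failure so the message becomes eligible for redelivery. This is a minimal sketch using only the JDK. The Reply interface is a hypothetical stand-in that mirrors the ack() and nack() calls of AckReplyConsumer, and parsing the payload as an integer is just an illustrative processing step.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class AckOrNack {

    // Hypothetical stand-in exposing the same two calls as AckReplyConsumer.
    interface Reply {
        void ack();
        void nack();
    }

    // Records which reply was sent so the outcome can be inspected.
    static class RecordingReply implements Reply {
        final List<String> calls = new ArrayList<>();
        public void ack() { calls.add("ack"); }
        public void nack() { calls.add("nack"); }
    }

    // Processes the payload; acks on success, nacks on failure.
    static void receive(byte[] data, Reply reply) {
        try {
            String text = new String(data, StandardCharsets.UTF_8);
            Integer.parseInt(text.trim()); // illustrative processing step that may fail
            reply.ack();
        } catch (RuntimeException e) {
            reply.nack();
        }
    }

    public static void main(String[] args) {
        RecordingReply ok = new RecordingReply();
        receive("42".getBytes(StandardCharsets.UTF_8), ok);
        RecordingReply bad = new RecordingReply();
        receive("not a number".getBytes(StandardCharsets.UTF_8), bad);
        System.out.println(ok.calls + " " + bad.calls);
    }
}
```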

I haven't used the Google Cloud client libraries, but I have used the API client libraries. Here is how I created a subscription.

package com.techm.datapipeline.client;

import java.io.IOException;
import java.security.GeneralSecurityException;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Create;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
import com.google.api.services.pubsub.Pubsub.Projects.Topics;
import com.google.api.services.pubsub.model.ExpirationPolicy;
import com.google.api.services.pubsub.model.Subscription;
import com.google.api.services.pubsub.model.Topic;
import com.techm.datapipeline.factory.PubsubFactory;

public class CreatePullSubscriberClient {

    private final static String PROJECT_NAME = "yourProjectId";
    private final static String TOPIC_NAME = "yourTopicName";
    private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";

    public static void main(String[] args) throws IOException, GeneralSecurityException {
        Pubsub pubSub = PubsubFactory.getService();

        String topicName = String.format("projects/%s/topics/%s", PROJECT_NAME, TOPIC_NAME);
        String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);

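        // topics().get() throws a 404 GoogleJsonResponseException when the topic is missing.
        try {
            pubSub.projects().topics().get(topicName).execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                System.err.println("Topic doesn't exist...run CreateTopicClient...to create the topic");
                System.exit(1);
            }
            throw e;
        }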

        Subscription subscription = null;
        try {
            Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
            subscription = getReq.execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                System.out.println("Subscription " + subscriptionName + " does not exist...will create it");
            }
        }

        if (subscription != null) {
            System.out.println("Subscription already exists ==> " + subscription.toPrettyString());
            System.exit(0);
        }

        subscription = new Subscription();

        subscription.setTopic(topicName);
        subscription.setPushConfig(null); // indicating a pull

        ExpirationPolicy expirationPolicy = new ExpirationPolicy();
        expirationPolicy.setTtl(null); // never expires;
        subscription.setExpirationPolicy(expirationPolicy);

        subscription.setAckDeadlineSeconds(null); // so defaults to 10 sec

        subscription.setRetainAckedMessages(true);

        Long _week = 7L * 24 * 60 * 60;
        subscription.setMessageRetentionDuration(String.valueOf(_week)+"s");

        subscription.setName(subscriptionName);

        Create createReq = pubSub.projects().subscriptions().create(subscriptionName, subscription);
        Subscription createdSubscription = createReq.execute();

        System.out.println("Subscription created ==> " + createdSubscription.toPrettyString());
    }

}
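
The two string formats used in the class above, the full resource paths and the retention duration written as seconds with an "s" suffix, can be isolated into small helpers. This is a sketch using only the JDK. The class and method names are hypothetical and not part of any Google library.

```java
import java.time.Duration;

class SubscriptionSettings {

    // Builds the full topic resource name, as in String.format above.
    static String topicPath(String project, String topic) {
        return String.format("projects/%s/topics/%s", project, topic);
    }

    // Builds the full subscription resource name.
    static String subscriptionPath(String project, String subscription) {
        return String.format("projects/%s/subscriptions/%s", project, subscription);
    }

    // Formats a duration as whole seconds followed by "s",
    // the form passed to setMessageRetentionDuration above.
    static String durationString(Duration d) {
        return d.getSeconds() + "s";
    }

    public static void main(String[] args) {
        System.out.println(topicPath("yourProjectId", "yourTopicName"));
        System.out.println(subscriptionPath("yourProjectId", "yourSubscriptionName"));
        System.out.println(durationString(Duration.ofDays(7)));
    }
}
```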

Once you have created the (pull) subscription, this is how you pull messages from it.

package com.techm.datapipeline.client;

import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.util.Base64;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Acknowledge;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Get;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions.Pull;
import com.google.api.services.pubsub.model.AcknowledgeRequest;
import com.google.api.services.pubsub.model.Empty;
import com.google.api.services.pubsub.model.PullRequest;
import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
import com.techm.datapipeline.factory.PubsubFactory;

public class PullSubscriptionsClient {

    private final static String PROJECT_NAME = "yourProjectId";
    private final static String SUBSCRIPTION_NAME = "yourSubscriptionName";

    private final static String SUBSCRIPTION_NYC_NAME = "test";


    public static void main(String[] args) throws IOException, GeneralSecurityException {
        Pubsub pubSub = PubsubFactory.getService();

        String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NAME);
        //String subscriptionName = String.format("projects/%s/subscriptions/%s", PROJECT_NAME, SUBSCRIPTION_NYC_NAME);

        try {
            Get getReq = pubSub.projects().subscriptions().get(subscriptionName);
            getReq.execute();
        } catch (GoogleJsonResponseException e) {
            if (e.getStatusCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) {
                System.out.println("Subscription " + subscriptionName
                        + " does not exist...run CreatePullSubscriberClient to create");
                return; // nothing to pull from
            }
            throw e;
        }

        PullRequest pullRequest = new PullRequest();
        pullRequest.setReturnImmediately(false); // wait until you get a message
        pullRequest.setMaxMessages(1000);

        Pull pullReq = pubSub.projects().subscriptions().pull(subscriptionName, pullRequest);
        PullResponse pullResponse = pullReq.execute();

        List<ReceivedMessage> msgs = pullResponse.getReceivedMessages();
        List<String> ackIds = new ArrayList<String>();
        int i = 0;
        if (msgs != null) {
            for (ReceivedMessage msg : msgs) {
                ackIds.add(msg.getAckId());
                //System.out.println(i++ + ":===:" + msg.getAckId());
                String object = new String(Base64.decodeBase64(msg.getMessage().getData()));
                System.out.println("Decoded object String ==> " + object );
            }

            //acknowledge all the received messages
            AcknowledgeRequest content = new AcknowledgeRequest();
            content.setAckIds(ackIds);
            Acknowledge ackReq = pubSub.projects().subscriptions().acknowledge(subscriptionName, content);
            Empty empty = ackReq.execute();
        }

    }

}

Note: This client waits until at least one message is available, pulls up to MaxMessages messages in a single call, acknowledges them, and then terminates.
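
The pull client above receives each payload as a Base64 string and decodes it before printing. The same step can be sketched with the JDK's own java.util.Base64, assuming the data uses the standard Base64 alphabet. Decoding with an explicit UTF-8 charset avoids depending on the platform default, which new String(bytes) would use. The class and method names here are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class PayloadDecoding {

    // Encodes text the way a payload would appear in the JSON response.
    static String encode(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decodes a Base64 payload back into a UTF-8 string.
    static String decode(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String wire = encode("Hellooooo!!!");
        System.out.println(wire);
        System.out.println(decode(wire));
    }
}
```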

Let me know if this helps. I'm going to try the cloud client libraries soon and will post an update once I get my hands on them.

And here's the missing factory class, in case you plan to run it:

package com.techm.datapipeline.factory;


import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.PubsubScopes;

public class PubsubFactory {

    private static Pubsub instance = null;
    private static final Logger logger = Logger.getLogger(PubsubFactory.class.getName());

    public static synchronized Pubsub getService() throws IOException, GeneralSecurityException {
        if (instance == null) {
            instance = buildService();
        }
        return instance;
    }

    private static Pubsub buildService() throws IOException, GeneralSecurityException {
        logger.log(Level.FINER, "Start of buildService");
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = new JacksonFactory();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);

        // Depending on the environment that provides the default credentials (for
        // example: Compute Engine, App Engine), the credentials may require us to
        // specify the scopes we need explicitly. 
        if (credential.createScopedRequired()) {
            Collection<String> scopes = new ArrayList<>();
            scopes.add(PubsubScopes.PUBSUB);
            credential = credential.createScoped(scopes);
        }

        logger.log(Level.FINER, "End of buildService");

        // TODO - Get the application name from outside.
        return new Pubsub.Builder(transport, jsonFactory, credential).setApplicationName("Your Application Name/Version")
                .build();
    }

}

Comments
  • Welcome to Stack Overflow! Questions asking us to recommend or find a book, tool, software library, tutorial or other off-site resource are off-topic for Stack Overflow as they tend to attract opinionated answers and spam. Instead, describe the problem and what has been done so far to solve it.
  • Maybe I was not clear. I'm not looking for a tutorial, book, or external resource. I'm looking for a few lines of Java code showing how to read messages from Pub/Sub. I'll update my question.
  • this is a good link: cloud.google.com/pubsub/docs/… shows the Receiver part.
  • I got stuck on topicAdminClient.createTopic(topic);, I see this in the console: com.google.auth.oauth2.DefaultCredentialsProvider warnAboutProblematicCredentials WARNING: Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see cloud.google.com/docs/authentication. Any idea??
  • Unfortunately, the sample code poorly explains how to use the client API.
  • If you were just to put the code to create the subscriber in your main method, then it would be expected to quit. The startAsync method returns immediately. You would need to put something in place of the // ... to keep your main thread running.
  • It would greatly help me if you could elaborate a bit. What would that be? A Thread.sleep() to wait for messages to be read throws errors.
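
One way to keep the main thread running, as the comments above discuss, is to block on a CountDownLatch until the expected work is done or a timeout elapses, instead of sleeping for a fixed time. This is a minimal JDK-only sketch. The worker thread is a hypothetical stand-in for a started subscriber and is marked as a daemon to reproduce the exit-early behaviour described in the comment. With the real client, the latch would be counted down inside receiveMessage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class KeepMainAlive {

    // Starts a background worker that handles `expected` messages and blocks
    // the caller until all are handled or the timeout elapses.
    static int receiveAll(int expected, long timeoutSeconds) {
        CountDownLatch done = new CountDownLatch(expected);
        AtomicInteger handled = new AtomicInteger();

        Thread worker = new Thread(() -> {
            for (int i = 0; i < expected; i++) {
                handled.incrementAndGet(); // stand-in for handling one message
                done.countDown();
            }
        });
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive
        worker.start();         // returns immediately, like startAsync()

        try {
            // Without this wait, the caller could return before any message is handled.
            done.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("Handled " + receiveAll(3, 5) + " messages");
    }
}
```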