
I have built a piece of software that uses GCP Pub/Sub as a message queue, Apache Beam to build a pipeline and Flask to build a web server. It runs smoothly in production, but I have trouble making all the pieces connect with docker-compose, in particular the Apache Beam pipeline.

I followed the Stack Overflow answer to "Dataflow pipeline and pubsub emulator" to make the pipeline listen to a GCP Pub/Sub emulator, replacing localhost from that answer with the name of the service defined in my docker-compose.yaml:

services:
  pubsub_emulator:
    build: docker_images/message_queue
    ports:
      - 8085:8085

  webserver:
    build: docker_images/webserver
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    ports:
      - 8899:8080
    depends_on:
      - pubsub_emulator

  pipeline:
    build: docker_images/pipeline
    environment:
      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085
      PUBSUB_PROJECT_ID: my-dev
    restart: unless-stopped
    depends_on:
      - pubsub_emulator

The webserver can reach the Pub/Sub emulator and create topics.

However, the pipeline fails on start-up with a MalformedURLException:

Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: no protocol: pubsub_emulator:8085/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166
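This message is most likely raised by java.net.URL, which rejects a string that does not start with a valid scheme (an underscore is not allowed in a scheme name, so "pubsub_emulator:" is not taken as one). The sketch below is a stdlib-only reproduction, assuming the root URL is simply joined with the REST path shown in the error; the class and method names are hypothetical, and the URL(String) constructor it uses is deprecated since Java 20 but still works:

```java
import java.net.MalformedURLException;
import java.net.URL;

public class RootUrlCheck {
    // REST path copied from the error message above (illustrative only).
    static final String PATH = "/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166";

    /** Returns null if the string parses as a URL, otherwise the MalformedURLException message. */
    static String parseError(String spec) {
        try {
            new URL(spec);
            return null;
        } catch (MalformedURLException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws MalformedURLException {
        // Root URL without a scheme, as taken from PUBSUB_EMULATOR_HOST in the question.
        System.out.println(parseError("pubsub_emulator:8085" + PATH));

        // The same root with an explicit http:// scheme parses without error.
        URL ok = new URL("http://pubsub_emulator:8085" + PATH);
        System.out.println(ok.getProtocol() + " " + ok.getHost() + " " + ok.getPort());
    }
}
```

The first line printed starts with "no protocol: pubsub_emulator:8085/v1/", matching the exception in the stack trace.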

The pipeline options seem fine; I defined them with:

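import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// PUBSUB_EMULATOR_HOST is "pubsub_emulator:8085" (from docker-compose.yaml)
final String pubSubEmulatorHost = System.getenv("PUBSUB_EMULATOR_HOST");

BasePipeline.PipeOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(BasePipeline.PipeOptions.class);

options.as(DataflowPipelineOptions.class).setStreaming(true);

options.as(PubsubOptions.class).setPubsubRootUrl(pubSubEmulatorHost);

Pipeline pipeline = Pipeline.create(options);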

Does anyone have a hint about what is happening and how to solve it? Is the only solution to run the emulator and the pipeline in the same Docker container?

Have you tried http://pubsub_emulator:8085? It seems you are missing the protocol. - Mostafa Hussein
Indeed, the http:// was what was missing. It seems there is a small difference between how the Python and the Java SDKs handle this! - Dr Mouse
Glad it worked! - Mostafa Hussein

Answer


Try changing the root URL value to the following:

http://pubsub_emulator:8085

The error complains about a missing protocol, which in your case should be http.
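Since the gRPC-based Python client reads PUBSUB_EMULATOR_HOST as plain host:port, one option is to keep the variable in that form (so the Flask webserver keeps working) and add the scheme only on the Java side. A minimal sketch; the class and method names are hypothetical, and plain http is assumed because the emulator does not use TLS:

```java
import java.util.Locale;

public class EmulatorRootUrl {
    /**
     * Hypothetical helper: turns a PUBSUB_EMULATOR_HOST value ("host:port")
     * into a fully qualified root URL. A value that already starts with
     * http:// or https:// (any case) is returned trimmed but otherwise unchanged.
     */
    static String toRootUrl(String emulatorHost) {
        if (emulatorHost == null || emulatorHost.isBlank()) {
            throw new IllegalArgumentException("PUBSUB_EMULATOR_HOST is not set");
        }
        String trimmed = emulatorHost.trim();
        String lower = trimmed.toLowerCase(Locale.ROOT);
        if (lower.startsWith("http://") || lower.startsWith("https://")) {
            return trimmed;
        }
        // Assumption: the emulator speaks plain HTTP, so http:// is prepended.
        return "http://" + trimmed;
    }

    public static void main(String[] args) {
        System.out.println(toRootUrl("pubsub_emulator:8085"));  // http://pubsub_emulator:8085
        System.out.println(toRootUrl("http://localhost:8085")); // unchanged
        // In the pipeline (Beam classes left out so this sketch compiles on its own):
        // options.as(PubsubOptions.class)
        //        .setPubsubRootUrl(toRootUrl(System.getenv("PUBSUB_EMULATOR_HOST")));
    }
}
```

Keeping the shared variable as host:port and deriving the URL in Java avoids having two differently formatted settings for the same emulator.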

According to the Apache Beam Java SDK, the value is expected to be a fully qualified URL:

/** Root URL for use with the Google Cloud Pub/Sub API. */
@Default.String(value="https://pubsub.googleapis.com")
@Hidden
java.lang.String getPubsubRootUrl();

However, if you come from a Python background, you will notice that the Python SDK uses gRPC Python (as shown below) and expects only the server address, consisting of the host and the port:

# A snippet from google-cloud-python library.
if os.environ.get("PUBSUB_EMULATOR_HOST"):
    kwargs["channel"] = grpc.insecure_channel(
        target=os.environ.get("PUBSUB_EMULATOR_HOST")
    )

From the gRPC Python API reference:

grpc.insecure_channel(target, options=None)
Creates an insecure Channel to a server. The returned Channel is thread-safe.
Parameters: target – The server address