3 votes

I have written a streaming pipeline using the Google Cloud Dataflow SDK, but I want to test it locally. The pipeline takes its input data from Google Cloud Pub/Sub.

Is it possible to run jobs that access Pub/Sub (PubsubIO) using the DirectPipelineRunner (local execution, not in Google Cloud)?

I am running into permission issues while logged in as my normal user account, even though I am the owner of the project that contains the Pub/Sub topic I am trying to access.

By "trying to access", what did you do exactly? Every operation should just work if you're an owner of the project. – Takashi Matsuo

4 Answers

3 votes

The InProcessPipelineRunner is a new version of the DirectPipelineRunner, introduced in Dataflow SDK for Java 1.6.0, that includes support for unbounded PCollections.

(Note: In Apache Beam, this functionality has already been added to the DirectRunner, but in the Dataflow SDK for Java, we can't do that until 2.0 since its better checking of the model may cause additional test failures, which we consider a backwards incompatible change. Hence the addition of the companion InProcessPipelineRunner for the time being.)

There's also some great new support for testing late and out-of-order data.

2 votes

PubsubIO is not currently supported in the DirectPipelineRunner. When used locally, you will get an error stating that there is "no evaluator registered for PubsubIO.Read".

It is likely that your permission issues are coming from some other source.

0 votes

Just to help anyone who searches for this:

With the latest version this is possible. Use "DirectRunner" to run the pipeline locally and "DataflowRunner" to run it in Google Cloud.

Set the staging location and the runner as shown below:

streamingOption.setStagingLocation(PipelineConstants.PUBSUB_STAGING_LOCATION);
streamingOption.setRunner(DataflowRunner.class);

Or pass them as command-line arguments, as in the sketch below.

Could you elaborate a bit more on the permission issue that you faced?

-1 votes

It actually is possible, but the DirectPipelineRunner doesn't support unbounded data sources, so you'll have to set a maxReadTime or maxNumRecords, like so:

PubsubIO.Read.topic("projects/<project-id>/topics/<topic>").maxNumRecords(1000);

From the PubsubIO documentation:

A PTransform that continuously reads from a Cloud Pub/Sub stream and returns a PCollection of Strings containing the items from the stream. When running with a PipelineRunner that only supports bounded PCollections (such as DirectPipelineRunner), only a bounded portion of the input Pub/Sub stream can be processed. As such, either PubsubIO.Read.Bound.maxNumRecords(int) or PubsubIO.Read.Bound.maxReadTime(Duration) must be set.