
We are currently developing an Apache Beam pipeline to read data from GCP Pub/Sub and to write the data received to a bucket in AWS S3.

We are using TextIO.write in beam-sdks-java-io.amazon-web-services to write to S3.

TextIO.write()
        .withWindowedWrites()
        .withNumShards(options.getNumShards)
        .withTempDirectory(FileBasedSink.convertToFileResourceIfPossible(options.getTempLocation))
        .to(FileBasedSink.convertToFileResourceIfPossible(options.getOutputDirectory))

We first tested this pipeline locally using the DirectRunner, and that worked fine: data arriving from Pub/Sub was received by the pipeline and written to S3.

options.setRunner(classOf[DirectRunner])
options.setStagingLocation("./outputFolder/staging")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")

Next, we wanted to run the same pipeline on Dataflow without changing the pipeline logic, so we modified only the options to use the DataflowRunner:

options.setRunner(classOf[DataflowRunner])
options.setStagingLocation("gs://my-gcs-bucket/binaries")
options.setGcpTempLocation("gs://my-gcs-bucket/temp")
options.setTempLocation("s3://my-s3-bucket/temp")
options.setOutputDirectory("s3://my-s3-bucket/output")

With these settings, the pipeline receives data from Pub/Sub but does not write it to S3. No errors appear in the Dataflow logs in Stackdriver either.

Does anyone know what the issue could be? Is the pipeline options config incorrect? Or is the write to S3 failing silently?

Does anyone have a suggestion on how to configure logging for beam-sdks-java-io.amazon-web-services so that it outputs DEBUG-level logs?

Thanks!


1 Answer


To execute your pipeline using the DataflowRunner, you must set the following fields in PipelineOptions:

  • project - The ID of your Google Cloud project
  • runner - The pipeline runner that will parse your program and construct your pipeline.
  • gcpTempLocation - A Cloud Storage path for Dataflow to stage any temporary files. You must create this bucket before running your pipeline.
  • stagingLocation - A Cloud Storage bucket for Dataflow to stage your binary files. If you do not set this option, the value you specified for tempLocation is used as the staging location as well.

You need to specify the project option, e.g. options.setProject("my-project-id").

One important note: if you use the Apache Beam SDK for Java 2.15.0 or later, you must also specify the region option. Please refer to the official documentation for more information.
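As a rough illustration of the options listed above, here is a minimal Scala sketch that sets them on a DataflowPipelineOptions instance. The project ID, region, and bucket names are placeholders rather than values from the question, and the object name DataflowOptionsSketch is made up for this example. If you use your own options interface (for instance one that exposes getOutputDirectory), it would presumably need to extend DataflowPipelineOptions for these setters to be available.

```scala
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory

// Hypothetical object name, used only for this sketch.
object DataflowOptionsSketch {
  def main(args: Array[String]): Unit = {
    // Build the options from the command line and view them as Dataflow options.
    val options = PipelineOptionsFactory
      .fromArgs(args: _*)
      .as(classOf[DataflowPipelineOptions])

    options.setRunner(classOf[DataflowRunner])
    // Placeholder project ID: replace with your own Google Cloud project.
    options.setProject("my-project-id")
    // Placeholder region: required with Beam SDK for Java 2.15.0 or later.
    options.setRegion("us-central1")
    // Cloud Storage locations; the buckets must already exist.
    options.setGcpTempLocation("gs://my-gcs-bucket/temp")
    options.setStagingLocation("gs://my-gcs-bucket/binaries")
  }
}
```

The same values can also be passed on the command line (for example --project=my-project-id --region=us-central1), in which case fromArgs picks them up and the explicit setter calls are not needed.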

I hope this helps.