0 votes

I have a batch Apache Beam job that takes a file from GCS as input. My goal is to move the file to one of two GCS buckets depending on the pipeline's state after execution. If the pipeline executed successfully, move the file to bucket A, otherwise, if the pipeline had any kind of unhandled exceptions during execution, move the file to bucket B.

I'm using Apache Beam version 2.24.0 for Java, and I need to create a Dataflow template in order to run it multiple times with different input files.

Currently, my approach is to run the pipeline with pipeline.run().waitUntilFinish(), wrapping the call in a try-catch and using the resulting PipelineResult.State object (null when there was an exception) to decide which bucket to move the file to. With the DirectRunner this works fine, but when I create a Dataflow template and execute it with the DataflowRunner, it completely ignores any code outside of the pipeline's execution graph.
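Roughly, the relevant part looks like this (moveFile, inputFile and the bucket constants are just placeholders for my GCS move logic):

PipelineResult.State state = null;
try {
  state = pipeline.run().waitUntilFinish();
} catch (Exception e) {
  // Any unhandled exception from the pipeline ends up here
}
// Decide the destination based on the final state
if (state == PipelineResult.State.DONE) {
  moveFile(inputFile, BUCKET_A);
} else {
  moveFile(inputFile, BUCKET_B);
}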

Is there any way to execute code defined outside of the Dataflow template's execution graph? I would like to achieve this directly in the same Java code, without the need to execute another program after the job finishes (e.g. Cloud Function).

Do you use a custom template? – aga
@Ines Yes, I'm creating my own custom template. – Alexander

1 Answer

0 votes

The reason could be that the Java exception is thrown on a Dataflow worker and cannot be caught on the host that submits the job.

If you want to move the file to one of the GCS buckets depending on the pipeline's state after execution, I would recommend using the Dataflow API to query the final state of the job:

PipelineResult res = p.run();
String jobId = ((DataflowPipelineJob) res).getJobId();
DataflowClient client = DataflowClient.create(options);
/* use client to poll final state */

Then you can move the file to bucket A only when the job succeeded, and to bucket B when it failed.
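For example, a rough sketch continuing from the snippet above, assuming client.getJob(jobId) returns the Dataflow API Job model; the bucket and object names are placeholders, the helper is hypothetical, and the actual file move uses the google-cloud-storage client library:

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import com.google.api.services.dataflow.model.Job;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import org.apache.beam.runners.dataflow.DataflowClient;

// Hypothetical helper, called on the submitting host after p.run()
static void moveInputFile(DataflowClient client, String jobId)
    throws IOException, InterruptedException {
  // Poll the Dataflow API until the job reaches a terminal state
  List<String> terminalStates =
      Arrays.asList("JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED");
  Job job = client.getJob(jobId);
  while (!terminalStates.contains(job.getCurrentState())) {
    Thread.sleep(30_000);
    job = client.getJob(jobId);
  }
  boolean succeeded = "JOB_STATE_DONE".equals(job.getCurrentState());

  // Copy the input file to bucket A or B and delete the original (placeholder names)
  Storage storage = StorageOptions.getDefaultInstance().getService();
  Blob source = storage.get(BlobId.of("input-bucket", "input-file.csv"));
  source.copyTo(BlobId.of(succeeded ? "bucket-a" : "bucket-b", source.getName()));
  source.delete();
}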

Additionally, I want to share some useful links: an example with waitUntilFinish and a related SO thread.