I have set up a Google Cloud Dataflow pipeline that consumes messages from a Pub/Sub subscription, converts them to table rows, and writes those rows to the corresponding BigQuery table.
The destination table is decided based on the contents of the Pub/Sub message, which occasionally means that the table does not exist yet and has to be created first. For this I use the create disposition CREATE_IF_NEEDED, which works great.
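For reference, here is a simplified sketch of how such a write step can be wired up. The project, dataset, and subscription names, the schema, and the convertToTableRow logic are placeholders rather than my actual code; the relevant parts are the per-element table function and CREATE_IF_NEEDED:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class PubSubToBigQuery {

  // Placeholder conversion: the real pipeline derives the row (and the name of
  // the target table) from the message contents.
  private static TableRow convertToTableRow(PubsubMessage msg) {
    return new TableRow()
        .set("payload", new String(msg.getPayload(), StandardCharsets.UTF_8))
        .set("table_name", msg.getAttribute("table_name"));
  }

  public static void main(String[] args) {
    // Schema used when CREATE_IF_NEEDED has to create a missing table (placeholder fields).
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("payload").setType("STRING"),
        new TableFieldSchema().setName("table_name").setType("STRING")));

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply("read-from-pubsub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription("projects/my-project/subscriptions/my-subscription"))
        .apply("to-table-rows",
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via((PubsubMessage msg) -> convertToTableRow(msg)))
        .setCoder(TableRowJsonCoder.of())
        .apply("write-rows-to-bigquery",
            BigQueryIO.writeTableRows()
                // Destination is chosen per element, here from a field in the row.
                .to((ValueInSingleWindow<TableRow> row) ->
                    new TableDestination(
                        "my-project:my_dataset." + row.getValue().get("table_name"),
                        "Table created on demand"))
                .withSchema(schema)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                // WRITE_APPEND assumed here; streaming inserts append rows as they arrive.
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```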
However, I have noticed that if I manually delete a newly created table in BigQuery while the Dataflow job is still running, Dataflow gets stuck and does not recreate the table. Instead I get the following error:
Operation ongoing in step write-rows-to-bigquery/StreamingInserts/StreamingWriteTables/StreamingWrite for at least 05m00s without outputting or completing in state finish
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:816)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:881)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:143)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:115)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
If I go back to BigQuery and manually recreate the table, the Dataflow job continues working.
Is there a way to instruct the Dataflow pipeline to recreate the table if it is deleted while the job is running?
