I'm running a batch Dataflow job that reads all the rows from a BigQuery table, converts them into JSON strings, and writes the strings to a PubSub topic. The template will be reused with the same or different parameters, and every run should publish all the rows it finds in BigQuery, regardless of whether they are the same rows as in the previous job.
The problem is that after every job run I must upload the template again; otherwise the next job succeeds without reading anything from BigQuery, even with the same parameters as the first successful job. The worker logs in Google Cloud Console for the second job report the following when it tries to create the BigQueryIO job:
Request failed with code 409, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs.
According to the Google Cloud documentation, a 409 error indicates either a conflict or a duplicate. The job id used by both BigQueryIO jobs is exactly the same, so a duplicate job id appears to be what causes the second Dataflow job to fail.
I've tried different parameters and tables with the same template, but the job id never changes until I upload the template again with the Maven command below, taken from the Google Cloud documentation on custom templates:
mvn compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.BigQuery \
    -Dexec.args="--runner=DataflowRunner \
        --project=my-project \
        --stagingLocation=gs://bkt_test/my_test/staging \
        --region=us-central1 \
        --templateLocation=gs://bkt_test/my_test/templates/BigQueryToPubSub"
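To make my suspicion concrete, here is a minimal plain-Java sketch of what I think is going on. It does not use Beam at all, and I have not confirmed this in the Beam source: `JobIdSketch`, `StagedTemplate`, `launch()`, and the `job_` prefix are all hypothetical names made up for illustration. The idea is that if the job id is generated once while the template is built and staged, every job launched from that template reuses it, whereas an id generated at launch time would differ on each run.

```java
import java.util.UUID;
import java.util.function.Supplier;

public class JobIdSketch {

    /** Hypothetical stand-in for a staged template: built once, launched many times. */
    public static final class StagedTemplate {
        private final Supplier<String> jobIdSource;

        private StagedTemplate(Supplier<String> jobIdSource) {
            this.jobIdSource = jobIdSource;
        }

        /** The id is generated once while the template is built, then frozen into it. */
        public static StagedTemplate withIdFixedAtBuildTime() {
            final String frozenId = UUID.randomUUID().toString();
            return new StagedTemplate(() -> frozenId);
        }

        /** Id generation is deferred, so every launch draws a fresh value. */
        public static StagedTemplate withIdGeneratedPerLaunch() {
            return new StagedTemplate(() -> UUID.randomUUID().toString());
        }

        /** Simulates one job launched from the template and returns the job id it would submit. */
        public String launch() {
            // "job_" is a made-up prefix, not the format BigQueryIO actually uses.
            return "job_" + jobIdSource.get();
        }
    }

    public static void main(String[] args) {
        StagedTemplate frozen = StagedTemplate.withIdFixedAtBuildTime();
        System.out.println("build-time id, identical across launches: "
                + frozen.launch().equals(frozen.launch()));

        StagedTemplate fresh = StagedTemplate.withIdGeneratedPerLaunch();
        System.out.println("per-launch id, identical across launches: "
                + fresh.launch().equals(fresh.launch()));
    }
}
```

If my assumption is right, the first case matches what I see: two launches from one staged template submit the same id, and the second submission is rejected as a duplicate. Re-uploading the template would correspond to building a new `StagedTemplate`, which would explain why that makes the next run work.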
Why isn't the BigQueryIO job id unique for each run? Is this a bug or is there a way I can specify the format of the job id to ensure uniqueness?
Code Reference:
public static void main(String[] args) {
    PipelineOptionsFactory.register(BigQueryToPubSubOptions.class);
    BigQueryToPubSubOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToPubSubOptions.class);
    Pipeline pipe = Pipeline.create(options);
    pipe
        .apply(
            "Read BigQuery Rows",
            BigQueryIO
                .readTableRows()
                .from(options.getInputTable())
                .withoutValidation()
        )
        .apply(
            "Convert Row to String",
            MapElements.via(new SimpleFunction<TableRow, String>() {
                @Override
                public String apply(TableRow input) {
                    ObjectMapper mapper = new ObjectMapper();
                    ObjectNode json = mapper.createObjectNode();
                    input.forEach((key, value) -> {
                        LOG.info("Column: " + key + "\tValue: " + value);
                        json.put(key, (String) value);
                    });
                    try {
                        return mapper.writeValueAsString(json);
                    } catch (JsonProcessingException e) {
                        LOG.error("Failed to convert row to json", e);
                        return null;
                    }
                }
            })
        )
        .apply(
            "Publish to Topic",
            PubsubIO
                .writeStrings()
                .to(options.getOutputTopic())
        )
        .getPipeline()
        .run();
}
public interface BigQueryToPubSubOptions extends PipelineOptions {
    @Description("The BigQuery input table to read from")
    ValueProvider<String> getInputTable();
    void setInputTable(ValueProvider<String> inputTable);

    @Description("The output PubSub topic to write the rows to")
    ValueProvider<String> getOutputTopic();
    void setOutputTopic(ValueProvider<String> outputTopic);
}