2
votes

I was able to create a Dataflow pipeline that reads data from Pub/Sub and, after processing, writes it to BigQuery in streaming mode.

Now, instead of streaming mode, I would like to run the pipeline in batch mode to reduce costs.

Currently the pipeline does streaming inserts into BigQuery with dynamic destinations. I would like to know whether there is a way to perform batch inserts with dynamic destinations.

Below is the current pipeline code:

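import java.net.SocketTimeoutException;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;

// Constants, EventSchemaBuilder, ReadEventJson_bigquery and log are defined
// elsewhere in the project and are not shown here.
public class StarterPipeline {

    public interface StarterPipelineOption extends PipelineOptions {

        /**
         * Set this required option to specify where to read the input.
         */
        @Description("Path of the file to read from")
        @Default.String(Constants.pubsub_event_pipeline_url)
        String getInputFile();

        void setInputFile(String value);

    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws SocketTimeoutException {

        StarterPipelineOption options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(StarterPipelineOption.class);

        Pipeline p = Pipeline.create(options);

        // Unbounded source: read raw event strings from the Pub/Sub subscription.
        PCollection<String> datastream = p.apply("Read Events From Pubsub",
                PubsubIO.readStrings().fromSubscription(Constants.pubsub_event_pipeline_url));

        // Global window that fires a pane 300 seconds after the first element of each pane arrives.
        PCollection<String> windowed_items = datastream.apply(Window.<String>into(new GlobalWindows())
                .triggering(Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(300))))
                .withAllowedLateness(Duration.standardDays(10)).discardingFiredPanes());

        // Write into BigQuery, routing each row to a table chosen from its "event" field.
        windowed_items.apply("Read and make event table row", new ReadEventJson_bigquery())

                .apply("Write_events_to_BQ",
                        BigQueryIO.writeTableRows().to(new DynamicDestinations<TableRow, String>() {
                            @Override
                            public String getDestination(ValueInSingleWindow<TableRow> element) {
                                String destination = EventSchemaBuilder
                                        .fetch_destination_based_on_event(element.getValue().get("event").toString());
                                return destination;
                            }

                            @Override
                            public TableDestination getTable(String table) {
                                String destination = EventSchemaBuilder.fetch_table_name_based_on_event(table);
                                return new TableDestination(destination, destination);
                            }

                            @Override
                            public TableSchema getSchema(String table) {
                                TableSchema table_schema = EventSchemaBuilder.fetch_table_schema_based_on_event(table);
                                return table_schema;
                            }
                        }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
                                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));

        p.run().waitUntilFinish();

        log.info("Events Pipeline Job Stopped");

    }

}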

2 Answers

0
votes

Whether a write is batch or streaming is determined by the PCollection, so you would need to transform the streaming PCollection read from Pub/Sub into a batched PCollection before writing to BigQuery. The transform that lets you do this is GroupIntoBatches<K,InputT>.

Note that since this transform operates on key-value pairs, each batch contains only elements of a single key. For non-KV elements, check this related answer.

Once you have batched the PCollection with this transform, apply the BigQuery write with dynamic destinations as you did with the streaming PCollection.
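To make the single-key constraint concrete, here is a minimal plain-Java sketch of per-key batching. It does not use Beam and is only a simplified model of what GroupIntoBatches emits: values are buffered per key, a batch is emitted when a key's buffer reaches the batch size, and leftovers are flushed at the end of the input. The class name, method name, event names, and batch size are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key batching. Hypothetical names, not part of Beam.
public class PerKeyBatchingSketch {

    /** One emitted batch: a key plus the values collected for that key. */
    public static final class Batch {
        public final String key;
        public final List<String> values;

        Batch(String key, List<String> values) {
            this.key = key;
            this.values = values;
        }
    }

    /**
     * Buffers values per key and emits a batch as soon as a key's buffer
     * holds batchSize values. Buffers that are still partly filled when the
     * input ends are flushed as smaller batches. Each input entry is a
     * two-element array: {key, value}.
     */
    public static List<Batch> groupIntoBatches(List<String[]> keyedValues, int batchSize) {
        Map<String, List<String>> buffers = new LinkedHashMap<>();
        List<Batch> emitted = new ArrayList<>();
        for (String[] kv : keyedValues) {
            List<String> buffer = buffers.computeIfAbsent(kv[0], k -> new ArrayList<>());
            buffer.add(kv[1]);
            if (buffer.size() == batchSize) {
                // Copy the buffer so that clearing it does not empty the emitted batch.
                emitted.add(new Batch(kv[0], new ArrayList<>(buffer)));
                buffer.clear();
            }
        }
        // Flush whatever is left, one batch per key.
        for (Map.Entry<String, List<String>> entry : buffers.entrySet()) {
            if (!entry.getValue().isEmpty()) {
                emitted.add(new Batch(entry.getKey(), new ArrayList<>(entry.getValue())));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"click", "c1"},
                new String[] {"view", "v1"},
                new String[] {"click", "c2"},
                new String[] {"click", "c3"},
                new String[] {"view", "v2"});
        for (Batch batch : groupIntoBatches(events, 2)) {
            System.out.println(batch.key + " -> " + batch.values);
        }
    }
}
```

With a batch size of 2 this prints three batches: `click -> [c1, c2]`, `view -> [v1, v2]`, then `click -> [c3]`. No batch mixes keys, so if the key is the event type used for routing, every batch maps to exactly one destination table. In Beam itself the output is a collection of key/iterable pairs, so it would presumably need to be flattened back into individual rows before a TableRow write is applied.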

0
votes

You can limit costs by using file loads in streaming jobs. The Insertion Method section of the BigQueryIO documentation states that BigQueryIO.Write supports two methods of inserting data into BigQuery (load jobs and streaming inserts), selected with BigQueryIO.Write.withMethod(org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method). If no method is supplied, a default is chosen based on the input PCollection. See BigQueryIO.Write.Method for more information about the methods.

The different insertion methods provide different tradeoffs of cost, quota, and data consistency. See the BigQuery documentation for more information about these tradeoffs.
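As a rough sketch of what this could look like for the pipeline in the question, the write below selects load jobs explicitly and keeps the dynamic destinations. It assumes the Beam Java SDK with the GCP IO module on the classpath. The class name, method name, five-minute frequency, and shard count are illustrative assumptions, not values from the question.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.joda.time.Duration;

// Hypothetical helper class; only the BigQueryIO calls come from the Beam SDK.
public class FileLoadsWriteSketch {

    /**
     * Builds a BigQuery write that issues periodic load jobs instead of
     * streaming inserts. The caller passes in the same DynamicDestinations
     * implementation that the streaming version of the pipeline uses.
     */
    public static BigQueryIO.Write<TableRow> fileLoadsWrite(
            DynamicDestinations<TableRow, String> destinations) {
        return BigQueryIO.writeTableRows()
                .to(destinations)
                // Use load jobs rather than the default chosen for unbounded input.
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                // Needed for FILE_LOADS on an unbounded PCollection: how often a
                // load job is started. Five minutes is an illustrative value.
                .withTriggeringFrequency(Duration.standardMinutes(5))
                // Number of files written per trigger; applies only together with
                // a triggering frequency. 1 is an illustrative value.
                .withNumFileShards(1)
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND);
    }
}
```

The result would replace the BigQueryIO.writeTableRows() expression passed to the "Write_events_to_BQ" step. The sketch leaves out withFailedInsertRetryPolicy because, as far as I know, that policy applies only to streaming inserts. Whether the shard count must be set explicitly may depend on the SDK version.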