10
votes

Suppose I have a PCollection<Foo> and I want to write it to multiple BigQuery tables, choosing a potentially different table for each Foo.

How can I do this using the Apache Beam BigQueryIO API?

2 Answers

23
votes

This is possible using a feature recently added to BigQueryIO in Apache Beam.

PCollection<Foo> foos = ...;
// The explicit <Foo> type argument lets the chained calls type-check against Foo.
foos.apply(BigQueryIO.<Foo>write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

Depending on whether the input PCollection<Foo> is bounded or unbounded, this will either create multiple BigQuery load jobs under the hood (one or more per table, depending on the amount of data) or use the BigQuery streaming inserts API.

The most flexible version of the API uses DynamicDestinations, which allows you to write different values to different tables with different schemas, and even allows you to use side inputs from the rest of the pipeline in all of these computations.

Additionally, BigQueryIO has been refactored into a number of reusable transforms that you can combine yourself to implement more complex use cases; see the files in the source directory.

This feature will be included in the first stable release of Apache Beam and in the next release of the Dataflow SDK (which will be based on the first stable release of Apache Beam). Right now you can use it by running your pipeline against a snapshot of Beam at HEAD from GitHub.

3
votes

As of Beam 2.12.0, this feature is available in the Python SDK as well. It is marked as experimental, so you will have to pass --experiments use_beam_bq_sink to enable it. You'd do something like this:

import apache_beam as beam


# meets_some_condition is a placeholder for your own routing predicate.
def get_table_name(element):
  if meets_some_condition(element):
    return 'mytablename1'
  else:
    return 'mytablename2'


p = beam.Pipeline(...)

my_input_pcoll = p | ReadInMyPCollection()

my_input_pcoll | beam.io.gcp.bigquery.WriteToBigQuery(table=get_table_name)

The new sink supports a number of other options, which you can review in the pydoc.
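For a slightly fuller picture, here is a minimal sketch that combines the table callable with a few of those other options. The project, dataset, table names, the 'severity' field, and the schema are all hypothetical placeholders, and it assumes each element is a dict whose keys match the schema. The Beam import sits inside the helper only so that the routing function can be tried out without Beam installed.

```python
# Hypothetical table specs in 'project:dataset.table' form (illustration only).
ERROR_TABLE = 'my-project:my_dataset.errors'
EVENT_TABLE = 'my-project:my_dataset.events'

# Assumed schema shared by both tables, in the 'name:TYPE,...' string form.
ROW_SCHEMA = 'id:STRING,severity:STRING,message:STRING'


def get_table_name(element):
    """Pick the destination table for one element (assumed to be a dict)."""
    if element.get('severity') == 'ERROR':
        return ERROR_TABLE
    return EVENT_TABLE


def write_to_tables(pcoll):
    """Attach the BigQuery sink to an existing PCollection of dicts."""
    # Imported here so get_table_name stays usable without Beam installed.
    import apache_beam as beam

    return pcoll | 'WriteDynamic' >> beam.io.WriteToBigQuery(
        table=get_table_name,  # called once per element to choose the table
        schema=ROW_SCHEMA,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
```

You would then call write_to_tables(my_input_pcoll) in place of the last line of the snippet above. Returning the full table spec from the callable avoids depending on a default project or dataset being picked up from the pipeline options.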