0 votes

I am working on an Apache Beam pipeline that reads a bunch of events from Pub/Sub and then, based on the event type, writes them into separate BigQuery tables.

I know that WriteToBigQuery supports dynamic destinations, but the problem in my case is that the destination is derived from the data read from the event. For example, an event looks like:

{
 "object_id": 123,
 ... some metadata,
 "object_data": {object related info}
}

The data that should be written to the BigQuery table is under the object_data key of the event, but the table name is derived from other fields in the metadata. I tried to use the table_side_inputs parameter, but the issue is that because each event can have a different destination, the side inputs don't update accordingly. The code is below:

from apache_beam import GroupByKey, Map, ParDo, PTransform, WindowInto
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.pvalue import AsDict
from apache_beam.transforms.window import FixedWindows


class DumpToBigQuery(PTransform):

    def _choose_table(self, element, table_names):
        # table_names = {"table_name": "project_name:dataset.table_name"}
        table_name = table_names["table_name"]
        return table_name

    def expand(self, pcoll):
        events = (
            pcoll
            | "GroupByObjectType" >> Map(lambda e: (e["object_type"], e))
            | "Window" >> WindowInto(FixedWindows(self.window_interval_seconds))
            | "GroupByKey" >> GroupByKey()
            | "KeepLastEventOnly" >> ParDo(WillTakeLatestEventForKey())
        )

        table_name = events | Map(
            lambda e: ("table_name", f"{self.project}:{self.dataset}.{e[0]}")
        )
        table_names_dct = AsDict(table_name)

        events_to_write = events | Map(lambda e: e[1]) | Map(self._drop_unwanted_fields)

        return events_to_write | "toBQ" >> WriteToBigQuery(
            table=self._choose_table,
            table_side_inputs=(table_names_dct,),
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )

You can see that the side input is taken from the other branch of the pipeline, table_name, which basically extracts the table name from the event; this is then given to WriteToBigQuery as a side input. Unfortunately, this doesn't really work: under load the side input is not updated in time and some events end up using the wrong destinations.

What other approach can I use in this specific case? All the docs use static examples and don't really cover this dynamic approach.

The other thing I tried was writing a custom DoFn that uses the HTTP BigQuery client and inserts the rows directly; the issue here is the speed of the pipeline, as it only inserts around 6-7 events per second.
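
For reference, a simplified sketch of that attempt (the class name, the client setup, and the exact fields here are illustrative rather than my real code):

import logging

import apache_beam as beam
from google.cloud import bigquery


class InsertRowsViaClient(beam.DoFn):
    """Illustrative only: insert each event with the BigQuery client."""

    def __init__(self, project, dataset):
        self.project = project
        self.dataset = dataset

    def setup(self):
        # One client per worker, created when the DoFn is set up.
        self.client = bigquery.Client(project=self.project)

    def process(self, element):
        # The destination can be derived from the element itself here, but every
        # element pays for a blocking insert call, which is why throughput stays
        # at a handful of events per second.
        table_id = f"{self.project}.{self.dataset}.{element['object_type']}"
        errors = self.client.insert_rows_json(table_id, [element["object_data"]])
        if errors:
            logging.error("Failed to insert into %s: %s", table_id, errors)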

1 Answer

2 votes

I had a similar problem, for which I have a workaround.

I see that you have create_disposition=BigQueryDisposition.CREATE_NEVER, so the list of tables is known before the code runs. Perhaps it is unwieldy, but it is known. I have a DoFn which yields many TaggedOutputs from its process method (a rough sketch of such a DoFn follows the pipeline code below). Then my pipeline looks like:

import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions

# MySplitFn, options, DATASET_ID, padl_shared and parser come from my own project code.
parser_outputs = ['my', 'list', 'of', 'tables']
with beam.Pipeline(options=PipelineOptions(args)) as p:
    pipe = (
        p
        | "Start" >> beam.Create(["example row"])
        | "Split"
        >> beam.ParDo(MySplitFn()).with_outputs(*parser_outputs)
    )

    for output in parser_outputs:
        pipe[output] | "write {}".format(output) >> beam.io.WriteToBigQuery(
            bigquery.TableReference(
                projectId=options.projectId, datasetId=DATASET_ID, tableId=output
            ),
            schema=padl_shared.getBQSchema(parser.getSchemaForDataflow(rowTypeName=output)),
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )

# The with block runs the pipeline and waits for it to finish on exit,
# so there is no separate p.run().wait_until_finish() call.
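
And here is roughly what a tagged-output DoFn like MySplitFn could look like; the routing on object_type and the object_data payload are assumptions borrowed from your event format rather than my actual code:

import apache_beam as beam

parser_outputs = ['my', 'list', 'of', 'tables']


class MySplitFn(beam.DoFn):
    """Illustrative sketch: route each event to a tagged output per table."""

    def process(self, element):
        # Assuming the routing key is the event's object_type and the payload
        # to write is under object_data, as in the question.
        table = element.get("object_type")
        if table in parser_outputs:
            yield beam.pvalue.TaggedOutput(table, element["object_data"])
        else:
            # Anything without a known table falls through to the main output.
            yield element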