I am working on an Apache Beam pipeline that reads events from Pub/Sub and, based on the event type, writes them into separate BigQuery tables. I know that WriteToBigQuery supports dynamic destinations, but the problem in my case is that the destination is derived from the data read from the event itself. For example, an event looks like this:
{
    "object_id": 123,
    ... some metadata ...,
    "object_data": { ... object-related info ... }
}
The data that should be written to the BigQuery table is under the object_data key of the event, but the table name is derived from other fields in the metadata.
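To make the requirement concrete, the per-event routing I need looks roughly like this (using object_type as the metadata field that determines the table, purely as an illustration):

# Illustrative only: how a single event should be routed.
def route_event(event, project, dataset):
    # The table name comes from the event's metadata (e.g. its object type) ...
    table = f"{project}:{dataset}.{event['object_type']}"
    # ... while the row that gets written is only the nested object_data payload.
    row = event["object_data"]
    return table, row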
I tried to use the table_side_inputs parameter, but the issue is that, because each event can have a different destination, the side input doesn't update accordingly. The code is below:
from apache_beam import GroupByKey, Map, ParDo, PTransform, WindowInto
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io.gcp.bigquery_tools import RetryStrategy
from apache_beam.pvalue import AsDict
from apache_beam.transforms.window import FixedWindows


class DumpToBigQuery(PTransform):
    # __init__ (project, dataset, window_interval_seconds) and the helper
    # transforms are omitted for brevity.

    def _choose_table(self, element, table_names):
        # table_names = {"table_name": "project_name:dataset.table_name"}
        table_name = table_names["table_name"]
        return table_name

    def expand(self, pcoll):
        events = (
            pcoll
            | "GroupByObjectType" >> Map(lambda e: (e["object_type"], e))
            | "Window" >> WindowInto(FixedWindows(self.window_interval_seconds))
            | "GroupByKey" >> GroupByKey()
            | "KeepLastEventOnly" >> ParDo(WillTakeLatestEventForKey())
        )

        # Side-input branch: derive the destination table name from the event key.
        table_name = events | Map(
            lambda e: ("table_name", f"{self.project}:{self.dataset}.{e[0]}")
        )
        table_names_dct = AsDict(table_name)

        # Main branch: keep only the payload that should land in BigQuery.
        events_to_write = events | Map(lambda e: e[1]) | Map(self._drop_unwanted_fields)

        return events_to_write | "toBQ" >> WriteToBigQuery(
            table=self._choose_table,
            table_side_inputs=(table_names_dct,),
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            insert_retry_strategy=RetryStrategy.RETRY_NEVER,
        )
You can see that the side input is taken from the other branch of the pipeline, table_name, which basically extracts the table name from the event; this is then passed to WriteToBigQuery via table_side_inputs. Unfortunately, this doesn't really work: under load the side input is not updated in time, and some events end up being routed to the wrong destinations. What other approach can I use in this specific case? All the docs use static examples and don't really cover this dynamic approach.
The other thing I tried is a custom DoFn that uses the BigQuery HTTP client and inserts the rows directly. The issue there is the speed of the pipeline: it only inserts around 6-7 events per second.
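For context, a simplified sketch of that approach is below (the class name and element shape are illustrative, error handling is cut down, and insert_rows_json is the streaming-insert call from the google-cloud-bigquery client):

import logging

from apache_beam import DoFn
from google.cloud import bigquery


class InsertIntoBigQuery(DoFn):  # illustrative name
    def __init__(self, project, dataset):
        self.project = project
        self.dataset = dataset

    def setup(self):
        # One client per worker.
        self.client = bigquery.Client(project=self.project)

    def process(self, element):
        # element is assumed to be the (object_type, event) pair from the earlier branch.
        object_type, event = element
        table_id = f"{self.project}.{self.dataset}.{object_type}"
        # A streaming insert per element: this is what limits throughput to a few events per second.
        errors = self.client.insert_rows_json(table_id, [event["object_data"]])
        if errors:
            logging.error("BigQuery insert failed: %s", errors)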