1
votes

My use case is simple: read event logs from Pub/Sub subscription, parse them and save into BigQuery. Because the number of events is expected to grow significantly and I work with unbounded data source I decided to configure sharding in BigQuery: store events into daily tables based on timestamp from the event data (what is called "event time" in the Beam documentation). The question I have is do I need to configure windowing in my case or I can just leave the default configuration which implicitly uses global window? The reason I'm asking is because most of the examples of BigQuery sharding I found assume usage of windowing configuration. But in my case, since I'm not using any grouping operations as GroupByKey and Combine, looks like I should be just fine without any windowing configuration. Or are there any reasons for me to use windowing anyway, maybe it affects how BigQueryIO performs for example?

The way I do sharding now is below.

static class TableNamingFn implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);

        TableReference reference = new TableReference();
        reference.setProjectId("test-project");
        reference.setDatasetId("event_log");

        DateTime timestamp = new DateTime(input.getValue().get("event_timestamp"), DateTimeZone.UTC);
        reference.setTableId("events_" + formatter.print(timestamp));
        return new TableDestination(reference, null);
    }
}

// And then
eventRows.apply("BigQueryWrite", BigQueryIO.writeTableRows()
        .to(new TableNamingFn())
        .withSchema(EventSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
1

1 Answers

1
votes

It looks like you are trying to shard the table by date, have you considered using a Date-partitioned Table instead. You could update where you set your table id to using the partition decorator, something like:

reference.setTableId("events$" + formatter.print(timestamp));

This article covers using BigQuery's partitioned tables with Apache Beam. In particular this snippet of code is probably what you want to use: https://gist.githubusercontent.com/alexvanboxel/902099911d86b6827c8ea07f4e1437d4/raw/cc8246eb9b3219550379cfe7b3b7abca8fc77401/medium_bq_tableref_partition