The approach I took (it works in streaming mode, too):
- Define a custom window for the incoming record
- Convert the window into the table/partition name
    p.apply(PubsubIO.Read
            .subscription(subscription)
            .withCoder(TableRowJsonCoder.of()))
     .apply(Window.into(new TablePartitionWindowFn()))
     .apply(BigQueryIO.Write
            .to(new DayPartitionFunc(dataset, table))
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
The window is set based on the incoming data. The end Instant can be ignored, because only the start value is used to set the partition:
    public class TablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

        // Builds the window from the record's "DTTM" field (a yyyy-MM-dd date, read as UTC).
        private IntervalWindow assignWindow(AssignContext context) {
            TableRow source = (TableRow) context.element();
            String dttm_str = (String) source.get("DTTM");
            DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZoneUTC();
            Instant start_point = Instant.parse(dttm_str, formatter);
            // The end is a placeholder (start + 1000 ms); only the start selects the partition.
            Instant end_point = start_point.withDurationAdded(1000, 1);
            return new IntervalWindow(start_point, end_point);
        }

        @Override
        public Coder<IntervalWindow> windowCoder() {
            return IntervalWindow.getCoder();
        }

        @Override
        public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
            return Arrays.asList(assignWindow(c));
        }

        @Override
        public boolean isCompatible(WindowFn<?, ?> other) {
            return false;
        }

        @Override
        public IntervalWindow getSideInputWindow(BoundedWindow window) {
            if (window instanceof GlobalWindow) {
                throw new IllegalArgumentException(
                        "Attempted to get side input window for GlobalWindow from non-global WindowFn");
            }
            // The pipeline above uses no side inputs, so no window mapping is provided.
            return null;
        }
    }
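To make the window assignment easier to check in isolation, here is a minimal sketch of the same date arithmetic using only `java.time` (not Joda-Time, and with no Dataflow classes). The class and method names (`DayWindowSketch`, `windowStart`, `windowEnd`) are hypothetical and exist only for illustration; it assumes the `DTTM` value is a plain `yyyy-MM-dd` string, as in the formatter above.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: mirrors the start/end computation of the window, nothing more.
class DayWindowSketch {

    // Midnight UTC of the day named by a "yyyy-MM-dd" string.
    static Instant windowStart(String day) {
        return LocalDate.parse(day, DateTimeFormatter.ISO_LOCAL_DATE)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant();
    }

    // Placeholder end, as in the original code: start + 1000 ms.
    static Instant windowEnd(Instant start) {
        return start.plusMillis(1000);
    }

    public static void main(String[] args) {
        Instant start = windowStart("2017-01-15");
        System.out.println(start + " .. " + windowEnd(start));
    }
}
```

Two records carrying the same date yield identical start and end values, so they should end up in the same window and therefore the same partition.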
Setting the table partition dynamically:
    public class DayPartitionFunc implements SerializableFunction<BoundedWindow, String> {

        private final String destination;

        public DayPartitionFunc(String dataset, String table) {
            // "$" introduces the partition decorator that apply() appends.
            this.destination = dataset + "." + table + "$";
        }

        @Override
        public String apply(BoundedWindow boundedWindow) {
            // The cast below is safe because TablePartitionWindowFn only produces IntervalWindows.
            String dayString = DateTimeFormat.forPattern("yyyyMMdd")
                    .withZone(DateTimeZone.UTC)
                    .print(((IntervalWindow) boundedWindow).start());
            return destination + dayString;
        }
    }
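For reference, the destination string this produces has the form `dataset.table$yyyyMMdd`. The following is a rough standalone sketch of that mapping using `java.time` rather than Joda-Time, so it can be run without the Dataflow SDK. `PartitionNameSketch`, `partitionFor`, and the dataset/table names are made up for the example.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds "dataset.table$yyyyMMdd" from a window start.
class PartitionNameSketch {

    // Day formatter pinned to UTC, matching the withZone(DateTimeZone.UTC) call above.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    static String partitionFor(String dataset, String table, Instant windowStart) {
        return dataset + "." + table + "$" + DAY.format(windowStart);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2017-01-15T00:00:00Z");
        System.out.println(partitionFor("my_dataset", "events", start));
    }
}
```

Because the formatter is fixed to UTC, any instant within the same UTC day maps to the same partition suffix.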
Is there a better way of achieving the same outcome?