
I currently have a streaming pipeline that processes events and pushes them to a BigQuery table named EventsTable:

TransactionID    EventType
1                typeA
1                typeB
1                typeB
1                typeC
2                typeA
2                typeC
3                typeA

I want to add a branch to my processing pipeline that also "groups" transaction-related data together into a TransactionsTable. Roughly, the value in each type column of the TransactionsTable would be the count of the corresponding EventType for a given transaction. With the example events above, the output would look like this:

TransactionID      typeA     typeB     typeC
1                  1         2         1
2                  1         0         1
3                  1         0         0

The number of "type" columns would be equal to the number of distinct EventType values that exist within the system.

I'm trying to see how I could do this with Dataflow, but cannot find a clean way to do it. I know that PCollections are immutable, so I cannot store incoming data in an ever-growing PCollection that queues events until the other needed elements arrive and then writes them to the second BigQuery table. Is there some kind of windowing function in Dataflow that would allow this (e.g. queuing events in a temporary windowed structure with some sort of expiry)?

I could probably do something with batch jobs and Pub/Sub, but this would be far more complex. On the other hand, I do understand that Dataflow is not meant to hold ever-growing data structures, and that data, once it comes in, has to go through the pipeline and exit (or be discarded). Am I missing something?

Does every transaction have 3 associated values? If not, is there anything special about why you chose Value1, Value2 and Value3 as columns? Any more details about the transactions and the structure of the Key/Value pairs? – Ben Chambers
I did edit the original post to be clearer and give more details. Hope this helps! – kzeon

1 Answer


In general, the easiest way to do this kind of "aggregate data across many events" is to use a CombineFn which allows you to combine all of the values associated with a specific key. This is typically more efficient than just queueing the events, because it only needs to accumulate the result rather than accumulate all of the events.

For your specific case, you could create a custom CombineFn. The accumulator would be a Map<EventType, Long>. For instance:

import java.util.HashMap;
import java.util.Map;
// plus Combine.CombineFn from your Dataflow SDK's transforms package

public class TypedCountCombineFn
    extends CombineFn<EventType, Map<EventType, Long>, TransactionRow> {
  @Override
  public Map<EventType, Long> createAccumulator() {
    return new HashMap<>();
  }

  @Override
  public Map<EventType, Long> addInput(
      Map<EventType, Long> accum, EventType input) {
    // Increment the count for this event type, starting from 0 if unseen.
    Long count = accum.get(input);
    accum.put(input, count == null ? 1L : count + 1);
    return accum;
  }

  @Override
  public Map<EventType, Long> mergeAccumulators(
      Iterable<Map<EventType, Long>> accums) {
    // Sum up the counts for each event type across all accumulators.
    Map<EventType, Long> merged = new HashMap<>();
    for (Map<EventType, Long> accum : accums) {
      for (Map.Entry<EventType, Long> entry : accum.entrySet()) {
        Long current = merged.get(entry.getKey());
        merged.put(entry.getKey(),
            current == null ? entry.getValue() : current + entry.getValue());
      }
    }
    return merged;
  }

  @Override
  public TransactionRow extractOutput(Map<EventType, Long> accum) {
    // TODO: Build an output row from the per-event-type accumulator
    // (see the sketch below).
  }
}
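
To fill in the last TODO, here is a minimal sketch of extractOutput. It assumes EventType is an enum and that TransactionRow is simply BigQuery's TableRow (com.google.api.services.bigquery.model.TableRow); both are assumptions, so adapt it to whatever your actual types look like:

@Override
public TableRow extractOutput(Map<EventType, Long> accum) {
  // Assumes EventType is an enum and the output row is a BigQuery TableRow.
  // Emit one column per event type, defaulting to 0 for types that never
  // appeared for this transaction (e.g. typeB for transaction 2).
  TableRow row = new TableRow();
  for (EventType type : EventType.values()) {
    Long count = accum.get(type);
    row.set(type.name(), count == null ? 0L : count);
  }
  return row;
}

This also covers the requirement of emitting 0 for event types that never occurred within a given transaction.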

Applying this CombineFn can be done globally (across all transactions in a PCollection) or per-key (such as per transaction ID):

PCollection<EventType> pc = ...;

// Globally (a single result across the entire PCollection)
PCollection<TransactionRow> globalCounts =
    pc.apply(Combine.globally(new TypedCountCombineFn()));

// Per key (one result per transaction ID)
PCollection<KV<Long, EventType>> keyedPC =
    pc.apply(WithKeys.of(new SerializableFunction<EventType, Long>() {
      @Override
      public Long apply(EventType in) {
        return in.getTransactionId();
      }
    }));
PCollection<KV<Long, TransactionRow>> keyedCounts =
  keyedPC.apply(Combine.perKey(new TypedCountCombineFn()));
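
One caveat since the pipeline is streaming: applying Combine.perKey to an unbounded PCollection requires a windowing (and, if needed, triggering) strategy so that Dataflow knows when to emit the aggregated results. A minimal sketch, assuming fixed ten-minute windows are acceptable (the window size is an assumption to tune for your latency and completeness needs; this uses Window, FixedWindows and Joda-Time's Duration from the SDK):

// Window the keyed events so the per-key combine can emit results
// periodically instead of waiting for the (unbounded) input to end.
PCollection<KV<Long, TransactionRow>> windowedCounts = keyedPC
    .apply(Window.<KV<Long, EventType>>into(
        FixedWindows.of(Duration.standardMinutes(10))))
    .apply(Combine.perKey(new TypedCountCombineFn()));

The resulting per-transaction rows can then be converted to TableRows and written to the TransactionsTable with BigQueryIO.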