
I'm writing a Beam data pipeline that reads from an unbounded source such as Kafka. I am not performing any analytic functions. I would like to transform the elements and write them to the sink only after the record count of the PCollection reaches a certain threshold, in order to throttle the data being sent to the sink.

I have looked at the existing triggers but couldn't figure out whether they are a good fit.

Are you using a Beam IO sink, or do you push your data to the sink in a DoFn? If within a DoFn, do you have multiple keys that you run a GroupByKey against before sending to the sink? – Reza Rokni
This sounds like a perfect use case for AfterPane.elementCountAtLeast(threshold), which appears in the Composite Triggers section of the Beam Programming Guide (beam.apache.org/documentation/programming-guide/…). – Jeff Klukas
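For reference, the composite-trigger pattern that section of the guide describes looks roughly like the sketch below. The threshold value and the one-minute processing-time fallback (which keeps a slow stream from stalling a partially filled pane) are illustrative, not from the question:

import org.apache.beam.sdk.transforms.windowing.{AfterFirst, AfterPane, AfterProcessingTime, Repeatedly}
import org.joda.time.Duration

// Fire whenever at least `threshold` elements have arrived, or one minute
// of processing time has passed since the first element in the pane,
// whichever comes first; repeat for the lifetime of the window.
val threshold = 1000
val trigger = Repeatedly.forever(
  AfterFirst.of(
    AfterPane.elementCountAtLeast(threshold),
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))
  )
)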

1 Answer


I have tested triggers and they work as expected; here is a Scala code example:

import org.apache.beam.sdk.transforms.windowing.{AfterPane, FixedWindows, Window}
import org.apache.beam.sdk.values.PCollection
import org.joda.time.Duration

// 2-second fixed windows whose panes fire as soon as at least 4 elements arrive
val data: PCollection[Type] = results
  .apply(
    Window
      .into[Type](FixedWindows.of(Duration.millis(2000)))
      .withAllowedLateness(Duration.millis(1000))
      .triggering(AfterPane.elementCountAtLeast(4))
      .accumulatingFiredPanes()
  )

It waits for 4 elements and then triggers the window.
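Note that a trigger only controls when a downstream aggregation emits its panes; by itself it does not batch writes happening inside a plain DoFn. Below is a minimal sketch of wiring the triggered collection to a sink, assuming the data collection from above, a GroupByKey before the sink (as Reza's comment suggests), a single constant key to keep the example small, and a hypothetical writeBatchToSink helper:

import org.apache.beam.sdk.transforms.{DoFn, GroupByKey, MapElements, ParDo, SimpleFunction}
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.values.KV
import scala.jdk.CollectionConverters._

// Hypothetical helper standing in for whatever client your sink uses.
def writeBatchToSink(batch: Seq[Type]): Unit = ???

data
  // Key every element; a constant key keeps the example small, but a real
  // key would spread the grouping across workers.
  .apply(MapElements.via(new SimpleFunction[Type, KV[Integer, Type]]() {
    override def apply(t: Type): KV[Integer, Type] = KV.of(Integer.valueOf(0), t)
  }))
  // The trigger above decides when this GroupByKey emits a pane.
  .apply(GroupByKey.create[Integer, Type]())
  // Each fired pane arrives as one batch, written in a single call.
  .apply(ParDo.of(new DoFn[KV[Integer, java.lang.Iterable[Type]], Void]() {
    @ProcessElement
    def process(c: ProcessContext): Unit =
      writeBatchToSink(c.element().getValue.asScala.toSeq)
  }))

Also be aware that a bare AfterPane.elementCountAtLeast(4) fires at most once per window; to fire for every batch of 4 within the same window, wrap it in Repeatedly.forever(...), as in the sketch after the comments above.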