Using Scio version 0.4.7, I have a streaming job that listens to a Pub/Sub topic. I'm doing event-time processing here, with a 'timestamp' attribute present on the message properties in RFC3339 format.
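For context, the publisher side sets that attribute roughly like this (a simplified sketch using the google-cloud-pubsub Java client; project and topic names are placeholders):

import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.{PubsubMessage, TopicName}
import java.time.Instant

val publisher = Publisher.newBuilder(TopicName.of("my-project", "my-topic")).build()
val msg = PubsubMessage.newBuilder()
  .setData(ByteString.copyFromUtf8(eventJson)) // eventJson: the event serialized as a JSON string
  .putAttributes("timestamp", Instant.now().toString) // RFC3339, e.g. 2018-01-15T10:00:00.123Z
  .build()
publisher.publish(msg)

The pipeline reads the topic and assigns event timestamps from that attribute: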
val rtEvents: SCollection[RTEvent] = sc.pubsubTopic(args("topic"), timestampAttribute = "timestamp").map(jsonToObject)
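For reference, RTEvent is a plain case class and jsonToObject just decodes the JSON payload; a trimmed-down version (the real types carry more fields):

case class Properties(key: String, category: String)
case class RTEvent(id: String, properties: Properties)

def jsonToObject(json: String): RTEvent = ??? // JSON decoding elided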
val windowedEvents = rtEvents.withFixedWindows(
  Duration.standardMinutes(1L),
  options = WindowOptions(
    trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
    accumulationMode = DISCARDING_FIRED_PANES,
    allowedLateness = Duration.standardSeconds(1L)
  )
)
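I believe these are all the imports the windowing snippet relies on:

import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.transforms.windowing.{AfterWatermark, Repeatedly}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES
import org.joda.time.Duration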
I use windowedEvents for further aggregation and calculations in the pipeline:
doSomeAggregation(windowedEvents)
def doSomeAggregation(events: SCollection[RTEvent]): SCollection[(String, Map[String, Int])] =
  events
    .map(e => (e.properties.key, (e.properties.category, e.id)))
    .groupByKey
    .map { case (key, pairs: Iterable[(String, String)]) =>
      // count distinct event ids per category
      // (built with map rather than mapValues to avoid a lazy view)
      val countPerCategory: Map[String, Int] = pairs
        .groupBy(_._1)
        .map { case (category, ps) => category -> ps.toList.distinct.size }
      // some other HTTP POST and logging here
      (key, countPerCategory)
    }
sc.close().waitUntilFinish()
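So for a given key, each fired pane should emit something like this (key, category names, and counts made up for illustration):

("user-123", Map("click" -> 42, "view" -> 7))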
If I run the job on Google Dataflow with the following autoscaling parameters:
--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=4
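For completeness, the full launch command looks roughly like this (main class, project, zone, and topic values are placeholders):

sbt "runMain com.example.StreamingJob \
  --project=my-gcp-project \
  --zone=europe-west1-b \
  --runner=DataflowRunner \
  --topic=projects/my-gcp-project/topics/events \
  --workerMachineType=n1-standard-8 \
  --autoscalingAlgorithm=THROUGHPUT_BASED \
  --maxNumWorkers=4"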
The job runs, and the fixed windows fire correctly, as long as there is only one worker. As soon as the job autoscales to more than one worker, the fixed windows stop firing, the initial Pub/Sub step's system lag and wall time keep growing, and the data watermark stops moving forward.
Is there something wrong with my trigger setup? Has anyone else experienced this on the Dataflow runner or on other runners? Any help is greatly appreciated. I'm inclined to drop Scio and revert back to the Apache Beam Java SDK if I can't solve this.