0
votes

using scio version 0.4.7, I have a streaming job that's listening to a pubsub topic, I'm using event processing here with 'timestamp' attribute present on the message properties in RFC3339

val rtEvents: SCollection[RTEvent] = sc.pubsubTopic(args("topic"), timestampAttribute = "timestamp").map(jsonToObject)
val windowedEvents = rtEvents.withFixedWindows(Duration.standardMinutes(1L),
  options = WindowOptions(trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
    accumulationMode = DISCARDING_FIRED_PANES,
    allowedLateness = Duration.standardSeconds(1L)
  )
)

I use windowedEvents for further aggregation and calculations in the pipeline

doSomeAggregation(windowedEvents)

def doSomeAggregation(events: SCollection[RTEvent]): SCollection[(String, Map[String, Int])] =
        events.map(e => (e.properties.key, (e.properties.category, e.id)))
          .groupByKey
          .map { case (key, tuple: Iterable[(String, String)]) =>
            val countPerCategory: Map[String, Int] = tuple.groupBy(_._1)
              .mapValues(_.toList.distinct.size)
            //some other http post and logging here
            (key, countPerCategory)
          }

    sc.close().waitUntilFinish()

If i run the job with the following autoscaling parameters on google dataflow

--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=4

the job runs and the fixed windows fire correctly if there is only one worker running. As soon as the job autoscales up to more > 1 worker, the fixed windows stop firing and initial pubsub step's system lag and wall time keeps growing, while data watermark does not move forward.

Is there something wrong with my trigger setup? Has anyone else experienced this on dataflow runner or other runners? Any help is greatly appreciated. I'm inclined to drop scio and revert to back to apache-beam java sdk if I can't solve this.

1
I don't see anything wrong with the application. It is mostly not an Scio issue. Do you have a job_id we can take a look at? Btw, did the pipeline work with multiple workers? (you can start it with two workers with --numWorkers=2 option).Raghu Angadi
Hey Raghu, thanks for the reply. Here's the job_id where i started with a single worker, and the problem occurred after the job upscaled to 4 workers : 2018-02-11_20_56_03-1568786864154833556. I did start the job with 2 workers, and it did not fire any windows at all - job id : 2018-02-11_20_43_27-4213713692706340214. I guess you work for google, if you'd like a formal ticket to debug this let me know.ASingh
Filing a formal ticket will be good. Please go head and mentioned this post. Meanwhile I will request permission to look at the job status.Raghu Angadi
This job ran only for 10 minutes. If you are using custom timestamp, it could take some time for Dataflow to scan the timestamp of the messages in pubsub and advance it. Did you have a lot of messages to process at the start?Raghu Angadi
if you're talking about the job 4213713692706340214, yes it ran only for 10 odd minutes before i killed it since I expected the window to fire in the first minute itself. I have a continuous stream of production data on the pub sub topic, so yes I did have a lot of messages to process at the start. I can give you another job id if you'd like where I start with a single worker and everything runs fine.ASingh

1 Answers

2
votes

I managed to resolve the issue. In my current setup the workers were unable to communicate with each other. The job silently fails without any timeout errors (something beam should probably propagate up as an error).

If you're using dataflow as your runner, make sure the firewall defined for dataflow on your project is defined for 'default' network.

If the dataflow firewall is defined for your network, you will need to pass additional runtime parameter into your job

--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=4 --network='your-network'