0
votes

I have an issue regarding Google Dataflow.

I'm writing a Dataflow pipeline that reads data from Pub/Sub and writes to BigQuery; that part works.
Now I have to handle late data. I was following some examples on the internet, but it's not working properly. Here is my code:

pipeline.apply(PubsubIO.readStrings()
            .withTimestampAttribute("timestamp").fromSubscription(Constants.SUBSCRIBER))
        .apply(ParDo.of(new ParseEventFn()))
        .apply(Window.<Entity>into(FixedWindows.of(WINDOW_SIZE))
            // processing of late data.
            .triggering(
                    AfterWatermark
                            .pastEndOfWindow()
                            .withEarlyFirings(
                                    AfterProcessingTime
                                            .pastFirstElementInPane()
                                            .plusDelayOf(DELAY_SIZE))
                            .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(ALLOW_LATE_SIZE)
            .accumulatingFiredPanes())
        .apply(ParDo.of(new ParseTableRow()))
        .apply("Write to BQ", BigQueryIO.<TableRow>write()...

Here is my pubsub message:

{
...,
"timestamp" : "2015-08-31T09:52:25.005Z"
}

When I manually push some messages (by going to the Pub/Sub topic and publishing) whose timestamps are far older than ALLOW_LATE_SIZE, these messages are still passed through instead of being dropped.

1
I don't really understand your question. Can you reword/edit it to make it clearer as to exactly what the problem is? - Graham Polley
Sorry if my explanation is not clear. - Nhjm
My question is: I added a custom timestamp when reading data from Pub/Sub, but after I applied a trigger with an allowed lateness of about 10 minutes relative to the current event time, all the data I pushed manually with timestamps more than 15 minutes old (or any amount earlier than that) was still collected into my data list in the ParseTableRow method. Please help me explain this. I think the trigger is not firing, or I'm missing some code to handle late data? Thanks for any advice. - Nhjm

1 Answer

0
votes

You should specify the allowed lateness formally using a Duration object, e.g. .withAllowedLateness(Duration.standardMinutes(ALLOW_LATE_SIZE)), assuming ALLOW_LATE_SIZE is a number of minutes.

You may check the documentation for the Google Cloud Dataflow SDK for Java, specifically the "Triggers" section.
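To make the lateness semantics concrete, here is a minimal, self-contained sketch of the comparison the runner effectively performs (plain java.time arithmetic, no Beam dependency; the 10-minute value is a hypothetical stand-in for ALLOW_LATE_SIZE): an element is dropped only when its timestamp falls behind the watermark by more than the allowed lateness, otherwise it is emitted as a late pane.

```java
import java.time.Duration;
import java.time.Instant;

public class LatenessCheck {
    // Hypothetical stand-in for ALLOW_LATE_SIZE, expressed in minutes.
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(10);

    // An element is droppably late only if its timestamp is older than
    // (watermark - allowed lateness); anything newer still fires a late pane.
    static boolean isDroppablyLate(Instant elementTs, Instant watermark) {
        return elementTs.isBefore(watermark.minus(ALLOWED_LATENESS));
    }

    public static void main(String[] args) {
        Instant watermark = Instant.parse("2015-08-31T10:10:00Z");
        // 18 minutes behind the watermark: beyond the 10-minute allowance, dropped.
        System.out.println(isDroppablyLate(
                Instant.parse("2015-08-31T09:52:00Z"), watermark)); // true
        // 5 minutes behind: within the allowance, emitted as a late pane.
        System.out.println(isDroppablyLate(
                Instant.parse("2015-08-31T10:05:00Z"), watermark)); // false
    }
}
```

If your manually published messages are still collected, check this boundary first: with an allowed lateness of only a few minutes, any element whose timestamp is within that window of the current watermark is late but not droppable, so it will still reach ParseTableRow via the late-firing trigger.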