
I am working on an Apache Beam project that ran into an issue with the Dataflow service and PubsubIO related to the custom timestamp attribute. The current version of the Beam SDK is 2.7.0.

In the project, we have 2 Dataflow jobs communicating via a PubSub topic and subscription:

The first pipeline (sinking data to PubSub)

This pipeline works on a per-message basis, so it has no custom window strategy applied besides the GlobalWindows (Beam's default). At the end of this pipeline, we sink (write) all the messages, which have already been assigned a map of attributes including their event timestamp (e.g. "published_at"), to a PubSub topic using PubsubIO.writeMessages().

Note: if we use PubsubIO.writeMessages().withTimestampAttribute(), that method tells PubsubIO.ShardFn, PubsubIO.WriteFn and PubsubClient to write/overwrite the sinking pipeline's processing time into this attribute in the map.
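
For reference, here is a minimal sketch of our sinking step. MyEvent, its getPayload()/getEventTimestamp() accessors (returning a byte[] and a Joda Instant), and the topic path are hypothetical placeholders; "events" stands for the output of the upstream stages.

    import java.util.Collections;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptor;

    events
        .apply("ToPubsubMessage", MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((MyEvent event) -> new PubsubMessage(
                event.getPayload(),
                // Carry the event timestamp as a custom attribute.
                Collections.singletonMap("published_at",
                    Long.toString(event.getEventTimestamp().getMillis())))))
        // No withTimestampAttribute() here: as noted above, adding it would make
        // PubsubIO overwrite "published_at" with the sinking pipeline's processing time.
        .apply("WriteToPubsub",
            PubsubIO.writeMessages().to("projects/my-project/topics/my-topic"));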

The second pipeline (reading data from PubSub)

In the second pipeline (the reading pipeline), we have tried both PubsubIO.readMessagesWithAttributes().withTimestampAttribute("published_at") and PubsubIO.readStrings().withTimestampAttribute("published_at") as the source.
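
A minimal sketch of the first variant we tested (the subscription path is a placeholder, and the logging DoFn is only there to illustrate where we inspected the timestamps):

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;

    PCollection<PubsubMessage> messages = pipeline.apply("ReadFromPubsub",
        PubsubIO.readMessagesWithAttributes()
            // Take the element timestamp from this attribute instead of the publish time.
            .withTimestampAttribute("published_at")
            .fromSubscription("projects/my-project/subscriptions/my-subscription"));

    messages.apply("InspectTimestamps", ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // DirectRunner: c.timestamp() matched the "published_at" event timestamp.
        // DataflowRunner: c.timestamp() was near the sinking pipeline's processing time.
        System.out.println(c.timestamp());
        c.output(c.element());
      }
    }));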

  • When running with DirectRunner, everything worked as expected. The messages were read from the PubSub subscription and output to the downstream stages with a ProcessContext.timestamp() equal to their event timestamp "published_at".
  • But when running with DataflowRunner, the ProcessContext.timestamp() was always set to near real time, close to the sinking pipeline's processing time. We checked and can confirm that those timestamps were not PubSub's publishing time either. All the data were then assigned to the wrong windows relative to their event-domain timestamps. We expected late data to be dropped, not assigned to invalid windows.

Note: we had left the PubSub topic populated with a considerable amount of data before turning on the second pipeline, so that we would have some historical/late data.

Pubsub messages with invalid context timestamp

Assumed root cause

Looking deeper into the source code of DataflowRunner, we can see that the Dataflow service uses completely different Pubsub code (overriding the PubsubIO.Read transform at the pipeline's construction time) to read from and sink to Pubsub.

So if we want to use the Beam SDK's PubsubIO, we have to use the experimental option "enable_custom_pubsub_source". But no luck so far, as we have run into this issue https://jira.apache.org/jira/browse/BEAM-5674 and have not been able to test the Beam SDK's Pubsub code.
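
For completeness, this is how we pass that experiment to the Dataflow runner (a sketch; the same thing can be done on the command line with --experiments=enable_custom_pubsub_source):

    import java.util.Collections;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);
    // Ask the Dataflow service to keep the Beam SDK's PubsubUnboundedSource
    // instead of substituting its own internal Pubsub read implementation.
    options.setExperiments(Collections.singletonList("enable_custom_pubsub_source"));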

Workaround solution

Our current workaround is that, after the step that assigns windows to the messages, we implemented a DoFn to check their event timestamp against their IntervalWindow. If the window is invalid, we drop the message and later run weekly or twice-weekly jobs to correct the data from a historical source. It is better to have some missing data than improperly calculated data.
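
A sketch of that guard DoFn (MyEvent and its getEventTimestamp() accessor, returning a Joda Instant, stand in for our own types). We apply it with ParDo.of(new DropMisassignedMessages()) right after the Window.into(...) step.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.joda.time.Instant;

    class DropMisassignedMessages extends DoFn<MyEvent, MyEvent> {
      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        if (window instanceof IntervalWindow) {
          IntervalWindow iw = (IntervalWindow) window;
          Instant eventTime = c.element().getEventTimestamp();
          // Emit the element only if its event timestamp actually falls inside
          // the window it was assigned to; otherwise drop it silently.
          if (!eventTime.isBefore(iw.start()) && eventTime.isBefore(iw.end())) {
            c.output(c.element());
          }
        } else {
          // Non-interval windows (e.g. the global window) are passed through.
          c.output(c.element());
        }
      }
    }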

Messages dropped due to invalid windows

Please share your experience with this case. We know that, from the perspective of Dataflow's watermark management, the watermark is said to adjust itself to the current real time if the ingested data is sparse (not dense enough over time).

We also believe that we are misunderstanding something about the way the Dataflow service maintains the PubsubUnboundedSource's output timestamp, as we are still new to Apache Beam and Google's Dataflow, so there are things we have not come across yet.

Many Thanks!


1 Answer


I found the fix for this issue. In my sinking pipeline, the timestamp attribute was set with a date format that does not conform to the RFC 3339 standard: the formatted dates were missing the 'Z' character. We either fixed the missing 'Z' character or switched to using milliseconds since the epoch; both worked well.
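
In case it helps, these are the two value formats that worked for us, sketched with plain java.time (nothing here is Beam-specific; "published_at" is our attribute name):

    import java.time.Instant;
    import java.time.format.DateTimeFormatter;

    Instant eventTime = Instant.now();  // the event timestamp computed in the sinking pipeline

    // Option 1: RFC 3339 / ISO-8601 in UTC, including the trailing 'Z'.
    String rfc3339 = DateTimeFormatter.ISO_INSTANT.format(eventTime);
    // e.g. "2018-10-12T07:30:00.123Z"

    // Option 2: milliseconds since the Unix epoch, as a decimal string.
    String epochMillis = Long.toString(eventTime.toEpochMilli());

    // Either string can be used as the "published_at" attribute value.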

But one thing to note: when the Dataflow service could not parse the malformed dates, it did not warn or throw an error; instead, it took the processing time for all the elements, so they were assigned to the wrong event-time windows.