I am working on an Apache Beam project that ran into an issue with the Dataflow service and PubsubIO related to the custom timestamp attribute. The current version of the Beam SDK is 2.7.0.
In the project, we have 2 Dataflow jobs communicating via a PubSub topic and subscription:
The first pipeline (sinking data to PubSub)
This pipeline works on a per-message basis, so it has no custom windowing strategy applied besides GlobalWindows (the Beam default). At the end of this pipeline, we write all the messages, each of which has already been assigned a map of attributes including its event timestamp (e.g. "published_at"), to a PubSub topic using PubsubIO.writeMessages().
Note: if we use PubsubIO.writeMessages().withTimestampAttribute(), this method will tell PubsubIO.ShardFn, PubsubIO.WriteFn and PubsubClient to write/overwrite the sinking pipeline's processing time to this attribute in the map.
The second pipeline (reading data from PubSub)
In the second pipeline (reading pipeline), we have tried PubsubIO.readMessagesWithAttributes().withTimestampAttribute("published_at") and PubsubIO.readStrings().withTimestampAttribute("published_at") for the source.
- When running with DirectRunner, everything worked as expected. The messages were read from the PubSub subscription and output to the downstream stages with a ProcessContext.timestamp() equal to their event timestamp "published_at".
- But when running with DataflowRunner, the ProcessContext.timestamp() was always set near real time, close to the sinking pipeline's processing time. We checked and can confirm that those timestamps were not PubSub's publishing time. All the data were then assigned to the wrong windows relative to their event-time timestamps. We expected late data to be dropped, not assigned to invalid windows.
Note: We had left the Pubsub topic populated with a considerable amount of data before we turned on the second pipeline to have some kind of historical/late data.
Pubsub messages with invalid context timestamp
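For anyone checking the attribute value itself: as far as we understand the PubsubIO documentation, the timestamp attribute may hold either the number of milliseconds since the Unix epoch or an RFC 3339 string. Below is a minimal plain-Java sketch of that interpretation. The class and method names are hypothetical, it uses java.time rather than the Joda-Time types Beam uses, and it is not Beam's actual parsing code.

```java
import java.time.Instant;

// Hypothetical helper, not part of Beam: interprets a timestamp attribute
// value the way the PubsubIO documentation describes the two accepted forms.
final class TimestampAttributeSketch {

    static Instant parseTimestampAttribute(String value) {
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("timestamp attribute is missing or empty");
        }
        try {
            // Form 1: milliseconds since the Unix epoch, e.g. "1538395200000".
            return Instant.ofEpochMilli(Long.parseLong(value));
        } catch (NumberFormatException notMillis) {
            // Form 2: RFC 3339 text, e.g. "2018-10-01T12:00:00Z".
            // Assumption: only UTC ('Z') values are handled in this sketch.
            return Instant.parse(value);
        }
    }

    public static void main(String[] args) {
        System.out.println(parseTimestampAttribute("1538395200000"));
        System.out.println(parseTimestampAttribute("2018-10-01T12:00:00Z"));
    }
}
```

Logging the parsed value next to ProcessContext.timestamp() in the reading pipeline is one way to confirm whether the runner is honouring the attribute.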
Assumed root cause
Looking deeper into the source code of DataflowRunner, we can see that the Dataflow service uses completely different Pubsub code (overriding PubsubIO.Read at pipeline construction time) to read from and write to Pubsub.
So if we want to use the Beam SDK's PubsubIO, we have to use the experimental option "enable_custom_pubsub_source". But no luck so far, as we have run into this issue https://jira.apache.org/jira/browse/BEAM-5674 and have not been able to test the Beam SDK's Pubsub code.
Workaround solution
Our current workaround is that, after the step that assigns windows to the messages, we implemented a DoFn to check each message's event timestamp against its IntervalWindow. If the window is invalid, we just drop the message and later run a weekly or twice-weekly job to correct the results from a historical source. It is better to have some missing data than improperly calculated data.
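The core of that check can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the class and method names are hypothetical, java.time stands in for the Joda-Time Instant that Beam uses, and the Beam wiring is left out so the block runs without the SDK. In the real DoFn, the event time would come from the "published_at" attribute and the bounds from IntervalWindow.start() and IntervalWindow.end().

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper illustrating the validity check; not our actual DoFn.
final class WindowValiditySketch {

    // True when eventTime lies in [windowStart, windowEnd).
    // An interval window includes its start and excludes its end.
    static boolean isInsideWindow(Instant eventTime, Instant windowStart, Instant windowEnd) {
        return !eventTime.isBefore(windowStart) && eventTime.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        // Assumed example: a 5-minute window, chosen only for illustration.
        Instant start = Instant.parse("2018-10-01T12:00:00Z");
        Instant end = start.plus(Duration.ofMinutes(5));

        Instant onTime = Instant.parse("2018-10-01T12:03:00Z");
        Instant historical = Instant.parse("2018-09-28T08:00:00Z");

        // A message whose event time falls inside its window is kept.
        System.out.println("keep on-time message: " + isInsideWindow(onTime, start, end));
        // A historical message stamped near real time ends up in a window
        // that does not contain its event time, so it is dropped.
        System.out.println("keep historical message: " + isInsideWindow(historical, start, end));
    }
}
```

Inside the DoFn, a message for which the check returns false is simply not output, which is what produces the dropped-message counts.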
Messages dropped due to invalid windows
Please share your experience with this case. We know that, from the perspective of Dataflow watermark management, the watermark is said to advance toward the current real time if the ingested data is sparse (not dense enough over time).
We also believe that we are misunderstanding something about the way Dataflow service maintains the PubsubUnboundedSource's output timestamp as we are still new to Apache Beam and Google's Dataflow so there are things that we have not come to know of yet.
Many Thanks!