I want to implement the following scenario using a streaming pipeline in Apache Beam (running on Google Cloud Dataflow):
- Read messages from Pub/Sub (JSON strings)
- Deserialize JSONs
- Use a custom field (say timeStamp) as the timestamp of the processed element
- Apply fixed windowing of 60 seconds
- Extract a key from the elements and group by key
- << perform further processing >>
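For concreteness, a message on the subscription looks roughly like this (only timeStamp matters here; the other fields are illustrative):

# Illustrative Pub/Sub payload; timeStamp is epoch milliseconds.
sample_payload = b'{"timeStamp": 1551487899124, "userId": "u-42", "action": "click"}'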
I've tried to solve this problem using both Java (Scala) and Python, but none of the solutions worked.
- Python solution
# p is beam.Pipeline(); window is apache_beam.transforms.window
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
       | beam.Map(add_timestamping)
       | beam.WindowInto(window.FixedWindows(60))
       | beam.Map(lambda elem: elem)  # extracting the key somehow, not relevant here
       | beam.GroupByKey()
       # (...)
       | beam.io.WriteToPubSub("output_topic")
)
p.run()
The add_timestamping function, as per the documentation:
def add_timestamping(elem):
    import json
    import apache_beam as beam
    msg = json.loads(elem)
    # timeStamp is epoch milliseconds; Beam timestamps are in seconds
    unix_timestamp = msg['timeStamp'] / 1000
    return beam.window.TimestampedValue(msg, unix_timestamp)
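To illustrate what this returns (the payload is made up; I'm assuming timeStamp is epoch milliseconds):

sample = b'{"timeStamp": 1551487899124, "userId": "u-42", "action": "click"}'
tv = add_timestamping(sample)
# tv.value is the parsed dict; tv.timestamp is Timestamp(1551487899.124),
# i.e. the event time in epoch seconds that FixedWindows will assign windows by.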
Output of Python solution:
- When using DirectRunner, windows are emitted and the windowing itself is more or less appropriate, depending on the delay.
- When using DataflowRunner, ALL messages are skipped, with the droppedDueToLateness counter appearing in the Dataflow UI.
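Note: I'm aware that a window drops elements that arrive later than its end plus the allowed lateness, which defaults to zero. Since I don't know how old the historical data can be, no finite lateness really solves this, but for reference this is the knob I mean (whether WindowInto accepts an allowed_lateness parameter depends on the Beam Python version, so treat this as a sketch):

import apache_beam as beam
from apache_beam.transforms import window

# Sketch only: fixed windows with a widened allowed lateness (default is 0),
# so elements behind the watermark are not dropped immediately.
late_tolerant_window = beam.WindowInto(
    window.FixedWindows(60),
    allowed_lateness=7 * 24 * 3600)  # e.g. one week, in seconds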
- Java / Scala solution (I've used Scio, but this happens in the plain Java Beam SDK too)
sc.pubsubSubscription[String]("my_sub")
  .applyTransform(ParDo.of(new CustomTs()))
  .withFixedWindows(Duration.standardSeconds(60))
  .map(x => x) // extracting the key somehow, not relevant here
  .groupByKey
  // (...)
  .saveAsPubsub("output_topic")
Adding a custom timestamp, as per the documentation:
import io.circe.parser._
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{Element, ProcessElement}
import org.joda.time.Instant

class CustomTs extends DoFn[String, String] {
  @ProcessElement
  def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
    val json = parse(element).right.get
    val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
    // Re-stamp the element with the event time extracted from the payload
    out.outputWithTimestamp(element, new Instant(timestampMillis))
  }
}
Output of Java / Scala solution:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException:
java.lang.IllegalArgumentException:
Cannot output with timestamp 2019-03-02T00:51:39.124Z.
Output timestamps must be no earlier than the timestamp of the current input
(2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).
I cannot use DoFn.getAllowedTimestampSkew here, as it's already deprecated, and I don't know what ranges of historical data will be sent. As far as I can tell, the "timestamp of the current input" in the error is the Pub/Sub publish time (which PubsubIO assigns when no timestamp attribute is configured), so a genuinely historical event time will always be earlier than it.
Having the ability to process historical data is crucial for my project (this data will be sent to Pub/Sub from some store). The pipeline must work on current data as well as on historical data.
My question is: how can I process the data using custom timestamps, with the ability to operate on windows defined via the Beam API?