
I want to implement the following scenario using a streaming pipeline in Apache Beam (running on Google Dataflow):

  1. Read messages from Pub/Sub (JSON strings)
  2. Deserialize JSONs
  3. Use a custom field (say timeStamp) as the event timestamp of each element
  4. Apply fixed windowing of 60 seconds
  5. Extract a key from the elements and group by key
  6. << perform further processing >>

I've tried to solve this using both Java (Scala) and Python, but none of the solutions worked.

  1. Python solution
import apache_beam as beam
from apache_beam.transforms import window

# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
        | beam.Map(add_timestamping)
        | beam.WindowInto(window.FixedWindows(60))
        | beam.Map(lambda elem: elem)  # extracting the key somehow, not relevant here
        | beam.GroupByKey()
        # (...)
        | beam.io.WriteToPubSub("output_topic")
        )
p.run()

The add_timestamping function, as per the documentation:

def add_timestamping(elem):
    # Imports are local so the function is self-contained when shipped to workers
    import json
    import apache_beam as beam
    msg = json.loads(elem)
    # 'timeStamp' is epoch milliseconds; Beam timestamps are in seconds
    unix_timestamp = msg['timeStamp'] / 1000
    return beam.window.TimestampedValue(msg, unix_timestamp)
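
For reference, a quick local check with a hypothetical payload (not from the original post) shows what the function returns; the value keeps the parsed dict and the timestamp is the epoch value converted to seconds:

import json

sample = json.dumps({"timeStamp": 1551487899124, "userId": "a"})  # hypothetical message, epoch millis
tv = add_timestamping(sample)
print(tv.value["userId"])  # 'a' - the parsed dict is the element value
print(tv.timestamp)        # the 'timeStamp' value divided by 1000, i.e. seconds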

Output of Python solution:

  1. When using the DirectRunner, windows are emitted and the windowing itself is more or less correct, depending on the delay.
  2. When using the DataflowRunner, ALL messages are dropped, and the droppedDueToLateness counter appears in the Dataflow UI (see the note after this list).
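
As far as I understand, droppedDueToLateness means the elements arrive behind the watermark by more than the window's allowed lateness (zero by default). I could raise the allowed lateness on the window, roughly like this in Python (assuming an SDK version that exposes allowed_lateness on WindowInto), but that is only a band-aid when the historical range is unknown, since the Pub/Sub watermark still tracks publish time rather than my timeStamp field:

import apache_beam as beam
from apache_beam.transforms import window

# A band-aid only: accept data up to 7 days behind the watermark instead of dropping it.
# The Pub/Sub watermark still follows publish time, so older replays would still be late.
windowing = beam.WindowInto(
    window.FixedWindows(60),
    allowed_lateness=7 * 24 * 3600)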

  2. Java / Scala solution (I've used Scio, but this also happens with the plain Beam Java SDK)
sc.pubsubSubscription[String]("my_sub")
    .applyTransform(ParDo.of(new CustomTs()))
    .withFixedWindows(Duration.standardSeconds(60))
    .map(x => x) // extracting the key somehow, not relevant here
    .groupByKey
    // (...)
    .saveAsPubsub("output_topic")

Adding the custom timestamp, as per the documentation:

import io.circe.parser._
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{Element, ProcessElement}
import org.joda.time.Instant

class CustomTs extends DoFn[String, String] {
  @ProcessElement
  def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
    val json = parse(element).right.get
    // 'timeStamp' is epoch milliseconds
    val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0L)
    out.outputWithTimestamp(element, new Instant(timestampMillis))
  }
}

Output of Java / Scala solution:

Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.IllegalArgumentException:
 Cannot output with timestamp 2019-03-02T00:51:39.124Z. 
 Output timestamps must be no earlier than the timestamp of the current input
 (2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).

I cannot use DoFn.getAllowedTimestampSkew here as it's already deprecated and I don't know what ranges of historical data will be sent.


Having the ability to process historical data is crucial for my project (this data will be sent to Pub/Sub from some store). The pipeline must work on both current and historical data.

My question is: how can I process the data using custom timestamps and still be able to operate on windows defined with the Beam API?


1 Answer


If you have the ability to extract the timestamp at the point of insertion into Pub/Sub, you will be able to make use of user-specified timestamps as metadata. How to do this is documented under the (Dataflow) 1.9 SDK.

https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids

"You can use user-specified timestamps for precise control over how elements read from Cloud Pub/Sub are assigned to windows in a Dataflow pipeline. "

As 1.9 is deprecated, in 2.11 you will need PubsubIO.Read#withTimestampAttribute: https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-
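
The Python SDK used in the question exposes the same option as the timestamp_attribute parameter of beam.io.ReadFromPubSub, so element timestamps (and, on Dataflow, the watermark) follow that attribute instead of the publish time. A hedged sketch, reusing the subscription from the question and assuming the attribute written at publish time is named timeStamp (epoch milliseconds):

import json
import apache_beam as beam
from apache_beam.transforms import window

# p is beam.Pipeline(), as in the question
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub",
                                timestamp_attribute="timeStamp")
        | beam.Map(json.loads)  # still deserialize, but no manual timestamping DoFn needed
        | beam.WindowInto(window.FixedWindows(60))
        | beam.Map(lambda elem: elem)  # key extraction, as in the question
        | beam.GroupByKey())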