2 votes

To test out stream processing and Flink, I have given myself a seemingly simple problem. My data stream consists of x and y coordinates for a particle, along with the time t at which the position was recorded. My objective is to annotate this data with the velocity of each particle. So the stream might look something like this.

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>

1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2

Now there is no guarantee that the events will arrive in order, i.e. 1612103771213 p2 -0.1 -0.1 might arrive, say, 10ms before 1612103771212 p2 0.0 0.0.

For simplicity, it can be assumed that any late data will arrive within 100ms of the earlier data.
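
For instance, assuming velocity is simply displacement divided by elapsed time (so units of distance per millisecond), the annotated stream for p1 would look something like this; the first position has no predecessor, so it gets no velocity:

<timestamp:Long> <particle_id:String> <x:Double> <y:Double> <vx:Double> <vy:Double>

1612103771213 p1 0.1 0.1 0.1 0.1
1612103771214 p1 0.1 0.2 0.0 0.1
1612103771215 p1 0.2 0.2 0.1 0.0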

I will admit that I am new to stream processing and Flink, so this might be a question with an obvious answer, but I am currently stumped as to how to achieve my objective here.

EDIT

Following David's answer, I tried using the Flink Table API to sort the DataStream, using nc -lk 9999 as a text socket source. The issue is that nothing gets printed to the console until I close the socket stream. Here is the Scala code I wrote:


package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration


object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()

    val tableEnv = StreamTableEnvironment.create(env, bSettings)

    env.setParallelism(1)

    // Obtain the input data by connecting to the local socket on port 9999.
    val text = env.socketTextStream("localhost", 9999)
    val objStream = text
      .filter( _.nonEmpty )
      .map(new ParticleMapFunction)

    // Assign event-time timestamps from the t field, tolerating 100ms of out-of-orderness.
    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )

    // Expose t as the rowtime attribute "et", then sort the table on it.
    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")

    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)

    sortedPosStream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t : Long, name : String, x : Double, y : Double) extends Serializable
  case class ParticlePosVal(t : Long, name : String, x : Double, y : Double,
                            var vx : Double = 0.0, var vy : Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      // Split on whitespace; \\W+ would also split on '.' and '-' and mangle the doubles.
      val parts = t.split("\\s+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }

}


2 Answers

2 votes

One way of doing this in Flink might be to use a KeyedProcessFunction, i.e. a function that can:

  • process each event in your stream
  • maintain some state
  • trigger some logic with a timer based on event time

So it would go something like this:

  • you need to know some kind of "max out-of-orderness" bound for your data. Based on your description, let's assume 100ms, so that when processing data at timestamp 1612103771212 you can consider that all data up to timestamp 1612103771112 has been received.
  • your first step is to keyBy() your stream, keying by particle id. This means that the logic of the subsequent operators in your Flink application can now be expressed in terms of a sequence of events of just one particle, with each particle processed in parallel.

Something like this:

yourStream.keyBy(pos => pos.name).process(new YourProcessFunction())   // key by particle id
  • during the initialization of YourProcessFunction (i.e. in its open() method), initialize a ListState where you can safely buffer events.
  • when processing an element in your stream, in the processElement method, just add it to the ListState and register an event-time timer for, say, 100ms later
  • when the onTimer() method fires, say at time t, look at all elements in the ListState with a timestamp < t - 100 and, if there are at least two of them, sort them, remove them from the state, apply the velocity calculation and annotation logic you described, and emit the results downstream (see the sketch after this list).
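
A minimal sketch of such a function, reusing the ParticlePos and ParticlePosVal case classes from the question. The 100ms bound and the per-axis velocity formula (displacement divided by elapsed time, so units per millisecond) are assumptions, not something this answer pins down:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class VelocityAnnotator extends KeyedProcessFunction[String, ParticlePos, ParticlePosVal] {

  val OUT_OF_ORDER_NESS = 100L // same bound as in the question

  private var buffer: ListState[ParticlePos] = _

  override def open(parameters: Configuration): Unit = {
    // Keyed state: every particle gets its own buffer of not-yet-emitted positions.
    buffer = getRuntimeContext.getListState(
      new ListStateDescriptor[ParticlePos]("buffer", classOf[ParticlePos]))
  }

  override def processElement(pos: ParticlePos,
                              ctx: KeyedProcessFunction[String, ParticlePos, ParticlePosVal]#Context,
                              out: Collector[ParticlePosVal]): Unit = {
    buffer.add(pos)
    // Fire once the watermark guarantees no earlier event for this particle can arrive.
    ctx.timerService().registerEventTimeTimer(pos.t + OUT_OF_ORDER_NESS)
  }

  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, ParticlePos, ParticlePosVal]#OnTimerContext,
                       out: Collector[ParticlePosVal]): Unit = {
    // Split the buffer into events that are now safe to process and events that
    // could still be overtaken by a late arrival.
    val (ready, pending) = buffer.get().asScala.toSeq.partition(_.t <= timestamp - OUT_OF_ORDER_NESS)
    val sorted = ready.sortBy(_.t)
    sorted.sliding(2).foreach {
      case Seq(a, b) if b.t > a.t =>
        val dt = (b.t - a.t).toDouble
        out.collect(ParticlePosVal(b.t, b.name, b.x, b.y, (b.x - a.x) / dt, (b.y - a.y) / dt))
      case _ => // a lone ready event, or duplicate timestamps: nothing to emit
    }
    // Keep the newest processed position as the anchor for the next batch,
    // together with the still-pending events.
    buffer.update((sorted.lastOption.toSeq ++ pending).asJava)
  }
}

Applied directly to the watermarked posStream from the question, without the SQL sort:

posStream.keyBy(pos => pos.name).process(new VelocityAnnotator).print()

Note that the first position of each particle only ever serves as the left endpoint of a pair, so no velocity is emitted for it.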

You'll find an example in the official Flink training that uses this kind of logic to compute the duration of taxi rides, which has lots of similarities with your use case. Have a look also at the various README.md files of that repo for more details.

2 votes

In general, watermarks in combination with event-time timers are the solution to the problems posed by out-of-order event streams. The section of the official Flink training that covers Event Time and Watermarks explains how this works.

At a higher level, it is sometimes easier to use something like Flink's CEP library or Flink SQL, because they make it very easy to sort a stream by time, thus removing all of the out-of-orderness. For an example of a Flink DataStream program that uses Flink SQL to sort a stream by event time, see How to sort a stream by event time using Flink SQL.

In your case, a fairly simple MATCH_RECOGNIZE query would do what you're looking for. That might look something like this:

SELECT *
    FROM event
    MATCH_RECOGNIZE (
        PARTITION BY particleId
        ORDER BY ts
        MEASURES
            b.ts AS ts,
            b.particleId AS pid,
            velocity(a, b) AS v
        AFTER MATCH SKIP TO NEXT ROW
        PATTERN (a b)
        DEFINE
            a AS TRUE,
            b AS TRUE
    )

where velocity(a, b) is a user-defined function that computes the velocity, given two sequential events (a and b) for the same particle.
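
One caveat: a Flink SQL scalar function receives column values rather than whole pattern variables, so in practice the MEASURES clause would spell the fields out, e.g. velocity(a.ts, a.x, a.y, b.ts, b.x, b.y). A hedged sketch of such a UDF; the class name and the choice to return the speed (the magnitude of the velocity vector) are assumptions:

import org.apache.flink.table.functions.ScalarFunction

class Velocity extends ScalarFunction {
  // Speed between two consecutive positions: distance travelled divided by elapsed time.
  def eval(ta: Long, xa: Double, ya: Double,
           tb: Long, xb: Double, yb: Double): Double =
    math.hypot(xb - xa, yb - ya) / (tb - ta).toDouble
}

Registered once on the table environment, it becomes callable from SQL as velocity(...):

tableEnv.createTemporarySystemFunction("velocity", classOf[Velocity])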