To test out stream processing and Flink, I have given myself a seemingly simple problem. My data stream consists of x and y coordinates for a particle, along with the time t at which the position was recorded. My objective is to annotate this data with the velocity of each particle. So the stream might look something like this.
<timestamp:Long> <particle_id:String> <x:Double> <y:Double>
1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2
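Concretely, by velocity I mean the displacement between a particle's two consecutive events divided by the time between them. A minimal sketch of the computation I have in mind (a hypothetical plain-Scala helper, not Flink code):

def velocity(prevT: Long, prevX: Double, prevY: Double,
             currT: Long, currX: Double, currY: Double): (Double, Double) = {
  val dt = (currT - prevT).toDouble // time delta in milliseconds
  ((currX - prevX) / dt, (currY - prevY) / dt)
}

For example, for p1 between the first two events above, velocity(1612103771212L, 0.0, 0.0, 1612103771213L, 0.1, 0.1) gives (0.1, 0.1), i.e. 0.1 units per millisecond in each direction.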
Now there is no guarantee that the events will arrive in order, i.e. 1612103771213 p2 -0.1 -0.1 might arrive, say, 10ms before 1612103771212 p2 0.0 0.0. For simplicity, it can be assumed that any late data will arrive within 100ms of the earlier data.
I will admit that I am new to stream processing and Flink, so this might be a stupid question with an obvious answer, but I am currently stumped as to how to achieve my objective here.
EDIT
Following David's answer, I tried using the Flink Table API to sort the DataStream, using nc -lk 9999 as the text socket source. The issue is that nothing gets printed to the console until I close the text socket stream. Here is the Scala code I wrote:
package processor
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector
import java.time.Duration
object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]) {
    // set up the streaming execution environment and the (Blink) table environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, bSettings)
    env.setParallelism(1)

    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost", 9999)

    // Parse each non-empty line into a ParticlePos
    val objStream = text
      .filter(_.nonEmpty)
      .map(new ParticleMapFunction)

    // Use the t field as the event timestamp and tolerate events that are
    // up to OUT_OF_ORDER_NESS milliseconds out of order
    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )

    // Expose the stream as a table with an event-time (rowtime) attribute et and sort on it
    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")
    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)
    sortedPosStream.print()

    // execute program
    env.execute()
  }
  case class ParticlePos(t: Long, name: String, x: Double, y: Double) extends Serializable

  case class ParticlePosVal(t: Long, name: String, x: Double, y: Double,
                            var vx: Double = 0.0, var vy: Double = 0.0) extends Serializable
  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      // Split on whitespace; splitting on \\W+ would also split at '.' and '-'
      // and mangle the doubles
      val parts = t.split("\\s+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }
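
  // A minimal sketch of the ValAnnotator referenced (commented out) above;
  // an assumption of what it could look like rather than tested code. For each
  // particle it remembers the previous position in keyed state and emits the
  // current position annotated with the velocity (units per ms) since then.
  class ValAnnotator extends KeyedProcessFunction[String, ParticlePos, ParticlePosVal] {

    private var lastPos: ValueState[ParticlePos] = _

    override def open(parameters: Configuration): Unit = {
      lastPos = getRuntimeContext.getState(
        new ValueStateDescriptor[ParticlePos]("lastPos", classOf[ParticlePos]))
    }

    override def processElement(pos: ParticlePos,
                                ctx: KeyedProcessFunction[String, ParticlePos, ParticlePosVal]#Context,
                                out: Collector[ParticlePosVal]): Unit = {
      val prev = lastPos.value()
      if (prev == null || pos.t == prev.t) {
        // First event for this particle (or zero time delta): leave velocity at 0
        out.collect(ParticlePosVal(pos.t, pos.name, pos.x, pos.y))
      } else {
        val dt = (pos.t - prev.t).toDouble // milliseconds between consecutive samples
        out.collect(ParticlePosVal(pos.t, pos.name, pos.x, pos.y,
          (pos.x - prev.x) / dt, (pos.y - prev.y) / dt))
      }
      lastPos.update(pos)
    }
  }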
}