
I would like to expose the timestamps of the Kafka messages read by FlinkKafkaConsumer010 as values in the data stream.

I am aware of the AssignerWithPeriodicWatermarks class, but it seems to extract the timestamp only for time-based aggregations in the DataStream API.

I would like to make the Kafka message timestamp available in a Table so that I can query it with SQL later on.

EDIT: Tried this:

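  // Flink 1.3.x imports (SimpleStringSchema moved to org.apache.flink.api.common.serialization in later releases)
  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.watermark.Watermark
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
  import org.apache.flink.streaming.util.serialization.SimpleStringSchema
  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._

  // `properties` holds the Kafka client configuration (bootstrap.servers, group.id, ...)
  val consumer = new FlinkKafkaConsumer010("test", new SimpleStringSchema, properties)
  consumer.setStartFromEarliest()

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tenv = TableEnvironment.getTableEnvironment(env)

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  class KafkaAssigner[T] extends AssignerWithPeriodicWatermarks[T] {
    var maxTs = 0L
    override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
      maxTs = Math.max(maxTs, previousElementTimestamp)
      previousElementTimestamp
    }
    override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1L)
  }

  val stream = env
    .addSource(consumer)
    .assignTimestampsAndWatermarks(new KafkaAssigner[String])
    .flatMap(_.split("\\W+"))

  val tbl = tenv.fromDataStream(stream, 'w, 'ts.rowtime)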

It compiles, but throws:

Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested.
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630)
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
    at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)

on the last line of the code above.

EDIT2: Thanks to @fabian-hueske for pointing me to a workaround. Full code at https://github.com/andrey-savov/flink-kafka


1 Answer


Flink's Kafka 0.10 consumer automatically uses the timestamp of each Kafka message as the event-time timestamp of the record it emits, provided the EventTime time characteristic is configured (see docs).
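A minimal sketch of that setup, assuming Flink 1.3.x with the Kafka 0.10 connector on the classpath, a broker at localhost:9092 and the topic test from the question. The broker address, the group id and the class name KafkaTimestampAssigner are hypothetical. The assigner does not compute timestamps itself: it returns previousElementTimestamp, which is the Kafka timestamp the consumer has already attached, and only adds watermarks that trail the largest timestamp seen so far by a fixed bound.

```scala
import java.util.Properties

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
// Flink 1.3.x location; later releases use org.apache.flink.api.common.serialization
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Hypothetical assigner: keeps the timestamp already attached by the Kafka 0.10
// consumer and emits watermarks lagging the max timestamp by maxOutOfOrdernessMs.
class KafkaTimestampAssigner[T](maxOutOfOrdernessMs: Long)
    extends AssignerWithPeriodicWatermarks[T] {

  // Start low enough that the first watermark cannot underflow.
  private var maxTs: Long = Long.MinValue + maxOutOfOrdernessMs

  override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
    // previousElementTimestamp is the Kafka message timestamp set by the consumer
    maxTs = math.max(maxTs, previousElementTimestamp)
    previousElementTimestamp
  }

  override def getCurrentWatermark: Watermark =
    new Watermark(maxTs - maxOutOfOrdernessMs)
}

object KafkaTimestamps {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The consumer attaches Kafka timestamps only when EventTime is configured.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092") // assumption
    properties.setProperty("group.id", "timestamp-demo")          // hypothetical

    val consumer = new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema, properties)
    val lines: DataStream[String] = env
      .addSource(consumer)
      .assignTimestampsAndWatermarks(new KafkaTimestampAssigner[String](1000L))

    lines.print()
    env.execute("kafka-timestamps")
  }
}
```

Bounding out-of-orderness, rather than using maxTs - 1 as in the question, tolerates Kafka messages that arrive slightly out of timestamp order across partitions; the 1000 ms bound is an arbitrary example value.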

After you have ingested the Kafka topic into a DataStream and assigned watermarks, each record carries its timestamp as metadata, but the timestamp is not yet visible as a field. You can convert the stream into a Table with the StreamTableEnvironment.fromDataStream(stream, fieldExpr*) method. The fieldExpr* parameter is a list of expressions that describe the schema of the generated table. You can add a field that holds the record timestamp with the expression mytime.rowtime ('mytime.rowtime in the Scala Table API), where mytime is the name of the new field and rowtime indicates that its value is taken from the record timestamp. Please check the docs for details.
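A sketch of the conversion and a SQL query on the new field, assuming Flink 1.3.x, where the SQL entry point is StreamTableEnvironment.sql (later releases call it sqlQuery). To keep it runnable without a broker, a small in-memory stream stands in for the Kafka topic and its timestamps are assigned from a field; with the Kafka 0.10 consumer they would already be attached. The names Event, EventTimeExtractor, events and mytime are made up for illustration.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

// Hypothetical composite record type (a case class, not an atomic type).
case class Event(page: String, epochMs: Long)

// Stand-in for the Kafka consumer: attaches each record's timestamp from a field.
class EventTimeExtractor extends AscendingTimestampExtractor[Event] {
  override def extractAscendingTimestamp(e: Event): Long = e.epochMs
}

object RowtimeToSql {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tenv = TableEnvironment.getTableEnvironment(env)

    val events: DataStream[Event] = env
      .fromElements(Event("home", 1000L), Event("cart", 2000L), Event("home", 3000L))
      .assignTimestampsAndWatermarks(new EventTimeExtractor)

    // 'mytime.rowtime appends a field whose value is the record timestamp.
    val table = tenv.fromDataStream(events, 'page, 'epochMs, 'mytime.rowtime)
    tenv.registerTable("events", table)

    // The rowtime field can now be queried like any other column.
    val result = tenv.sql("SELECT page, mytime FROM events")
    tenv.toAppendStream[Row](result).print()

    env.execute("rowtime-to-sql")
  }
}
```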

NOTE: As @bfair pointed out, converting a DataStream of an atomic type (such as DataStream[String]) fails with an exception in Flink 1.3.2 and earlier versions. The bug has been reported as FLINK-7939 and will be fixed in upcoming releases.
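On affected versions, one way around the atomic-type limitation (not necessarily the workaround used in the linked repository) is to give the stream a composite type before the conversion, for example by splitting lines directly into a single-field case class instead of into String. A sketch, assuming Flink 1.3.x and a DataStream[String] that already carries Kafka timestamps and watermarks; Word, splitWords and toWordTable are hypothetical names.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

// Single-field case class: a composite type, so the conversion with a rowtime
// field does not go through the atomic-type path affected by FLINK-7939.
case class Word(w: String)

object AtomicTypeWorkaround {
  // Same splitting as in the question, but producing Word instead of String.
  def splitWords(line: String): List[Word] =
    line.split("\\W+").filter(_.nonEmpty).map(Word(_)).toList

  // `lines` is assumed to already carry Kafka timestamps and watermarks.
  def toWordTable(tenv: StreamTableEnvironment, lines: DataStream[String]): Table = {
    // flatMap keeps the input record's timestamp on every word it emits
    val words: DataStream[Word] = lines.flatMap(line => splitWords(line))
    tenv.fromDataStream(words, 'w, 'ts.rowtime)
  }
}
```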