I'm working on a proof-of-concept project in Java using Kafka -> Flink -> Elasticsearch.

An unpredictable number of events, from 0 up to thousands per second, will be produced to a specific Kafka topic. Each event looks like this:

{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp":1465566255, "other unusefull info":"..."}

Flink will consume these events and, every second, should sink the number of events in each state into Elasticsearch, e.g.:

{"stateA":54, "stateB":100, ... "stateJ":34}

I have 10 states: [Created, ... , Deleted] with an average life cycle of 15 minutes. The state can change twice a second. Theoretically new states could be added.

To sink results every second, I'm thinking of using Flink's time windows: https://flink.apache.org/news/2015/12/04/Introducing-windows.html

The problem is that I need stateful objects holding guid->previous-state and stateX->count, so that I can increment/decrement the counts when a new event occurs.
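
To make that bookkeeping concrete, here is a minimal sketch in plain Java (no Flink involved). The class name StateCounter and its methods are made up for illustration; it only shows the increment/decrement logic for a single JVM, not how the state would be distributed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: tracks guid -> last seen state and state -> current count.
public class StateCounter {
    private final Map<String, String> lastStateByGuid = new HashMap<>();
    private final Map<String, Long> countByState = new HashMap<>();

    // Apply one event: decrement the previous state's count, increment the new one.
    public void onEvent(String guid, String newState) {
        String previous = lastStateByGuid.put(guid, newState);
        if (newState.equals(previous)) {
            return; // same state reported again, counts are unchanged
        }
        if (previous != null) {
            // Returning null from the remapping function removes the entry at zero.
            countByState.computeIfPresent(previous, (s, c) -> c > 1 ? c - 1 : null);
        }
        countByState.merge(newState, 1L, Long::sum);
    }

    // Copy of the current state -> count map, e.g. to be sunk once per second.
    public Map<String, Long> snapshot() {
        return new HashMap<>(countByState);
    }

    public static void main(String[] args) {
        StateCounter counter = new StateCounter();
        counter.onEvent("g1", "Created");
        counter.onEvent("g2", "Created");
        counter.onEvent("g1", "Deleted");
        System.out.println(counter.snapshot());
    }
}
```

After the three events in main, one guid is in Created and one is in Deleted.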

I found a draft document about stateful stream processing: https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing

I'm new to Flink and stream processing, and I haven't dug into Flink's stateful stream processing yet. For the first phase I'm thinking of using static objects for this, but that approach won't work once several Flink instances are launched.

I want to ask you:

  1. What do you think about this approach?
  2. Is Flink suited for this kind of stream processing?
  3. What would your approach be for solving this problem?

I'd also appreciate some code snippets for the windowed stateful stream solution (or other solutions).

Thanks,

1 Answer

How about something like the following?

It uses 15-minute windows keyed by guid, after which the window state is cleaned up, together with a custom trigger that evaluates each window every second. Within the window, a ReduceFunction keeps only the latest event for each guid, and a WindowFunction emits a (state, 1) tuple for it. We then key by state and sum the counts over one-second windows. I think this should give you the result you're looking for.

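import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Kafka is the source here, so this needs a consumer rather than a producer
// (the exact class name depends on the Kafka connector version)
val stream = env.addSource(new FlinkKafkaConsumer(...))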

val results = stream
  .keyBy(_.guid)
  .timeWindow(Time.minutes(15))
  .trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
  .apply(
    (e1, e2) => e2,
    (k, w, i, c: Collector[(String, Long)]) => {
      if (i.head != null) c.collect((i.head.state, 1L))
    }
  )
  .keyBy(0)
  .timeWindow(Time.seconds(1))
  .sum(1)
  .addSink(new ElasticsearchSink(...))

env.execute("Count States")
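
As a rough illustration of what one evaluation of this pipeline is meant to compute, here is a plain-Java sketch with no Flink dependency. The LatestStateCount class and its Event type are hypothetical stand-ins; the real job does this incrementally and in parallel, so treat this only as a model of the intended result.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LatestStateCount {
    // Hypothetical event type carrying only the fields the pipeline uses.
    public static final class Event {
        final String guid;
        final String state;

        public Event(String guid, String state) {
            this.guid = guid;
            this.state = state;
        }
    }

    static Map<String, Long> countStates(List<Event> eventsInArrivalOrder) {
        // Step 1: like the reduce (e1, e2) => e2, keep only the latest event per guid.
        Map<String, Event> latestByGuid = new LinkedHashMap<>();
        for (Event e : eventsInArrivalOrder) {
            latestByGuid.put(e.guid, e);
        }
        // Step 2: emit (state, 1) for each guid and sum the ones per state.
        Map<String, Long> countByState = new TreeMap<>();
        for (Event e : latestByGuid.values()) {
            countByState.merge(e.state, 1L, Long::sum);
        }
        return countByState;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("g1", "Created"),
            new Event("g2", "Created"),
            new Event("g1", "Deleted"));
        System.out.println(countStates(events)); // prints {Created=1, Deleted=1}
    }
}
```

Because only the latest event per guid survives step 1, a guid that moved from Created to Deleted is counted once, under Deleted.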

ProcessingTimeTriggerWithPeriodicFirings is defined as follows:

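import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Companion object so the trigger can be created without `new`
object ProcessingTimeTriggerWithPeriodicFirings {
  def apply(intervalMs: Long) = {
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
  }
}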

class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
  extends Trigger[Event, TimeWindow] {

  private val startTimeDesc =
    new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)

  override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    val startTime = ctx.getPartitionedState(startTimeDesc)
    if (startTime.value == 0) {
      startTime.update(window.getStart)
      ctx.registerProcessingTimeTimer(window.getEnd)
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
    }
    TriggerResult.CONTINUE
  }

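  override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    // >= rather than ==, so a periodic timer that lands past the window end
    // purges the window instead of registering yet another timer
    if (time >= window.getEnd) {
      TriggerResult.PURGE
    } else {
      // Emit the current window contents and schedule the next periodic firing
      ctx.registerProcessingTimeTimer(time + intervalMs)
      TriggerResult.FIRE
    }
  }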

  override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}
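
To see the schedule this trigger is intended to produce, here is a small plain-Java simulation, not Flink code. The class PeriodicFiringSchedule and the method fireTimes are made up for illustration. It assumes the first periodic timer is set intervalMs after the first element arrives, and that the window stops firing once its end time is reached.

```java
import java.util.ArrayList;
import java.util.List;

public class PeriodicFiringSchedule {
    // Processing times (ms) at which a window would FIRE before being purged at its end.
    static List<Long> fireTimes(long firstElementMs, long windowEndMs, long intervalMs) {
        List<Long> fires = new ArrayList<>();
        // The first timer is registered intervalMs after the first element;
        // each firing registers the next one, until the window end is reached.
        for (long t = firstElementMs + intervalMs; t < windowEndMs; t += intervalMs) {
            fires.add(t);
        }
        return fires;
    }

    public static void main(String[] args) {
        // First element at 500 ms, window ends at 4000 ms, fire every 1000 ms.
        System.out.println(fireTimes(500L, 4000L, 1000L)); // prints [1500, 2500, 3500]
    }
}
```

With a 15-minute window and a 1000 ms interval, this amounts to roughly 900 firings per guid window before its state is dropped.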