0 votes

I want to apply a ProcessFunction on an input data stream in Flink, processing each incoming element with a single, shared cache object. My code looks something like this:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

object MyJob extends FlinkJob {
  private val myCache = InMemoryCache()

  private def updateCache(myCache: InMemoryCache, someValue: SomeValue): Boolean = { /* some code */ }

  private def getValue(myCache: InMemoryCache, someKey: SomeKey): Boolean = { /* some code */ }

  def run(params: Params, executionEnv: StreamExecutionEnvironment): Unit = {
    val myStream = executionEnv.getStream()

    val processedStream = myStream.process(new ProcessFunction[InputEvent, OutputEvent] {
      override def processElement(value: InputEvent,
                                  ctx: ProcessFunction[InputEvent, OutputEvent]#Context,
                                  out: Collector[OutputEvent]): Unit = {
        // Update cache
        // Collect updated event
      }
    })

    processedStream.write()
  }
}

When I parallelize this job, I'm assuming that each parallel instance of the job will have its own cache object, and hence a single cache key could be present in multiple cache objects. However, I'd like there to be a single cache entry for a particular key; that is, all of the records corresponding to a particular key must be processed by a single instance and a single cache object. Will using keyBy() on myStream ensure this, i.e., that all incoming events with the same key are processed by a single parallel task/instance of the Flink job, and hence also by a single cache object?
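
For concreteness, the keyed version I have in mind would look something like this (assuming, hypothetically, that each event carries a key field):

val processedStream = myStream
  .keyBy(_.key) // same key => same parallel subtask, and hence the same cache object?
  .process(new ProcessFunction[InputEvent, OutputEvent] { /* as above */ })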


2 Answers

0 votes

Yes, keyBy guarantees that every event with the same key will be processed by the same instance of an operator. This is essential for high-throughput, low-latency stateful stream processing.

This is what allows Flink's state to be kept locally, which makes it both fast and easy to work with. Timers also take advantage of this keyed partitioning.

Using Flink's keyed state would probably work much better than using cache objects.
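
For illustration, here is a minimal sketch of that approach, replacing the in-memory cache with a ValueState inside a KeyedProcessFunction (the Event case class and its fields are hypothetical, not from the question):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

case class Event(key: String, payload: String)

class CachingFunction extends KeyedProcessFunction[String, Event, Event] {
  // One value per key; Flink checkpoints and restores it automatically.
  private var cached: ValueState[String] = _

  override def open(parameters: Configuration): Unit = {
    cached = getRuntimeContext.getState(
      new ValueStateDescriptor[String]("cachedValue", classOf[String]))
  }

  override def processElement(
      value: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    // Reads and writes here are implicitly scoped to value.key.
    val previous = cached.value() // null if nothing is stored for this key yet
    cached.update(value.payload)
    out.collect(value)
  }
}

Wired up with keyBy, e.g. myStream.keyBy(_.key).process(new CachingFunction()), each key's state lives on exactly one parallel subtask.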

0 votes

Instead of a plain object, I believe you should use Flink's keyed state.

All events with the same key will have access to the same state, and thus to the same value. Modifying the state for one key will not affect the state for any other key.
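
As a minimal sketch of that isolation (reusing the imports and the hypothetical Event type from the first answer's example), here is a keyed counter: incrementing the count for one key never changes the count another key observes.

class PerKeyCounter extends KeyedProcessFunction[String, Event, (String, Long)] {
  private var count: ValueState[Long] = _

  override def open(parameters: Configuration): Unit = {
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count", classOf[Long]))
  }

  override def processElement(
      value: Event,
      ctx: KeyedProcessFunction[String, Event, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // value() sees only the current key's state; an empty state
    // unboxes to 0L in Scala.
    val next = count.value() + 1
    count.update(next)
    out.collect((value.key, next))
  }
}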