I want to apply a ProcessFunction to an input data stream in Flink, processing every incoming element with a single cache object. My code looks something like this:
object myJob extends FlinkJob {

  private val myCache = InMemoryCache()

  private def updateCache(myCache, someValue): Boolean = { /* some code */ }

  private def getValue(myCache, someKey): Boolean = { /* some code */ }

  def run(params, executionEnv): Unit = {
    val myStream = executionEnv.getStream()

    val processedStream = myStream.process(new ProcessFunction {
      def processElement(value, context, collector): Unit = {
        // update the cache with the incoming element
        // collect the updated event downstream
      }
    })

    processedStream.write()
  }
}
When I parallelize this job, I assume that each parallel instance of the job will have its own cache object, and hence a single cache key could be present in multiple cache objects. However, I'd like there to be a single cache entry for a particular key; that is, all records corresponding to a particular key must be processed by a single instance and a single cache object. Will using keyBy() on myStream ensure this, i.e. that all incoming events with the same key are processed by a single parallel task/instance of the Flink job, and hence also by a single cache object?
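For reference, this is roughly the keyed variant I have in mind. It's only a sketch: MyEvent and its key field are hypothetical stand-ins for my actual event type, and I haven't compiled this.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// hypothetical event type, purely for illustration
case class MyEvent(key: String, payload: String)

// keyBy partitions the stream so that all events with the same key
// are routed to the same parallel subtask
val processedStream = myStream
  .keyBy((e: MyEvent) => e.key)
  .process(new KeyedProcessFunction[String, MyEvent, MyEvent] {
    override def processElement(value: MyEvent,
                                ctx: KeyedProcessFunction[String, MyEvent, MyEvent]#Context,
                                out: Collector[MyEvent]): Unit = {
      // if keyBy() works the way I expect, this subtask only ever sees
      // one slice of the key space, so a given key would live in
      // exactly one per-instance cache object
      out.collect(value)
    }
  })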