Using Spark Streaming (1.6), I have a file stream that reads lookup data with a batch interval of 2s; however, files are copied to the directory only once an hour.
Once a new file arrives, its content is read by the stream. This is the data I want to cache in memory and keep there
until a new file is read.
There is another stream that I want to join with this dataset, which is why I'd like to cache it.
This is a follow-up to the question Batch lookup data for Spark streaming.
The answer there works fine with updateStateByKey;
however, I don't know how to handle the case where a KV pair is
deleted from the lookup files, because the sequence of values in updateStateByKey
keeps growing.
Any hint on how to do this with mapWithState
would also be great.
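To make the deletion problem concrete, here is a minimal sketch of an update function with the shape updateStateByKey expects, (Seq[V], Option[S]) => Option[S]. It keeps only the most recent value per key instead of accumulating them, and returns None to drop a key from the state. The Tombstone marker and the names LookupState and updateLookup are hypothetical: the sketch assumes the lookup files could write an explicit deletion record for a removed key, which is not how my files currently look.

```scala
object LookupState {
  // Hypothetical marker (an assumption, not part of Spark): a deleted key
  // would have to appear in the lookup file with this value.
  val Tombstone = "__DELETED__"

  // Same shape as the function passed to updateStateByKey:
  // newValues holds this batch's values for the key, state the previous one.
  def updateLookup(newValues: Seq[String], state: Option[String]): Option[String] =
    newValues.lastOption match {
      case Some(Tombstone) => None        // returning None removes the key from the state
      case Some(latest)    => Some(latest) // keep only the newest value, no growing Seq
      case None            => state       // no new file in this batch: keep what we have
    }
}
```

With a DStream[(String, String)] of parsed lookup lines, this would be passed as updateStateByKey(LookupState.updateLookup _). It does not help when a key simply disappears from a new file without a deletion record, which is the case I'm actually stuck on.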
This is what I tried so far, but the data doesn't seem to be persisted:
val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD { x =>
  if (!x.partitions.isEmpty) {
    x.unpersist(true)
    x.persist()
  }
}