I have a Flink application for click stream collection and processing. The application consists of Kafka as the event source, a map function, and a sink, as shown in the image below:
I want to enrich the incoming click stream data with the user's IP location, based on the userIp field of the raw event ingested from Kafka.
The IP location data is in a CSV file; a simplified slice of it is shown below:
start_ip,end_ip,country
"1.1.1.1","100.100.100.100","United States of America"
"100.100.100.101","200.200.200.200","China"
I did some research and found a few potential solutions:
1. Solution: Broadcast the enrichment data and connect it with the event stream, using some IP matching logic.
1. Result: It worked well with a small sample of IP location data, but not with the whole CSV file. The JVM heap reached 3.5 GB, and because broadcast state is kept in memory, there is no way to put it on disk (even with RocksDB).
2. Solution: Load the CSV data into state (ValueState) in the open() method of a RichFlatMapFunction before event processing starts, and enrich the event data in the flatMap method.
2. Result: Because the enrichment data is too big to hold in the JVM heap, it is impossible to load it into ValueState. Also, de/serializing through ValueState is bad practice for data that is key-value in nature.
3. Solution: To avoid the JVM heap constraint, I tried to put the enrichment data into RocksDB (which uses disk) as state, using MapState.
3. Result: Trying to load the CSV file into MapState in the open() method gave me an error saying that I cannot put into MapState in open(), because there is no keyed context in open(), as in this question: Flink keyed stream key is null
4. Solution: Because MapState needs a keyed context (in order to write to RocksDB), I tried to load the whole CSV file into the local RocksDB instance (disk) inside the process function, after turning the DataStream into a KeyedStream:
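import org.apache.flink.api.common.state.{MapState, MapStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import scala.io.Source
// CSVParser and defaultCSVFormat come from the CSV library used in the project.

class KeyedIpProcess extends KeyedProcessFunction[Long, Event, Event] {
  var ipMapState: MapState[String, String] = _
  var csvFinishedFlag: ValueState[Boolean] = _

  override def processElement(event: Event,
                              ctx: KeyedProcessFunction[Long, Event, Event]#Context,
                              out: Collector[Event]): Unit = {
    val ipDescriptor = new MapStateDescriptor[String, String]("ipMapState", classOf[String], classOf[String])
    val csvFinishedDescriptor = new ValueStateDescriptor[Boolean]("csvFinished", classOf[Boolean])
    ipMapState = getRuntimeContext.getMapState(ipDescriptor)
    csvFinishedFlag = getRuntimeContext.getState(csvFinishedDescriptor)

    // Keyed state is scoped to the current key, so this blocking load
    // runs the first time each key is seen.
    if (!csvFinishedFlag.value()) {
      val csv = new CSVParser(defaultCSVFormat)
      val fileSource = Source.fromFile("/tmp/ip.csv", "UTF-8")
      for (row <- fileSource.getLines()) {
        val Some(List(start, end, country)) = csv.parseLine(row)
        // Only the start IP is stored as the map key.
        ipMapState.put(start, country)
      }
      fileSource.close()
      csvFinishedFlag.update(true)
    }

    out.collect {
      // Exact-match lookup: succeeds only when userIp equals a range's start IP.
      if (ipMapState.contains(event.userIp)) {
        // The map value is the country name itself (a String).
        val country = ipMapState.get(event.userIp)
        event.copy(data =
          event.data.copy(
            ipLocation = Some(country)
          ))
      } else {
        event
      }
    }
  }
}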
4. Result: It is too hacky, and the blocking file read stalls event processing.
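To make the "IP matching logic" mentioned in solution 1 concrete, here is a minimal sketch of a range lookup that is independent of Flink. It assumes IPv4 addresses only and non-overlapping ranges, as in the CSV slice above. The names IpRangeLookup, IpRange, ipToLong, buildIndex and lookup are hypothetical and exist only for this illustration; they are not part of the application.

```scala
import java.util.{TreeMap => JTreeMap}

object IpRangeLookup {
  // Hypothetical container for one CSV row, with both bounds as numbers.
  final case class IpRange(start: Long, end: Long, country: String)

  // Converts a dotted IPv4 string into its unsigned 32-bit value (held in a Long).
  def ipToLong(ip: String): Long =
    ip.split('.').foldLeft(0L)((acc, octet) => (acc << 8) | octet.toLong)

  // Builds a sorted index keyed by the numeric start of each range.
  // Assumes ranges do not overlap.
  def buildIndex(rows: Seq[(String, String, String)]): JTreeMap[Long, IpRange] = {
    val index = new JTreeMap[Long, IpRange]()
    rows.foreach { case (start, end, country) =>
      val range = IpRange(ipToLong(start), ipToLong(end), country)
      index.put(range.start, range)
    }
    index
  }

  // Finds the range with the greatest start <= ip, then checks the upper bound.
  def lookup(index: JTreeMap[Long, IpRange], ip: String): Option[String] = {
    val value = ipToLong(ip)
    Option(index.floorEntry(value)) // floorEntry returns null when no start <= value
      .map(_.getValue)
      .filter(_.end >= value)
      .map(_.country)
  }
}

object IpRangeLookupExample extends App {
  val index = IpRangeLookup.buildIndex(Seq(
    ("1.1.1.1", "100.100.100.100", "United States of America"),
    ("100.100.100.101", "200.200.200.200", "China")
  ))
  println(IpRangeLookup.lookup(index, "8.8.8.8"))   // inside the first range
  println(IpRangeLookup.lookup(index, "250.0.0.1")) // above every range
}
```

Keying the sorted map by the numeric start of each range means one lookup is a single floorEntry call followed by an upper-bound check. Comparing the dotted strings directly would not work, because string order differs from numeric order (for example, "100.0.0.0" sorts before "9.0.0.0").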
Could you tell me what I can do in this situation?
Thanks