I'd like to join every RDD in a DStream with a non-streaming, unchanging reference file. Here is my code:
val sparkConf = new SparkConf().setAppName("LogCounter")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val sc = new SparkContext()
val geoData = sc.textFile("data/geoRegion.csv")
.map(_.split(','))
.map(line => (line(0), (line(1),line(2),line(3),line(4))))
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd => rdd.join(geoData))
I'm getting many, many errors, the most common being:
14/11/19 19:58:23 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: http://10.102.71.92:40764/broadcast_1
I think I should be broadcasting geoData rather than having every task read it in (it's a 100MB file), but I'm not sure where the code that initializes geoData should go so that it only runs once.
Also, I'm not sure geoData is even defined correctly (maybe it should use ssc instead of sc?). The documentation I've seen just shows the transform and join but doesn't show how the static RDD was created in the first place.
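For example, is it just a matter of dropping the separate SparkContext and loading the file through the streaming context's own SparkContext? Something like this is my guess, but I haven't confirmed it works:

val geoData = ssc.sparkContext.textFile("data/geoRegion.csv")
  .map(_.split(','))
  .map(line => (line(0), (line(1), line(2), line(3), line(4))))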
Any ideas on how to broadcast geoData and then join it to each streaming RDD?
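For reference, here is roughly what I was imagining for the broadcast approach (just a sketch; geoMap and geoBC are names I made up, I'm assuming goodIPsFltrBI is keyed by the same IP string as geoData, and I'm not sure collecting the 100MB file to the driver with collectAsMap is acceptable):

// collect the reference data to the driver once and broadcast it to the executors
val geoMap = ssc.sparkContext.textFile("data/geoRegion.csv")
  .map(_.split(','))
  .map(line => (line(0), (line(1), line(2), line(3), line(4))))
  .collectAsMap()
val geoBC = ssc.sparkContext.broadcast(geoMap)

// do a map-side "join" against the broadcast map inside each batch
val vdpJoinedGeo = goodIPsFltrBI.transform { rdd =>
  rdd.flatMap { case (ip, value) =>
    geoBC.value.get(ip).map(geo => (ip, (value, geo)))
  }
}

Is that roughly the right shape, or is there a better way to do this in Spark Streaming?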