4
votes

I'd like to join every RDD in a DStream with a non-streaming, unchanging reference file. Here is my code:

val sparkConf = new SparkConf().setAppName("LogCounter") 
val ssc =  new StreamingContext(sparkConf, Seconds(2)) 

val sc = new SparkContext() 
val geoData = sc.textFile("data/geoRegion.csv") 
            .map(_.split(',')) 
            .map(line => (line(0), (line(1),line(2),line(3),line(4)))) 

val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap 
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) 

val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity 
val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =>rdd.join(geoData)) 

I'm getting many, many errors, the most common being:

14/11/19 19:58:23 WARN TaskSetManager: Loss was due to java.io.FileNotFoundException
java.io.FileNotFoundException: http://10.102.71.92:40764/broadcast_1

I think I should be broadcasting geoData instead of reading it in with each task (it's a 100MB file), but I'm not sure where to put the code that initializes geoData the first time.

Also I'm not sure if geoData is even defined correctly (maybe it should use ssc instead of sc?). The documentation I've seen just lists the transform and join but doesn't show how the static file was created.

Any ideas on how to broadcast geoData and then join it to each streaming RDD?

1

1 Answers

4
votes
  • FileNotFound Exception:

The geoData textFile is loaded on all workers from the provided location ("data/geroRegion.csv"). It's most probably that this file in only available in the driver and therefore the workers cannot load it, throwing a file not found exception.

  • Broadcast variable:

Broadcast variables are defined on the driver and used on the workers by unwrapping the broadcast container to get the content. This means that the data contained by the broadcast variable should be loaded by the driver before at the time the job is defined.

This might solve two problems in this case: Assuming that the geoData.csv file is located in the driver node, it will allow proper loading of this data on the driver and an efficient spread over the cluster.

In the code above, replace the geoData loading with a local file reading version:

val geoData = Source.fromFile("data/geoRegion.csv").getLines 
            .map(_.split(',')) 
            .map(line => (line(0), (line(1),line(2),line(3),line(4)))).toMap 

val geoDataBC = sc.broadcast(geoData)

To use it, you access the broadcast contents within a closure. Note that you will get access to the map previously wrapped in the broadcast variable: it's a simple object, not an RDD, so in this case you cannot use join to merge the two datasets. You could use flatMap instead:

val vdpJoinedGeo = goodIPsFltrBI.flatMap{ip => geoDataBC.value.get(ip).map(data=> (ip,data)}