4
votes

I used transform method in a similar use case as described in Transform Operation section of Transformations on DStreams:

spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))

My code is as follows:

sc = SparkContext("local[4]", "myapp")
ssc = StreamingContext(sc, 5)
ssc.checkpoint('hdfs://localhost:9000/user/spark/checkpoint/')
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" "))\
              .map(lambda word: (word, 1))\
              .reduceByKey(lambda a, b: a+b)
filter_rdd = sc.parallelize([(u'A', 1), (u'B', 1)], 2)
filtered_count = counts.transform(
    lambda rdd: rdd.join(filter_rdd).filter(lambda k, (v1, v2): v1 and not v2)
)
filtered_count.pprint()
ssc.start()
ssc.awaitTermination()

But I get the following error

It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

How should I be using my external RDD to filter elements out of a dstream?

1
have u got an answer for this oneBg1850

1 Answers

3
votes

The difference between the Spark doc example and your code is the use of ssc.checkpoint().

Although the specific code example you provided will work without checkpoint, I guess you actually require it. But the concept of introducing an external RDD into the scope of a checkpointed DStream is potentially invalid: when recovering from a checkpoint, the external RDD may have changed.

I tried to checkpoint the external RDD, but I had no luck with it either.