3
votes

I'm trying to pass a partial function to the union of all the RDDs captured in a DStream batch over a sliding window. Let's say I construct a window operation over 10 seconds on a stream discretized into 1-second batches:

val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))

My window will have K many RDDs. I want to use collect(f: PartialFunction[T, U]) on the union of all K of these RDDs. I could call the union operator ++ inside foreachRDD, but I want to return an RDD, not Unit, and avoid side effects.

What I'm looking for is a reducer like

def reduce(f: (RDD[T], RDD[T]) => RDD[T]): RDD[T]

on a DStream that I can use like so:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc))

But this is not available in the Spark Streaming API.

Does anyone have any good ideas for combining RDDs captured in a stream into a single RDD so I can pass in a partial function? Or for implementing my own RDD reducer? Perhaps this feature is coming in a subsequent Spark release?

1
The compute function will allow you to get an RDD over a time period. – Anant
@Anant Where does the period begin and end? The DStream method compute only accepts a validTime parameter. Is this the start or the end of my window? Also, how will I deal with having to repeatedly call compute on the same interval as my batches? I'm looking for something less stateful. – nmurthy
@nmurthy You cannot call collect on a DStream. Could you explain further what you're trying to do? There's probably another way. – maasg
@maasg Correct, I'm trying to call collect on the union of all the RDDs captured in one DStream interval. There are two steps to what I'm trying to do: (1) reduce all the RDDs in one DStream interval with the ++ operator into a single RDD, and then (2) call collect on my reduced RDD using a DStream transform. – nmurthy
And what would you do with the result of the collect afterwards? collect is not much more than combining the filter and map that are available on the DStream API - not sure why it's required to union the RDDs, though. – maasg

1 Answer

2
votes

Partial functions are not directly supported by any DStream operation, but it's not difficult to achieve the same functionality.

For example, let's take a trivial partial function that takes a String and produces an Int if the String parses as a number:

import scala.util.Try

val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }
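As a quick sanity check of the partial function's semantics (plain Scala, no Spark needed): isDefinedAt reports where the guard holds, and lift turns the partial function into a total one returning an Option:

```scala
import scala.util.Try

val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }

// Defined only where the guard succeeds:
println(pf.isDefinedAt("42"))  // true
println(pf.isDefinedAt("abc")) // false

// lift gives a safe, total view of the same function:
println(pf.lift("42"))         // Some(42)
println(pf.lift("abc"))        // None
```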

And we have a DStream of Strings:

val stringDStream:DStream[String] = ??? // use your stream source here

We can then apply the partial function to the DStream like this:

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)
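That filter/map pair is exactly what collect(pf) does. A minimal sketch on a plain Scala List (which offers the same combinators as DStream, plus collect itself, so we can compare) illustrates the equivalence; the partial function is repeated here so the snippet is self-contained:

```scala
import scala.util.Try

// Same partial function as above, repeated for self-containment.
val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }

val input = List("1", "two", "3", "", "42")

// DStream has no collect(pf), but filter + map reproduces it:
val viaFilterMap = input.filter(x => pf.isDefinedAt(x)).map(pf)

// Scala collections (and RDDs) do support collect(pf) directly:
val viaCollect = input.collect(pf)

println(viaFilterMap) // List(1, 3, 42)
assert(viaFilterMap == viaCollect)
```

The same two-step pattern applies verbatim to the windowed DStream from the question, since filter and map are both available on DStream.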