3
votes

I'm trying to pass a partial function to the union of all the RDDs captured in a DStream batch over a sliding window. Let's say I construct a window operation over 10 seconds on a stream discretized into 1 second batches:

val ssc = new StreamingContext(new SparkConf(), Seconds(1))
val stream = ssc.socketStream(...)
val window = stream.window(Seconds(10))

My window will have K many RDDs. I want to use collect(f: PartialFunction[T, U]) on the union of all K of these RDDs. I could call the union operator ++ using foreachRDD, but I want to return an RDD, not a Unit, and to avoid side effects.

What I'm looking for is a reducer like

def reduce(f: (RDD[T], RDD[T]) ⇒ RDD[T]): RDD[T]

on a DStream that I can use like so:

window.reduce(_ ++ _).transform(_.collect(myPartialFunc))

But this is not available in the Spark Streaming API.

Does anyone have any good ideas for combining RDDs captured in a stream into a single RDD so I can pass in a partial function? Or for implementing my own RDD reducer? Perhaps this feature is coming in a subsequent Spark release?

1
The compute function will allow you to get an RDD over a time period. - Anant
@Anant Where does the period begin and end? The DStream method compute only accepts a validTime parameter. Is this the start or end of my window? Also, how will I deal with having to repeatedly call compute on the same interval as my batches? I'm looking for something less stateful. - nmurthy
@nmurthy You cannot do collect on a DStream. Could you further explain what you're trying to do? There's probably another way. - maasg
@maasg Correct, I'm trying to call collect on the union of all the RDDs captured in one DStream interval. There are two steps to what I'm trying to do: (1) reduce all the RDDs in one DStream interval with the ++ operator into a single RDD, and then (2) call collect on my reduced RDD using a DStream transform. - nmurthy
And what would you do with the result of the collect afterwards? collect is not much more than combining filter and map that are available on the DStream API - not sure why it's required to union the RDDs, though. - maasg

1 Answer

2
votes

Partial functions are not directly supported by a DStream operation, but it's not difficult to achieve the same functionality.

For example, let's take a trivial partial function that takes a String and produces an Int if the String parses as a number:

import scala.util.Try

val pf: PartialFunction[String, Int] = { case x if Try(x.toInt).isSuccess => x.toInt }

And we have a dstream of Strings:

val stringDStream:DStream[String] = ??? // use your stream source here

We can then apply the partial function to the DStream like this:

val intDStream = stringDStream.filter(x => pf.isDefinedAt(x)).map(pf)
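The filter-then-map idiom behaves exactly like applying the partial function with collect. A minimal sketch outside Spark, using an ordinary Seq so it runs without a StreamingContext (the sample values here are made up for illustration):

```scala
import scala.util.Try

// Same partial function as above: defined only on strings that parse as Ints.
val pf: PartialFunction[String, Int] = { case s if Try(s.toInt).isSuccess => s.toInt }

// The filter + map idiom from the answer, applied to a plain collection:
val cleaned = Seq("1", "two", "3", "four").filter(pf.isDefinedAt).map(pf)
println(cleaned)  // List(1, 3)
```

Note also that RDD itself has a collect(f: PartialFunction[T, U]) overload that performs this same filter-and-map in one call, so inside a transform on the windowed DStream you could write rdd.collect(pf) directly.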