3
votes

I'm using Spark 1.5.2 and Python 2.7 in the Ubuntu environment.

According to the documentation about countByValue and countByValueAndWindow: Transformations on dstreams
Window operations

countByValue: When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.

countByValueAndWindow: When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

So basically the return value for these two functions should be a list of (K, Long) pairs, right?

However, when I was doing some experiments, the return value turned out to be a list of integers, not pairs!

What's more, in the official test codes on Github for pySpark: Link1 Link2

You can see the "expected results" are a list of integers! And it seems to me that it's counting the number of distinct elements and combining them all together.

I thought I misunderstood the documentation somehow until I saw the test codes on Github for scala: Link1 Link2

Similar test cases, but the results are a sequence of pairs at this time!

So in summary, the documentation and test cases of scala told us the result are pairs. But the python test cases and my own experiments showed that the result are integers.

I'm new to PySpark and spark streaming. Could someone help me explain this inconsistency a little bit? Right now I'm using reduceByKey and reduceByKeyAndWindow as a workaround.

References:

  1. PySpark streaming documentation about countByValue

  2. PySpark streaming documentation about countByVauleAndWindow

  3. Dpark test cases of countByVauleAndWindow

  4. An example using countByValue in PySpark (not streaming)


UPDATE

This bug is scheduled to be fixed in pyspark 2.0.0

1
If you search "countByKeyAndWindow" on github, you can only find one place in python that used this function...unpopular or because of bugs?Krist Jin
I have raise this as a bug: issues.apache.org/jira/browse/SPARK-12353Krist Jin

1 Answers

0
votes

I agree, countByValueAndWindow has a bug, it should return count-by-value and not only the counts without the value. Even if you run the same test case in Python as the Scala version runs (link) you can see how the pyspark version of this function only returns the counts and not what values they are for (eg. pairs)

>>> input = [['a'], ['b', 'b'], ['a', 'b']]
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc, 1)
>>> input = [sc.parallelize(d, 1) for d in input]
>>> input_stream = ssc.queueStream(input)
>>> input_stream2 = input_stream.countByValueAndWindow(2, 1)
>>> def f(rdd):
...     rows = rdd.collect()
...     for r in rows:
...         print r
... 
>>> input_stream2.foreachRDD(f)
>>> 
>>> sc.setCheckpointDir('/home/xxxx/checkpointdir')
>>> ssc.start() 
>>> 1
2
2
2
0

You should raise this as a bug in Jira (link) and this should be easy to fix. I can't see how anybody could use this function in its current form, as those value numbers returned are pretty meaningless without the keys.