1
votes

Why does Spark force you to build an RDD from a list of tuples when doing a reduceByKey transformation?

reduce_rdd = sc.parallelize([{'k1': 1}, {'k2': 2}, {'k1': -2}, {'k3': 4}, {'k2': -5}, {'k1': 4}])
print(reduce_rdd.reduceByKey(lambda x, y: x + y).take(100))

error:

for k, v in iterator:
ValueError: need more than 1 value to unpack

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

If reduceByKey() is intended to work with a collection of key-value pairs, it seems obvious to me that each pair should reside in the type of Python object intended for key-value pairs: a dictionary, not a tuple.

1
Dictionaries have unique keys and therefore can't be easily partitioned. That makes them an unsuitable data structure for a distributed computing framework like Spark. - Alex
Thanks for the reply. Yes, dicts do have unique keys, but only within a single dictionary. In my case I have a list of dicts, each of which could be interpreted as a separate object in the RDD. Or did I misunderstand you? Could you please show an example? - Dipas

1 Answer

5
votes

reduceByKey works on Pair RDDs. A Pair RDD is effectively a distributed version of a list of (key, value) tuples. Because such tuples can be freely duplicated and partitioned by key across machines, they are the natural choice for distributed computation over key/value data. A dictionary, by contrast, enforces key uniqueness within a single object, which is exactly what you don't want when the same key appears in records on different partitions.
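To see why tuples are required, here is a plain-Python sketch of the semantics of reduceByKey over (key, value) tuples (the function name `reduce_by_key` is illustrative, not Spark's internal implementation). Note the `for k, v in pairs` unpacking: it is the same unpacking that raised `ValueError: need more than 1 value to unpack` in your traceback, because iterating over a one-entry dict yields just the key, not a pair.

```python
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Group (key, value) tuples by key, then fold each group with func."""
    groups = defaultdict(list)
    for k, v in pairs:  # this unpacking fails if `pairs` contains dicts instead of tuples
        groups[k].append(v)
    return {k: reduce(func, vs) for k, vs in groups.items()}

pairs = [('k1', 1), ('k2', 2), ('k1', -2), ('k3', 4), ('k2', -5), ('k1', 4)]
print(reduce_by_key(pairs, lambda x, y: x + y))  # {'k1': 3, 'k2': -3, 'k3': 4}
```

Spark does the same thing, except the groups live on different partitions and the fold runs in parallel before and after the shuffle.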

There are projects that implement an IndexedRDD, but at the time of writing these have not been integrated into the spark-core code. If you are interested, you can install a PySpark version of IndexedRDD from this GitHub repository.

Coming back to your problem, it can be easily solved without an IndexedRDD:

reduce_rdd = sc.parallelize([{'k1': 1}, {'k2': 2}, {'k1': -2}, 
                             {'k3': 4}, {'k2': -5}, {'k1': 4}])
# each dict holds a single pair; list(...) is needed on Python 3, where items() returns a view
reduce_rdd.map(lambda x: list(x.items())[0]).reduceByKey(lambda x, y: x + y).collectAsMap()

This returns the following output:

{'k1': 3, 'k2': -3, 'k3': 4}