2 votes

I have an RDD of dictionaries, and I'd like to get an RDD of just the distinct elements. However, when I try to call

rdd.distinct()

PySpark gives me the following error:

TypeError: unhashable type: 'dict'

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/02/19 16:55:56 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 2346, in pipeline_func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 317, in func
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/rdd.py", line 1776, in combineLocally
  File "/usr/local/Cellar/apache-spark/1.6.0/libexec/python/lib/pyspark.zip/pyspark/shuffle.py", line 238, in mergeValues
    d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'dict'

I do have a key inside of the dict that I could use as the distinct element, but the documentation doesn't give any clues on how to solve this problem.

EDIT: The content is made up of strings, arrays of strings, and a dictionary of numbers

EDIT 2: Example of a dictionary... I'd like dicts with equal "data_fingerprint" values to be considered equal:

{"id":"4eece341","data_fingerprint":"1707db7bddf011ad884d132bf80baf3c"}

Thanks

What exactly is the content of the dictionaries? How do you want to hash these? - zero323
Answered in the question - noli
That is not enough. You need an exact strategy for how you want to compare dictionaries. They are not hashable for two reasons: mutability and undefined key order. In your case the situation is even worse, since the data contains unhashable values as well. So the question is: what makes two dictionaries equal for you? - zero323
Oh, OK. I've added a "data_fingerprint" key to the dictionary contents, and dictionaries with equal values for that key should be considered equal. Example updated in the question. - noli

2 Answers

2 votes

As @zero323 pointed out in his comment, you have to decide how to compare dictionaries, since they are not hashable. One way would be to sort the keys (as they are not in any particular order), for example lexicographically, and then create a string of the form:

def dict_to_string(d):
    # Sort the keys so equal dicts always produce the same string,
    # e.g. 'key1|value1|key2|value2|...|keyn|valuen'
    return '|'.join('{}|{}'.format(k, d[k]) for k in sorted(d))

If you have nested unhashable objects, you have to do this recursively; a sketch of one way to do it follows.
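
A minimal sketch of one such recursive conversion, turning nested structures into tuples rather than building a string; the to_hashable name is my own, not part of the original answer:

def to_hashable(obj):
    # Recursively turn dicts and lists into tuples so the result is
    # hashable and independent of dict key order.
    if isinstance(obj, dict):
        return tuple((k, to_hashable(v)) for k, v in sorted(obj.items()))
    if isinstance(obj, list):
        return tuple(to_hashable(x) for x in obj)
    return obj

Two dicts with the same content then map to the same key, regardless of the order in which their keys happen to be stored.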

Now you can transform your RDD into key/value pairs, using that string (or some hash of it) as the key:

pairs = dictRDD.map(lambda d: (dict_to_string(d), d))

To get what you want, you just have to reduce by key as follows:

distinctDicts = pairs.reduceByKey(lambda val1, val2: val1).values()
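
Putting it together, a rough end-to-end sketch; the sample records and the sc SparkContext are assumptions for illustration, not part of the original answer:

# Assumes an active SparkContext `sc` and the dict_to_string helper above;
# the records below are made-up examples.
dictRDD = sc.parallelize([
    {"id": "a", "tags": ["x", "y"]},
    {"id": "a", "tags": ["x", "y"]},   # exact duplicate of the first record
    {"id": "b", "tags": ["z"]},
])

pairs = dictRDD.map(lambda d: (dict_to_string(d), d))
distinctDicts = pairs.reduceByKey(lambda val1, val2: val1).values()

print(distinctDicts.count())  # 2 -- the duplicate dict has been collapsed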

1 vote

Since your data provides a unique key, you can simply do something like this:

(rdd
    .keyBy(lambda d: d.get("data_fingerprint"))
    .reduceByKey(lambda x, y: x)
    .values())
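
For illustration, a small sketch of the same pattern on made-up records; both the sample data and the sc SparkContext are assumptions, not part of the original answer:

# Assumes an active SparkContext `sc`; ids and fingerprints are invented.
rdd = sc.parallelize([
    {"id": "a1", "data_fingerprint": "fp-1"},
    {"id": "b2", "data_fingerprint": "fp-1"},  # same fingerprint -> treated as equal
    {"id": "c3", "data_fingerprint": "fp-2"},
])

unique = (rdd
    .keyBy(lambda d: d.get("data_fingerprint"))
    .reduceByKey(lambda x, y: x)
    .values())

print(unique.count())  # 2 -- one dict kept per fingerprint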

There are at least two problems with Python dictionaries which make them bad candidates for hashing:

  • mutability - which makes any hashing tricky (a short demonstration follows this list)
  • arbitrary order of keys
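
A quick way to see the first point, in plain Python with no Spark involved:

# Dicts are mutable, so Python refuses to hash them:
try:
    hash({"id": "4eece341"})
except TypeError as e:
    print(e)  # unhashable type: 'dict'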

A while ago there was a PEP proposing frozendicts (PEP 416), but it was ultimately rejected.