1
votes

When I set the master to local[1], the job runs normally, but with local[2] it fails with the error shown after the code.

from pyspark import SparkContext
# local[1] works; local[2] raises the error shown after the code
sc = SparkContext("local[2]", "sort")


class MySort:
    def __init__(self, tup):
        self.tup = tup

    def __gt__(self, other):
        if self.tup[0] > other.tup[0]:
            return True
        elif self.tup[0] == other.tup[0]:
            if self.tup[1] >= other.tup[1]:
                return True
            else:
                return False
        else:
            return False


r1 = sc.parallelize([(1, 2), (2, 2), (2, 3), (2, 1), (1, 3)])
r2 = r1.sortBy(MySort)
print(r2.collect())

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 230, in main
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 225, in process
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 376, in dump_stream
    bytes = self.serializer.dumps(vs)
  File "E:\spark2.3.1\spark-2.3.1-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 555, in dumps
    return pickle.dumps(obj, protocol)
_pickle.PicklingError: Can't pickle <class '__main__.MySort'>: attribute lookup MySort on __main__ failed
2
Can you add the error message? (hamza tuna)
Sorry, I forgot it. (wise_w)

2 Answers

0
votes

I think you need to put your class in its own file and pass that file to spark-submit with this parameter:

--py-files your_file.py

because Spark needs to ship this class to the other worker processes.
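
A minimal sketch of what such a file could look like, assuming a hypothetical module name mysort.py and class name SortKey (neither appears in the question). It uses functools.total_ordering so that __eq__ and __lt__ are enough for sorting. Whether shipping the module actually removes the error in this setup is not verified here.

```python
# mysort.py (hypothetical file name): the key class lives in an importable
# module instead of __main__, so other processes can look it up by name.
from functools import total_ordering


@total_ordering
class SortKey:
    """Orders tuples by their first element, then by their second."""

    def __init__(self, tup):
        self.tup = tup

    def _key(self):
        # Only the first two fields take part in the comparison.
        return (self.tup[0], self.tup[1])

    def __eq__(self, other):
        return self._key() == other._key()

    def __lt__(self, other):
        return self._key() < other._key()

    def __hash__(self):
        # Defining __eq__ removes the default hash, so restore one.
        return hash(self._key())
```

The driver script would then use `from mysort import SortKey` and `r1.sortBy(SortKey)`, and be submitted with `--py-files mysort.py` (the file names are placeholders).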

0
votes

This is a really interesting property of Spark that I did not know about before. I think that with a single core the key objects are never pickled (pickling is needed to send objects to other processes). You can still use a plain function as the key, though (I assume you want to sort by the first two values):

key_func = lambda tup: tup[:2]

r1 = sc.parallelize([(1, 2), (2, 2), (2, 3), (2, 1), (1, 3)])
r2 = r1.sortBy(key_func)
print(r2.collect())
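
As a quick sanity check that the tuple key gives the same order as the class from the question, here is a small sketch in plain Python (no Spark involved, so it says nothing about the pickling error itself). The __gt__ body is rewritten as a single tuple comparison, which I believe is equivalent to the original branches.

```python
# Plain-Python check: tuples already compare element by element, so a
# tuple key should yield the same order as the MySort class.
class MySort:
    def __init__(self, tup):
        self.tup = tup

    def __gt__(self, other):
        # Same ordering as in the question, written as a tuple comparison.
        return self.tup[:2] >= other.tup[:2]


data = [(1, 2), (2, 2), (2, 3), (2, 1), (1, 3)]

by_class = sorted(data, key=MySort)
by_tuple = sorted(data, key=lambda tup: tup[:2])

print(by_class == by_tuple)
print(by_tuple)
```

Both sorts should agree on this data, which is why the lambda can stand in for the class.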