
I have objects I want to do calculations on in parallel, so I thought I could use PySpark.

Consider this example: a class whose objects hold a number i that can be squared with square():

class MyMathObject():
    def __init__(self, i):
        self.i = i
    def square(self):
        return self.i ** 2


print(MyMathObject(3).square()) # Test one instance with regular Python - works

Additionally, I set up PySpark (in a Jupyter notebook), and now I want to calculate the squares of 0 to 4 in parallel on my objects:

import findspark
findspark.init()
from pyspark import SparkContext

sc = SparkContext("local[2]")

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect() # This fails

This does not work; it results in a very long and, to me, mostly unhelpful error message. The only line I find somewhat informative is:

AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

So it seems to be something related to how the attribute square() is resolved. I have copied the full message at the end.
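For what it's worth, inspecting how pickle serializes one of my objects on the driver side hints at why the workers might struggle (a minimal check, assuming the class is defined in the notebook's __main__ module, as above):

import pickle
import pickletools

obj = MyMathObject(3)
payload = pickle.dumps(obj)
# The disassembly contains an opcode (GLOBAL/STACK_GLOBAL, depending on
# the protocol) naming '__main__' and 'MyMathObject': pickle stores the
# class by reference, not by value, so whoever unpickles these bytes
# must be able to import MyMathObject.
pickletools.dis(payload)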

PySpark itself seems to work; for example, executing the following on a plain Python list returns the squared numbers as expected:

rdd = sc.parallelize([i for i in range(5)])
rdd.map(lambda i: i**2).collect()
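Even a function defined directly in the notebook can be mapped without problems, as long as the RDD holds plain values (a small sanity check; square_number is just a throwaway name I use here):

def square_number(i):
    return i ** 2

rdd = sc.parallelize(range(5))
rdd.map(square_number).collect()  # [0, 1, 4, 9, 16] - works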

Thus, there seems to be something flawed in the way I create or operate on my objects, but I cannot track down the mistake.

The complete error message:

Py4JJavaError                             Traceback (most recent call last)
in
      1 rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
----> 2 rdd.map(lambda obj: obj.square()).collect()

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/pyspark/rdd.py in collect(self)
    887         """
    888         with SCCallSiteSync(self.context) as css:
--> 889             sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    890         return list(_load_from_socket(sock_info, self._jrdd_deserializer))
    891

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306

/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 2, 192.168.2.108, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2154)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2179)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 271, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'MyMathObject' on <module 'pyspark.daemon' from '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/daemon.py'>

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
    at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
    at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
    at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more


1 Answer


It doesn't look like your mistake: PySpark pickles your objects to ship them to the worker processes, and the workers unpickle them by importing their class, which fails for a class defined directly in the notebook's __main__ module. A way around this is to put the class in a separate Python file and import it:

For example, create a file mymodule.py containing:

class MyMathObject():
    def __init__(self, i):
        self.i = i
    def square(self):
        return self.i ** 2

Then, in your notebook (or main script), you can do:

from mymodule import MyMathObject

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect()

which should give [0, 1, 4, 9, 16].
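On a real cluster (rather than local mode), the executors additionally need to find mymodule.py on their Python path; a minimal sketch using SparkContext.addPyFile, assuming mymodule.py sits next to the script, would be:

from mymodule import MyMathObject

# Ship the module file to the executors so their Python workers
# can import MyMathObject when unpickling the RDD elements.
sc.addPyFile("mymodule.py")

rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect()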