I am using Python with PySpark.

I run a standalone cluster on Docker for testing purposes.

I am using this repo of code.

Note that before running it, you should run this command in order to be able to log into it:

docker network create --gateway 10.5.0.1 --subnet 10.5.0.0/24 spark_master

I SSH into the worker and the master, and when running

which python

I get the same Python version (3.5) on both.

When I run the simplest PySpark code (outside the containers)

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('appName').setMaster('spark://0.0.0.0:7077')
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4])
rdd.map(lambda x: x**2).collect()

I keep getting this error:

Exception: Python in worker has different version 3.5 than that in driver 3.7, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

19/12/05 08:14:29 INFO CoarseGrainedExecutorBackend: Got assigned task 7
19/12/05 08:14:29 INFO Executor: Running task 1.3 in stage 0.0 (TID 7)
19/12/05 08:14:29 INFO Executor: Executor is trying to kill task 1.3 in stage 0.0 (TID 7), reason: Stage cancelled
19/12/05 08:14:29 INFO Executor: Executor killed task 1.3 in stage 0.0 (TID 7), reason: Stage cancelled
19/12/05 08:14:29 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/12/05 08:14:29 INFO MemoryStore: MemoryStore cleared
19/12/05 08:14:29 INFO BlockManager: BlockManager stoppe

My client's Python version is 3.7, but I also tried setting up a new conda environment with Python 3.5 and still got the same error.

I read online here that I should set the driver and worker Python to be the same, but they seem to be the same already. What am I missing? Should I set it up on the client? To the best of my knowledge, the client only submits a job to the driver, so it should not affect the driver's Python version, and even if it does, I tried using Python 3.5 with no luck...
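
A small diagnostic (just a sketch, not one of the runs above) is to print, from the same environment that submits the job, which interpreter the script itself is running on and what PySpark is being told to use:

import sys, os

# Which interpreter is this script running on?
# The "driver 3.7" in the error message appears to come from here.
print("interpreter:", sys.executable)
print("version    : %d.%d" % sys.version_info[:2])

# What, if anything, is PySpark told to use?
print("PYSPARK_PYTHON        =", os.environ.get("PYSPARK_PYTHON"))
print("PYSPARK_DRIVER_PYTHON =", os.environ.get("PYSPARK_DRIVER_PYTHON"))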

Edit: I tried setting the environment variables with os.environ by adding this to my Python code:

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"
os.environ["PYSPARK_WORKER_PYTHON"] = "/usr/bin/python3.5"

I am still getting the same error. However, when I point these variables at a path that does not exist, I do get a different error:

no such file or directory

I assume this means the setting does somehow reach the cluster, but it does not fix the actual problem.
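
For completeness, this is roughly what the attempt looks like when the variables are set before the SparkContext is created; as far as I understand, the value of PYSPARK_PYTHON is captured when the context is created and used to launch the Python workers on the executors, so /usr/bin/python3.5 (an assumed path here) has to exist inside the worker containers, while PYSPARK_DRIVER_PYTHON mainly matters when launching through bin/pyspark or spark-submit and cannot change the interpreter of a script that is already running:

import os

# Set these before the SparkContext is created, otherwise they are not picked up.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5"         # assumed path inside the worker containers
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3.5"  # assumed path on the client machine

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('appName').setMaster('spark://0.0.0.0:7077')
sc = SparkContext(conf=conf)
print(sc.parallelize([1, 2, 3, 4]).map(lambda x: x ** 2).collect())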

The Python version on Spark and on your driver needs to be the same. Please check whether you have defined pyspark_python to the correct version in your Spark config. If not, you can set it using os.environ["PYSPARK_PYTHON"] = path to python in your driver program. Also set export PYSPARK_PYTHON=path to python in your spark-env.sh file. – Jason Chia
Let me know if that fixes your problem. – Jason Chia
@JasonChia By driver program, do you mean the code I run on my client, or the Docker file of the master? – thebeancounter
Does the Python code I run on the client have any effect on the environment in the driver itself? It's supposed to be a contained environment. – thebeancounter
@JasonChia Tried that, no luck, see edit. – thebeancounter

1 Answer

Finally, the problem was with the client's Python version; changing it to the same version (3.5) as the driver and workers solved the problem.

I do not understand why; to the best of my knowledge, the client's version should not collide with the driver's in any way.

A possible reason is that PySpark is used to run user functions written in Python: the functions are serialized on the client and executed by the workers, so the two interpreter versions end up having to match.
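
Once the client is on 3.5 as well, a quick sanity check along these lines (a sketch, not part of the original fix) shows that the driver and the executors agree; it only runs cleanly after the fix, because the version check fires in the worker before any task code executes:

import sys

from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('versionSanityCheck').setMaster('spark://0.0.0.0:7077')
sc = SparkContext(conf=conf)

driver_version = sys.version_info[:2]
# Each task reports the (major, minor) version of the Python worker that ran it.
worker_versions = set(sc.parallelize(range(4), 4)
                        .map(lambda _: sys.version_info[:2])
                        .collect())

print("driver :", driver_version)
print("workers:", worker_versions)
assert worker_versions == {driver_version}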