I've setup my first spark cluster (1 master, 2 workers) and an iPython notebook server that I've setup to access the cluster. I'm running the workers from Anaconda to make sure the python setup is correct on each box. The iPy notebook server appears to have everything setup correctly, and I'm able to initialize Spark and push a job out. However, the job is failing, and I'm not sure how to troubleshoot. Here's the code:
from pyspark import SparkContext
from numpy import random
CLUSTER_URL = 'spark://192.168.1.20:7077'
sc = SparkContext( CLUSTER_URL, 'pyspark')
def sample(p):
from numpy import random
x, y = random(), random()
return 1 if x*x + y*y < 1 else 0
count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / 20)
And here's the error:
Py4JJavaError Traceback (most recent call last) in () 3 return 1 if xx + yy < 1 else 0 4 ----> 5 count = sc.parallelize(xrange(0, 20)).map(sample).reduce(lambda a, b: a + b) 6 print "Pi is roughly %f" % (4.0 * count / 20)
/opt/spark-1.2.0/python/pyspark/rdd.pyc in reduce(self, f) 713 yield reduce(f, iterator, initial) 714 --> 715 vals = self.mapPartitions(func).collect() 716 if vals: 717 return reduce(f, vals)
/opt/spark-1.2.0/python/pyspark/rdd.pyc in collect(self) 674 """ 675 with SCCallSiteSync(self.context) as css: --> 676 bytesInJava = self._jrdd.collect().iterator() 677 return list(self._collect_iterator_through_file(bytesInJava)) 678
/opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in call(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, --> 538 self.target_id, self.name) 539 540 for temp_arg in temp_args:
/opt/spark-1.2.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. --> 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError(
Py4JJavaError: An error occurred while calling o28.collect. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 31 in stage 0.0 failed 4 times, most recent failure: Lost task 31.3 in stage 0.0 (TID 72, 192.168.1.21): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/opt/spark-1.2.0/python/pyspark/worker.py", line 107, in main process() File "/opt/spark-1.2.0/python/pyspark/worker.py", line 98, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/opt/spark-1.2.0/python/pyspark/serializers.py", line 227, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/opt/spark-1.2.0/python/pyspark/rdd.py", line 710, in func initial = next(iterator) File "", line 2, in sample TypeError: 'module' object is not callable
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:137) at org.apache.spark.api.python.PythonRDD$$anon$1.(PythonRDD.scala:174) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:96) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)
Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I'm not even sure where to start debugging / diagnosing this, so any help would be appreciated. Happy to post other logs if that would be helpful.