3 votes

I'm a newbie to PySpark. Yesterday I plotted crime data on a map and it worked fine, but today I'm facing this issue.

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 3.0 failed 1 times, most recent failure: Lost task 5.0 in stage 3.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException

This is the full traceback:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-2-a4ce64abb6b1> in <module>()
 43     gmap.scatter(t_lat, t_lng, '#8A15DE', size=40, marker=False)
 44     gmap.draw('crimefile.html')
---> 45 init()

<ipython-input-2-a4ce64abb6b1> in init()
 34 
 35     gmap = gmplot.GoogleMapPlotter(41.881832, -87.623177, 16).from_geocode('Chicago')
---> 36     lat = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015')                    .map(lambda x:float(x.Latitude)).collect()
 37     lng = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015')                   .map(lambda x:float(x.Longitude)).collect()
 38 

/Users/Mubin/Spark/python/pyspark/rdd.pyc in collect(self)
807         """
808         with SCCallSiteSync(self.context) as css:
--> 809             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
810         return list(_load_from_socket(port, self._jrdd_deserializer))
811 

/Users/Mubin/Spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/Users/Mubin/Spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/Users/Mubin/Spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 3.0 failed 1 times, most recent failure: Lost task 5.0 in stage 3.0 (TID 8, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-2-a4ce64abb6b1>", line 36, in <lambda>
ValueError: could not convert string to float: 

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:934)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
    process()
  File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/Users/Mubin/Spark/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "<ipython-input-2-a4ce64abb6b1>", line 36, in <lambda>
ValueError: could not convert string to float: 

at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:156)
at org.apache.spark.api.python.PythonRunner$$anon$1.next(PythonRDD.scala:152)
at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more

Here is my code.

import csv
from StringIO import StringIO
from collections import namedtuple
from gmplot import *
from pyspark import SparkContext
Crime = ''
def loadData():
    sc = SparkContext.getOrCreate()
    filePath = '/Users/Mubin/SparkData/chicago.csv'
    return sc.textFile(filePath)
def parse(row):
    global Crime
    reader = csv.reader(StringIO(row))
    row = reader.next()
    return Crime(*row)
def prepareHeaders(header):
    return header.replace(' ', '_').replace('/', '_').split(',')

def createNamedTuple(header):
    return namedtuple('Crime', header, verbose=False)
def init():
    global Crime
    chicago = loadData()
    headers = chicago.first()
    #return chicago.first()
    woHeaders = chicago.filter(lambda x:x <> headers)
    #prepare headers[remove spaces, slaches] and convert to list.
    fields = prepareHeaders(headers)
    Crime = createNamedTuple(fields)
    #map header to tuples data to access properties as object.
    parsedData = woHeaders.map(parse)
    #return parsedData.take(1)
    #return parsedData.map(lambda x:x.Primary_Type).countByValue()
    #return parsedData.filter(lambda x:x.Primary_Type == 'BATTERY').map(lambda x:x.Year).countByValue()

    gmap = gmplot.GoogleMapPlotter(41.881832, -87.623177, 16).from_geocode('Chicago')
    lat = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015')\
                .map(lambda x:float(x.Latitude)).collect()
    lng = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015')\
               .map(lambda x:float(x.Longitude)).collect()

    t_lat = parsedData.filter(lambda x: x.Primary_Type == 'THEFT' and x.Year == '2015')\
                .map(lambda x:float(x.Latitude)).collect()
    t_lng = parsedData.filter(lambda x: x.Primary_Type == 'THEFT' and x.Year == '2015')\
               .map(lambda x:float(x.Longitude)).collect()

    gmap.scatter(lat, lng, '#DE1515', size=40, marker=False)
    gmap.scatter(t_lat, t_lng, '#8A15DE', size=40, marker=False)
    gmap.draw('crimefile.html')
init()

If I uncomment any of the return lines, the data comes back fine, but I'm no longer able to draw the map.

Thanks.

Does gmap.draw('crimefile.html') produce a byte array or a static file? Every request waits for a response (or a header), so try sending some data/header or a redirect page while it waits. None isn't an acceptable value for Java! – dsgdfg
@dsgdfg: Yesterday it was working fine and created the file via gmap.draw('crimefile.html'); now it doesn't. Can you explain how I can pass a header to the gmplot library before creating the file? – Mubin
Is x.Year a string or a numeric type? I.e. does this work: lat = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == 2015).map(lambda x: float(x.Latitude)).collect() – ImDarrenG
Alternatively, is there a reason the cast of x.Latitude in .map(lambda x: float(x.Latitude)).collect() fails - has the file been parsed incorrectly, or does a line in the file contain corrupt data? – ImDarrenG

1 Answer

2 votes

The error you're getting is

ValueError: could not convert string to float

My best guess is that you're trying to convert string values to float without filtering out empty values first (I can't find such a filter in your code). So in your code, add a check like this for both Latitude and Longitude:

lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015' and x.Latitude != ''
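Applied to the lat/lng collects from your question, it would look roughly like this (a sketch, untested against your data):

# Guard against empty Latitude/Longitude strings before casting to float.
lat = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015'
                        and x.Latitude != '' and x.Longitude != '')\
                .map(lambda x: float(x.Latitude)).collect()
lng = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015'
                        and x.Latitude != '' and x.Longitude != '')\
                .map(lambda x: float(x.Longitude)).collect()

Filtering on both fields in both calls keeps lat and lng the same length, so gmap.scatter still gets matching coordinate pairs.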

Hope this will help.
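If you want to see how many rows are actually affected first, a quick sanity check (again just a sketch, assuming the same parsedData RDD from your question) would be:

# Count BATTERY rows from 2015 with a missing Latitude or Longitude
bad = parsedData.filter(lambda x: x.Primary_Type == 'BATTERY' and x.Year == '2015'
                        and (x.Latitude == '' or x.Longitude == '')).count()
print(bad)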