I am using the Apache-Spark service at bluemix. I am currently having trouble accessing my Object Storage through spark-submit.
I know that the file exists and is accessible through the swift on the jupyter notebook. I use the following to verify:
file_name = "swift://notebooks.spark/small.verbatim"
text_file = sc.textFile(file_name)
print "number of verbatims", text_file.count()
and the output is :
number of verbatims 100
but when i try to do the same with the spark-submit, i get an error.
This is the code that I am submitting through spark-submit:
import sys, traceback
from pymongo import MongoClient
import time
from datetime import datetime
from pyspark import SparkContext
sc = SparkContext('local', 'Schedule Insight Extractor')
try:
file_name = "swift://notebooks.spark/small.verbatim" # small dataset
text_file = sc.textFile(file_name)
r = None
r = "number of verbatims", text_file.count()
except:
e = sys.exc_info()[0]
print ("ERROR %s", e)
traceback.print_exc(file=sys.stdout)
And this throws the following exception:
Traceback (most recent call last):
File "/gpfs/fs01/user/sf6d-7c3a9c08343577-05540e1c503a/data/workdir/spark-driver-cece5080-17dd-48e4-9036-52788e5a7b77/test_spark_submit.py", line 20, in <module>
r = "number of verbatims", text_file.count()
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
vals = self.mapPartitions(func).collect()
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.NullPointerException
at org.apache.commons.httpclient.HttpMethodBase.getStatusCode(HttpMethodBase.java:570)
at org.apache.hadoop.fs.swift.exceptions.SwiftInvalidResponseException.<init>(SwiftInvalidResponseException.java:53)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.buildException(SwiftRestClient.java:1827)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.perform(SwiftRestClient.java:1728)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.perform(SwiftRestClient.java:1662)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.authenticate(SwiftRestClient.java:1154)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.authIfNeeded(SwiftRestClient.java:1618)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.preRemoteCommand(SwiftRestClient.java:1634)
at org.apache.hadoop.fs.swift.http.SwiftRestClient.headRequest(SwiftRestClient.java:1085)
at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.stat(SwiftNativeFileSystemStore.java:258)
at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.getObjectMetadata(SwiftNativeFileSystemStore.java:213)
at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.getObjectMetadata(SwiftNativeFileSystemStore.java:182)
at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem.getFileStatus(SwiftNativeFileSystem.java:174)
at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:58)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:507)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:785)
What am i doing wrong ? I am supposed to use the SwiftClient ? Is the url not well structured ?