I am using the Apache Spark service on Bluemix and am having trouble accessing my Object Storage through spark-submit.

I know that the file exists and is accessible over Swift from the Jupyter notebook. I use the following to verify:

file_name = "swift://notebooks.spark/small.verbatim"
text_file = sc.textFile(file_name)
print "number of verbatims", text_file.count()  

The output is:

number of verbatims 100

But when I try to do the same with spark-submit, I get an error.

This is the code that I am submitting through spark-submit:

import sys, traceback
from pymongo import MongoClient
import time
from datetime import datetime
from pyspark import SparkContext
sc = SparkContext('local', 'Schedule Insight Extractor')

try:
    file_name = "swift://notebooks.spark/small.verbatim"  # small dataset
    text_file = sc.textFile(file_name)
    r = None
    r = "number of verbatims", text_file.count()
except:
    e = sys.exc_info()[0]
    print("ERROR %s" % e)
    traceback.print_exc(file=sys.stdout)

And this throws the following exception:

Traceback (most recent call last):
  File "/gpfs/fs01/user/sf6d-7c3a9c08343577-05540e1c503a/data/workdir/spark-driver-cece5080-17dd-48e4-9036-52788e5a7b77/test_spark_submit.py", line 20, in <module>
    r = "number of verbatims", text_file.count()
  File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/pyspark.zip/pyspark/rdd.py", line 771, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/src/spark160master/spark-1.6.0-bin-2.6.0/python/lib/py4j-0.9-src.zip/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.lang.NullPointerException
        at org.apache.commons.httpclient.HttpMethodBase.getStatusCode(HttpMethodBase.java:570)
        at org.apache.hadoop.fs.swift.exceptions.SwiftInvalidResponseException.<init>(SwiftInvalidResponseException.java:53)
        at org.apache.hadoop.fs.swift.http.SwiftRestClient.buildException(SwiftRestClient.java:1827)
        at org.apache.hadoop.fs.swift.http.SwiftRestClient.perform(SwiftRestClient.java:1728)
        at org.apache.hadoop.fs.swift.http.SwiftRestClient.perform(SwiftRestClient.java:1662)
        at org.apache.hadoop.fs.swift.http.SwiftRestClient.authenticate(SwiftRestClient.java:1154)
        at org.apache.hadoop.fs.swift.http.SwiftRestClient.authIfNeeded(SwiftRestClient.java:1618)
        at org.apache.hadoop.fs.swift.http.SwiftRestClient.preRemoteCommand(SwiftRestClient.java:1634)
        at org.apache.hadoop.fs.swift.http.SwiftRestClient.headRequest(SwiftRestClient.java:1085)
        at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.stat(SwiftNativeFileSystemStore.java:258)
        at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.getObjectMetadata(SwiftNativeFileSystemStore.java:213)
        at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystemStore.getObjectMetadata(SwiftNativeFileSystemStore.java:182)
        at org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem.getFileStatus(SwiftNativeFileSystem.java:174)
        at org.apache.hadoop.fs.Globber.getFileStatus(Globber.java:57)
        at org.apache.hadoop.fs.Globber.glob(Globber.java:252)
        at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
        at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:257)
        at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
        at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
        at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:58)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1934)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
        at java.lang.reflect.Method.invoke(Method.java:507)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:209)
        at java.lang.Thread.run(Thread.java:785)

What am I doing wrong? Am I supposed to use the SwiftClient? Is the URL not well formed?

You have a NullPointerException. Can you tell us on which line it is thrown? - Lajos Arpad
@LajosArpad The exception is thrown on the line 'r = "number of verbatims", text_file.count()', when it tries to collect the data. I noticed that an RDD does not throw an error when it is created from a file that does not exist; the error only appears once you perform an operation on that RDD. - Codious-JR
Are you sure that the user running the code has the necessary privileges to read the file and all the folders that contain it? - Lajos Arpad
I have a runnable example of object storage access here: github.com/snowch/bluemix-spark-examples - Chris Snow
@SHC I would be very interested to see your code. I am not fluent in Java, and I can't find the file containing the code that communicates with Object Storage. As I understand it, you have already packaged your app into a JAR file, so the code is in binary form. Is it possible for me to take it apart to see the code that does the magic? Thank you. - Codious-JR

2 Answers

2
votes

You are trying to access the notebooks container using the default Hadoop configuration named 'spark', which is available in your notebook environment on the Bluemix service.

With spark-submit, you are creating a new Spark context, so you need to set the Object Storage credentials explicitly:

def set_hadoop_config(creds):
    # Register the credentials under the Swift service name creds['name'].
    prefix = "fs.swift.service." + creds['name']
    hconf = sc._jsc.hadoopConfiguration()
    hconf.set(prefix + ".auth.url", creds['auth_url'] + '/v2.0/tokens')
    hconf.set(prefix + ".auth.endpoint.prefix", "endpoints")
    hconf.set(prefix + ".tenant", creds['project_id'])
    hconf.set(prefix + ".username", creds['user_id'])
    hconf.set(prefix + ".password", creds['password'])
    hconf.setInt(prefix + ".http.port", 8080)
    hconf.set(prefix + ".region", creds['region'])
    hconf.setBoolean(prefix + ".public", True)

ObjectSCredentials = {
    'username': 'XXXXXXXXXXX',
    'password': 'XXXXXXXXXXX',
    'auth_url': 'https://identity.open.softlayer.com',
    'project': 'XXXXXXXXXXX',
    'project_id': 'XXXXXXXXXXX',
    'region': 'dallas',
    'user_id': 'XXXXXXXXXXX',
    'domain_id': 'XXXXXXXXXXX',
    'domain_name': 'XXXXXXXXXXX',
    'filename': 'small.verbatim',
    'container': 'notebooks',
    'tenantId': 'XXXXXXXXX'
}

# The service name must match the part after the dot in the swift:// URL.
ObjectSCredentials['name'] = 'TEST'

set_hadoop_config(ObjectSCredentials)

rdddata = sc.textFile("swift://notebooks." + ObjectSCredentials['name'] + "/small.verbatim")
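
The path passed to sc.textFile above has the form swift://<container>.<service name>/<object>, and the service name has to be the same one used in the fs.swift.service.<name> configuration prefix. As a minimal sketch, a small helper can keep the two in sync. Note that swift_url is a hypothetical name introduced here for illustration; it is not part of Spark or Hadoop.

```python
def swift_url(container, service_name, object_name):
    """Build a URL of the form swift://<container>.<service_name>/<object_name>.

    Hypothetical helper for illustration only.
    """
    if not container or not service_name:
        raise ValueError("container and service_name must be non-empty")
    # Drop a leading slash so the result never contains '//' after the host part.
    return "swift://{0}.{1}/{2}".format(
        container, service_name, object_name.lstrip("/"))


print(swift_url("notebooks", "TEST", "small.verbatim"))
# prints: swift://notebooks.TEST/small.verbatim
```

With the dictionary from this answer, the call would look like sc.textFile(swift_url(ObjectSCredentials['container'], ObjectSCredentials['name'], ObjectSCredentials['filename'])).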

Thanks,

Charles.

1
votes

Can you try the following for the auth URL property? Replace

hconf.set(prefix + ".auth.url", creds['auth_url'] + '/v2.0/tokens')

with

hconf.set(prefix + ".auth.url", creds['auth_url']+'/v3/auth/tokens')
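
Since the two answers differ only in the token path appended to the auth URL, one way to try both is to make that path a parameter. The following is only a sketch under assumptions: swift_properties and apply_properties are hypothetical helper names, the creds dictionary is assumed to have the same keys as in the first answer, and which token path your Object Storage instance accepts is something to verify against your own service. The port and public settings from the first answer are left out here and can be set as before.

```python
def swift_properties(creds, auth_path="/v3/auth/tokens"):
    """Return the string-valued Swift properties for one service name.

    Hypothetical helper; pass auth_path="/v2.0/tokens" to get the
    variant used in the first answer.
    """
    prefix = "fs.swift.service." + creds["name"]
    return {
        prefix + ".auth.url": creds["auth_url"].rstrip("/") + auth_path,
        prefix + ".auth.endpoint.prefix": "endpoints",
        prefix + ".tenant": creds["project_id"],
        prefix + ".username": creds["user_id"],
        prefix + ".password": creds["password"],
        prefix + ".region": creds["region"],
    }


def apply_properties(hconf, props):
    """Copy every property into a Hadoop configuration object via set(key, value)."""
    for key, value in props.items():
        hconf.set(key, value)


# Usage inside the submitted script, assuming sc and ObjectSCredentials exist:
# apply_properties(sc._jsc.hadoopConfiguration(),
#                  swift_properties(ObjectSCredentials))
```

Keeping the property construction separate from the Spark context also makes it possible to inspect the resulting keys without a running cluster.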