I'm running an out-of-the-box EMR cluster with Spark 1.6.0 and Zeppelin 0.5.6 on AWS. My goal is to get a simple Spark Streaming context initialized and pointed at an internal Kinesis stream, just as a proof of concept. However, when I run it I get:

Py4JJavaError: An error occurred while calling o89.loadClass. : 
java.lang.ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

The code I'm running (via Zeppelin) is simply:

%pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

ssc = StreamingContext(sc, 1)

appName = '{my-app-name}'
streamName = '{my-stream-name}'
endpointUrl = '{my-endpoint}'
regionName = '{my-region}'

lines = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)

When I ran into this locally, I made sure to build spark-streaming-kinesis-asl from source and include the resulting jars in my Spark config:

spark.driver.extraClassPath /path/to/kinesis/asl/assembly/jars/*

However, I can't seem to get this to work on EMR. To be safe, I included the jar path under all of the following keys, to no avail:

spark.driver.extraClassPath
spark.driver.extraLibraryPath
spark.executor.extraClassPath
spark.executor.extraLibraryPath

Has anyone run into this before? I'm printing out the Spark config when I restart the context to confirm that these changes are being picked up. Does this perhaps need to be done on the slave nodes as well? Or is there another config option/key altogether?
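
For reference, a sketch of what those entries look like in my config (reusing the placeholder jar path from the local setup above; the library-path keys point at the directory rather than a jar wildcard):

spark.driver.extraClassPath      /path/to/kinesis/asl/assembly/jars/*
spark.executor.extraClassPath    /path/to/kinesis/asl/assembly/jars/*
spark.driver.extraLibraryPath    /path/to/kinesis/asl/assembly/jars
spark.executor.extraLibraryPath  /path/to/kinesis/asl/assembly/jars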

1 Answer

Add the dependency through the Zeppelin dependency-loading context "z". Here's an example of adding the spark-csv package:

%dep
z.load("com.databricks:spark-csv_2.11:1.3.0")