I'm running an out-of-the-box EMR cluster with Spark 1.6.0 and Zeppelin 0.5.6 on AWS. As a proof of concept, I want to initialize a simple Spark Streaming context and point it at an internal Kinesis stream. However, when I run it, I get:
Py4JJavaError: An error occurred while calling o89.loadClass. :
java.lang.ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
The code I'm running (via Zeppelin) is simply:
%pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
ssc = StreamingContext(sc, 1)
appName = '{my-app-name}'
streamName = '{my-stream-name}'
endpointUrl = '{my-endpoint}'
regionName = '{my-region}'
lines = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
When I ran into this error locally, I got past it by building spark-streaming-kinesis-asl from source and adding the resulting assembly jars to my Spark config:
spark.driver.extraClassPath /path/to/kinesis/asl/assembly/jars/*
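Before changing more Spark settings, it can help to confirm that the jars a classpath entry points to actually contain the class named in the Py4J error. The sketch below is a plain-Python diagnostic (not part of Spark or EMR; `expand_classpath` and `jars_containing` are hypothetical names). It expands a JVM-style classpath, treating a trailing `*` the way the JVM does (every `.jar` file directly inside that directory, not recursive), and opens each jar as a zip archive to look for `KinesisUtilsPythonHelper.class`. It assumes it is run on the host where the path should exist, e.g. the EMR master node for the driver.

```python
import glob
import os
import zipfile

# The class the Py4J error reports as missing, written as a path inside a jar.
MISSING_CLASS = "org/apache/spark/streaming/kinesis/KinesisUtilsPythonHelper.class"


def expand_classpath(classpath):
    """Expand a Java-style classpath string into concrete file paths.

    An entry ending in '*' is treated like the JVM wildcard: every
    .jar/.JAR file directly inside that directory (no recursion).
    Other entries are returned unchanged.
    """
    paths = []
    for entry in classpath.split(os.pathsep):
        entry = entry.strip()
        if not entry:
            continue
        if entry.endswith("*"):
            directory = entry[:-1]
            matches = glob.glob(os.path.join(directory, "*.jar"))
            matches += glob.glob(os.path.join(directory, "*.JAR"))
            # set() removes duplicates on case-insensitive filesystems
            paths.extend(sorted(set(matches)))
        else:
            paths.append(entry)
    return paths


def jars_containing(classpath, class_entry=MISSING_CLASS):
    """Return the jars on `classpath` whose contents include `class_entry`."""
    hits = []
    for path in expand_classpath(classpath):
        # is_zipfile returns False for directories, missing files and non-zips
        if not zipfile.is_zipfile(path):
            continue
        with zipfile.ZipFile(path) as jar:
            if class_entry in jar.namelist():
                hits.append(path)
    return hits
```

For example, if `jars_containing("/path/to/kinesis/asl/assembly/jars/*")` returns an empty list on a given host, none of the jars in that directory there provide the class, so configuration that points at that directory cannot fix the error on that host.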
However, I can't get this to work on EMR. To be safe, I added the jar path to all of the following settings, to no avail:
spark.driver.extraClassPath
spark.driver.extraLibraryPath
spark.executor.extraClassPath
spark.executor.extraLibraryPath
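When editing spark-defaults.conf (or the equivalent interpreter properties), giving a key that already has a value a new value replaces it rather than extending it, so adding a jar directory can silently drop classpath entries the cluster relied on (EMR may already set some of these keys). Below is a minimal sketch, assuming the settings are held in a plain dict of string keys and values; `append_entry`, `updated_defaults` and `to_defaults_lines` are hypothetical helpers. It appends the jar directory only to the two `extraClassPath` keys, since the `extraLibraryPath` keys set the native library search path rather than where the JVM loads classes from.

```python
# Keys that control where the driver/executor JVMs look for classes.
CLASSPATH_KEYS = (
    "spark.driver.extraClassPath",
    "spark.executor.extraClassPath",
)


def append_entry(existing, entry, sep=":"):
    """Append `entry` to a classpath string unless it is already present."""
    parts = [p for p in (existing or "").split(sep) if p]
    if entry not in parts:
        parts.append(entry)
    return sep.join(parts)


def updated_defaults(conf, entry, keys=CLASSPATH_KEYS):
    """Return a copy of `conf` with `entry` appended to each classpath key.

    Existing values are kept and extended instead of being overwritten;
    the input dict is not modified.
    """
    new_conf = dict(conf)
    for key in keys:
        new_conf[key] = append_entry(conf.get(key), entry)
    return new_conf


def to_defaults_lines(conf):
    """Render settings as spark-defaults.conf lines: key, space, value."""
    return ["%s %s" % (key, value) for key, value in sorted(conf.items())]
```

Because `append_entry` skips entries that are already present, running the update twice leaves the settings unchanged.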
Has anyone run into this before? I print out the Spark config whenever I restart the context, so I can confirm these changes are being picked up. Does this also need to be done on the slave nodes? Or does it require a different config option/key altogether?
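Printing the whole config works, but a narrower per-key check makes it easier to see which of the four settings actually contain the jar entry. The sketch below takes (key, value) pairs such as those returned by PySpark's `SparkConf.getAll()`; from a Zeppelin paragraph one way to get them is `sc._conf.getAll()`, although `_conf` is a private attribute, so treat that as an assumption about this PySpark version's internals. `report_entry` is a hypothetical name. A key showing up here only proves the value was read: the Spark configuration docs note that in client mode `spark.driver.extraClassPath` must be set before the driver JVM starts (in the defaults file or via `--driver-class-path`), not through SparkConf in an already running application.

```python
# The four settings listed above.
WATCHED_KEYS = (
    "spark.driver.extraClassPath",
    "spark.driver.extraLibraryPath",
    "spark.executor.extraClassPath",
    "spark.executor.extraLibraryPath",
)


def report_entry(conf_pairs, entry, keys=WATCHED_KEYS, sep=":"):
    """Map each watched key to True if `entry` is one of its path entries.

    `conf_pairs` is an iterable of (key, value) tuples, e.g. the output of
    SparkConf.getAll(). The value is split on `sep` and compared entry by
    entry, so a partial path does not count. Unset keys map to False.
    """
    conf = dict(conf_pairs)
    return {key: entry in conf.get(key, "").split(sep) for key in keys}
```

Printing `report_entry(pairs, "/path/to/kinesis/asl/assembly/jars/*")` gives one True/False line per setting instead of the full config dump.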