4
votes

I get the following error:

Py4JError(u'An error occurred while calling o73.createDirectStreamWithoutMessageHandler. Trace:\npy4j.Py4JException: Method createDirectStreamWithoutMessageHandler([class org.apache.spark.streaming.api.java.JavaStreamingContext, class java.util.HashMap, class java.util.HashSet, class java.util.HashMap]) does not exist\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)\n\tat py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)\n\tat py4j.Gateway.invoke(Gateway.java:252)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:209)\n\tat java.lang.Thread.run(Thread.java:745)\n\n',)

I am using spark-streaming-kafka-assembly_2.10-1.6.0.jar (which is present in the /usr/lib/hadoop/lib/ folder on all my nodes + master)

(EDIT) The actual error was: java.lang.NoSuchMethodError: org.apache.hadoop.yarn.util.Apps.crossPlatformify(Ljava/lang/String;)Ljava/lang/String;

This was caused by a Hadoop version mismatch, so Spark has to be compiled against the correct Hadoop version:

mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 -DskipTests clean package

This will result in a jar in the external/kafka-assembly/target folder.

1
When was your cluster created, and did you pass in any --image-version flags when creating it? Can you give some context on how you're invoking KafkaUtils.createStream() (or, alternatively, how you're calling underlying methods)? - Angus Davis
The cluster was created only yesterday, with image version 1.0 (with Spark 1.6). At the moment, we still use image version 0.1 (Spark 1.5.0) with spark-streaming-kafka-assembly_2.10-1.5.0.jar, and also image version 0.2 with the same jar, as the 1.5.2 jar did not work there either. For the newest release, however, none of the jars seem to work. The error is thrown when using the KafkaUtils.createDirectStream method. When I run the code locally with Spark 1.6 and spark-streaming-kafka-assembly_2.10-1.6.0.jar (passed with the --jars option via spark-submit), the code seems to run perfectly. - bjorndv
I've been trying to replicate this issue this afternoon and haven't had much luck. Is there any chance that Spark 1.5 made its way onto your Dataproc 1.0 cluster with Spark 1.6 (e.g., spark-1.6 and spark-1.5 were both installed, or alternatively, spark-1.5 was packaged with your job)? I've gone through each Spark streaming Kafka jar I could get my hands on (and built an assembly jar from source) and, after unpacking, checked the KafkaUtilsPythonHelper class to ensure that the createDirectStreamWithoutMessageHandler method exists with the correct signature. - Angus Davis
My init script only installs some Python packages, so I don't think that's possible. As I said before, it worked locally, so the jar shouldn't be a problem. Could it be that the jar can't be found anymore when it is placed in the /usr/lib/hadoop/lib/ folder? I would expect a different error then, though. Also, the --jars option does not seem to work when using gcloud dataproc jobs submit pyspark. Do I need to perform an update other than gcloud components update? - bjorndv
Feel free to email me (my SO name without spaces, at google.com) if the below doesn't work for you, and we can try to figure out what's going on with your installation. - Angus Davis
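The jar check described in the comments above can be approximated without unpacking anything by hand. The following is a minimal sketch, not the procedure Angus actually used: the function name `find_helper_method` is hypothetical, and the check is only a heuristic. It looks for a class file named KafkaUtilsPythonHelper inside the jar and tests whether the method name appears in that file's bytes (method names are stored as plain strings in a class file's constant pool). It does not verify the method signature; `javap` would be needed for that.

```python
import zipfile

# Names taken from the error message and the comment thread.
HELPER_CLASS = "KafkaUtilsPythonHelper"
METHOD_NAME = b"createDirectStreamWithoutMessageHandler"


def find_helper_method(jar_path, class_name=HELPER_CLASS, method_name=METHOD_NAME):
    """Return {jar entry: bool} for every '<class_name>.class' entry in the jar.

    The bool is True when the method name occurs in the raw class bytes.
    This is a heuristic presence check only; it says nothing about the
    method's parameter types.
    """
    results = {}
    with zipfile.ZipFile(jar_path) as jar:
        for entry in jar.namelist():
            # Compare only the file name, so the package path is not assumed.
            if entry.rsplit("/", 1)[-1] == class_name + ".class":
                results[entry] = method_name in jar.read(entry)
    return results
```

On a cluster node this could be called as `find_helper_method("/usr/lib/hadoop/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar")`. An empty result would suggest the helper class is missing from the jar altogether; a `False` value would suggest the jar was built from a Spark version that predates the method.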

1 Answer

1
votes

Using image version 1, I've successfully run the PySpark streaming / Kafka word count example in two ways.

In each of these examples, "ad-kafka-inst" is my test Kafka instance with a 'test' topic.

  1. Using a cluster with no initialization actions:

    $ gcloud dataproc jobs submit pyspark --cluster ad-kafka2 --properties spark.jars.packages=org.apache.spark:spark-streaming-kafka_2.10:1.6.0 ./kafka_wordcount.py ad-kafka-inst:2181 test 
    
  2. Using initialization actions with a full kafka assembly:

    • Download / unpack spark-1.6.0.tgz
    • Build with:

      $ mvn -Phadoop-2.6 -Dhadoop.version=2.7.2 package
      
    • Upload spark-streaming-kafka-assembly_2.10-1.6.0.jar to a new GCS bucket (MYBUCKET for example).
    • Create the following initialization action in the same GCS bucket (e.g., gs://MYBUCKET/install_spark_kafka.sh):

      #!/bin/bash

      # Copy the Kafka assembly jar from GCS into /usr/lib/hadoop/lib/ on this node.
      gsutil cp gs://MYBUCKET/spark-streaming-kafka-assembly_2.10-1.6.0.jar /usr/lib/hadoop/lib/
      chmod 755 /usr/lib/hadoop/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar
      
    • Start a cluster with the above initialization action:

      $ gcloud dataproc clusters create ad-kafka-init --initialization-actions gs://MYBUCKET/install_spark_kafka.sh
      
    • Start the streaming word count:

      $ gcloud dataproc jobs submit pyspark --cluster ad-kafka-init ./kafka_wordcount.py ad-kafka-inst:2181 test
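
For reference, a script like the kafka_wordcount.py submitted above might look roughly like the sketch below. It assumes Spark 1.6's `pyspark.streaming.kafka` module and the receiver-based `KafkaUtils.createStream` API, which takes a ZooKeeper quorum such as ad-kafka-inst:2181. The consumer group name, the one-second batch interval, and the helper name `split_words` are illustrative assumptions, not details from the answer.

```python
import sys


def split_words(line):
    """Split one Kafka message value into words on single spaces."""
    return line.split(" ")


def main(zk_quorum, topic):
    # Imported here so the helper above stays usable without Spark installed.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batches (assumed interval)

    # Receiver-based stream: one consumer thread for the given topic.
    # "spark-streaming-consumer" is an assumed consumer group name.
    kvs = KafkaUtils.createStream(
        ssc, zk_quorum, "spark-streaming-consumer", {topic: 1})

    # Each record is a (key, value) pair; the message text is the value.
    lines = kvs.map(lambda kv: kv[1])
    counts = (lines.flatMap(split_words)
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__" and len(sys.argv) == 3:
    # e.g. kafka_wordcount.py ad-kafka-inst:2181 test
    main(sys.argv[1], sys.argv[2])
```

Note that this sketch uses `createStream`, whereas the error in the question was raised by `KafkaUtils.createDirectStream`, which takes a Kafka broker list rather than a ZooKeeper address.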