
I am running the script with this command:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 direct_kafka_wordcount.py localhost 9092 

I am unable to connect to my Kafka topic and retrieve any data. I have tried everything I can think of, with no luck. I am running the simple word-count example against my live Kafka stream.

Ivy Default Cache set to: /home/sagar/.ivy2/cache
The jars for the packages stored in: /home/sagar/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/spark-2.4.3-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-be411cc2-fb3f-4049-b222-e3eca55e020b;1.0
    confs: [default]
    found org.apache.spark#spark-streaming-kafka-0-10_2.11;2.2.0 in central
    found org.apache.kafka#kafka_2.11;0.10.0.1 in central
    found com.101tec#zkclient;0.8 in central
    found org.slf4j#slf4j-api;1.7.16 in central
    found org.slf4j#slf4j-log4j12;1.7.16 in central
    found log4j#log4j;1.2.17 in central
    found com.yammer.metrics#metrics-core;2.2.0 in central
    found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 in central
    found org.apache.kafka#kafka-clients;0.10.0.1 in central
    found net.jpountz.lz4#lz4;1.3.0 in central
    found org.xerial.snappy#snappy-java;1.1.2.6 in central
    found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 1491ms :: artifacts dl 9ms
    :: modules in use:
    com.101tec#zkclient;0.8 from central in [default]
    com.yammer.metrics#metrics-core;2.2.0 from central in [default]
    log4j#log4j;1.2.17 from central in [default]
    net.jpountz.lz4#lz4;1.3.0 from central in [default]
    org.apache.kafka#kafka-clients;0.10.0.1 from central in [default]
    org.apache.kafka#kafka_2.11;0.10.0.1 from central in [default]
    org.apache.spark#spark-streaming-kafka-0-10_2.11;2.2.0 from central in [default]
    org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 from central in [default]
    org.slf4j#slf4j-api;1.7.16 from central in [default]
    org.slf4j#slf4j-log4j12;1.7.16 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   12  |   1   |   1   |   0   ||   12  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-be411cc2-fb3f-4049-b222-e3eca55e020b
    confs: [default]
    0 artifacts copied, 12 already retrieved (0kB/8ms)
19/07/09 14:28:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/usr/local/spark-2.4.3-bin-hadoop2.7/examples/src/main/python/streaming/direct_kafka_wordcount.py", line 48, in <module>
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/streaming/kafka.py", line 146, in createDirectStream
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.createDirectStreamWithoutMessageHandler.
: org.apache.spark.SparkException: Broker not in the correct format of <host>:<port> [localhost]
    at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$$anonfun$7.apply(KafkaCluster.scala:390)
    at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$$anonfun$7.apply(KafkaCluster.scala:387)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig.<init>(KafkaCluster.scala:387)
    at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:422)
    at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:53)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:130)
    at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:119)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:720)
    at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:688)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

Looks like there should be more logs about the exception type. - iurii_n
caused by ... ? - aran
I have updated my logs, please have a look again. - sagar pawar

1 Answer


The syntax is wrong; try this instead (note the Kafka broker host part):

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 direct_kafka_wordcount.py localhost:9092

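In general, connecting to Kafka's bootstrap servers always requires the host:port syntax. With "localhost 9092" the script took localhost as the whole broker list and 9092 as the topic, which is why the exception reports [localhost]. The example script also expects the topic name as a second argument after the broker list.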
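
If you want to catch this kind of mistake before Spark starts, a small check on the broker argument can help. The following is only a minimal sketch in plain Python, not something that Spark or the example script provides: the helper name parse_brokers and its error message are made up for illustration, and it assumes a comma-separated list of host:port entries with numeric ports.

```python
def parse_brokers(broker_list):
    """Split a comma-separated broker list into (host, port) pairs.

    Hypothetical helper for illustration only; raises ValueError when
    an entry is not in host:port form.
    """
    brokers = []
    for entry in broker_list.split(","):
        # rpartition splits on the last ':'; sep is empty if none is found
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError("Broker not in host:port format: %r" % entry)
        brokers.append((host, int(port)))
    return brokers


if __name__ == "__main__":
    # Well-formed argument, as in the corrected command
    print(parse_brokers("localhost:9092"))
    # Malformed argument, as in the original command
    try:
        parse_brokers("localhost")
    except ValueError as err:
        print(err)
```

Calling something like this on sys.argv[1] at the top of the script would fail fast with a readable message instead of a long Py4J stack trace.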