I'm using Kafka 1.0.1-kafka-3.1.0-SNAPSHOT from CDH (Cloudera Distribution for Hadoop).
On my batch-1 edge server, I can produce messages with:
kafka-console-producer --broker-list batch-1:9092 --topic MyTopic
I can consume messages through ZooKeeper on my first node with:
kafka-console-consumer --zookeeper data1:2181 --topic MyTopic --from-beginning
But I get nothing with the --bootstrap-server option:
kafka-console-consumer --bootstrap-server batch-1:9092 --topic MyTopic --from-beginning
The problem is that I'm using Kafka with Spark:
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0"
val df = spark.readStream
.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.option("kafka.bootstrap.servers", "batch-1:9092")
.option("subscribe", "MyTopic")
.load()
println("Select :")
val df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)")
.as[(String, String, String)]
println("Show :")
val query = df2.writeStream
.outputMode("append")
.format("console")
.start()
query.awaitTermination()
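For comparison with the --from-beginning console runs, here is a minimal sketch of the same query reading from the earliest offsets. It assumes that the source's default startingOffsets value of "latest" could be hiding messages produced before the query started; it does not address a broker-side problem. The object name PocEarliest is hypothetical, and the broker and topic are the ones used above.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name; broker and topic are the ones from the question.
object PocEarliest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("PocEarliest").getOrCreate()
    import spark.implicits._ // encoders needed by .as[(String, String, String)]

    val df = spark.readStream
      .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
      .option("kafka.bootstrap.servers", "batch-1:9092")
      .option("subscribe", "MyTopic")
      // Assumption: start from the earliest offsets instead of the default "latest",
      // so messages already in the topic are read on the first run.
      .option("startingOffsets", "earliest")
      .load()

    val query = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
      .as[(String, String, String)]
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

If this variant also prints nothing, the starting offset is probably not the cause, which would match the console consumer staying silent even with --from-beginning.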
I ran export SPARK_KAFKA_VERSION=0.10 on my edge server, then submitted the job with:
spark2-submit --driver-memory 2G --jars spark-sql-kafka-0-10_2.11-2.3.0.cloudera4.jar --class "spark.streaming.Poc" poc_spark_kafka_2.11-0.0.1.jar
This forces me to use kafka.bootstrap.servers. It seems to connect, but I don't receive any messages.
The output is the same as that of kafka-console-consumer with the --bootstrap-server option:
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka version : 0.10.0-kafka-2.1.0
18/10/30 16:11:48 INFO utils.AppInfoParser: Kafka commitId : unknown
18/10/30 16:11:48 INFO streaming.MicroBatchExecution: Starting new streaming query.
Then, nothing. Should I connect to ZooKeeper? How?
Is there a version conflict, even though the documentation says "Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)" here: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html ?
What did I miss?