0 votes

I am new to Kafka and am trying to implement Kafka consumer logic in Spark 2. When I run all my code in the shell and start the streaming context, nothing shows up.

I have viewed many posts on Stack Overflow, but nothing helped me. I even downloaded all the dependency jars from Maven and tried running with them, but it still shows nothing.

Spark version: 2.2.0, Scala version: 2.11.8. The jars I downloaded are kafka-clients-2.2.0.jar and spark-streaming-kafka-0-10_2.11-2.2.0.jar, but I still face the same issue.

Please find the code snippet below:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.{KafkaUtils, ConsumerStrategies, LocationStrategies}

val brokers = "host1:port, host2:port"
val groupid = "default"
val topics = "kafka_sample"
val topicset = topics.split(",").toSet

val ssc = new StreamingContext(sc, Seconds(2))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupid,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)

val msg = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicset, kafkaParams)
)

msg.foreachRDD { rdd =>
  rdd.collect().foreach(println)
}
ssc.start()

I am expecting Spark Streaming to start, but it doesn't do anything. What mistake have I made here? Or is this a known issue?

Did you include ssc.awaitTermination()? – Piyush Patel
Try using Assign with a specific partition that you think has messages, as in ConsumerStrategies.Assign[String, Array[Byte]](fromOffsets.keys, kafkaParams, fromOffsets))... – Nitin
I was trying it in the shell, so once I do ssc.start() it should trigger the streaming, but nothing shows up. It is not even showing any logs or a streaming timestamp. – Abhishek Allamsetty
One more question: are you able to consume messages using a simple consumer? – Nitin
Yes, I am able to consume in the CLI. – Abhishek Allamsetty

2 Answers

1 vote

The driver will sit idle unless you call ssc.awaitTermination() at the end. If you're using spark-shell, it's not a good tool for streaming jobs; use an interactive tool like Zeppelin or Spark Notebook to work with streams, or build your app as a jar file and then deploy it.
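
For reference, here is a minimal sketch of the same job with the missing call added, runnable in spark-shell (the broker and topic strings are the question's placeholders, not real endpoints):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// sc is the SparkContext that spark-shell provides
val ssc = new StreamingContext(sc, Seconds(2))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "host1:port,host2:port",
  ConsumerConfig.GROUP_ID_CONFIG -> "default",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Set("kafka_sample"), kafkaParams)
)

// Print each record's value once per micro-batch
stream.foreachRDD { rdd => rdd.map(_.value).collect().foreach(println) }

ssc.start()
ssc.awaitTermination() // blocks the driver; without this the shell just returns and nothing is printed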

Also, if you're trying out Spark streaming, Structured Streaming would be the better choice, as it is quite easy to work with:

http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
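
As a rough sketch of what the Structured Streaming equivalent looks like (this assumes the spark-sql-kafka-0-10 package is on the classpath, and reuses the question's placeholder broker and topic):

// spark is the SparkSession that spark-shell provides
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port,host2:port")
  .option("subscribe", "kafka_sample")
  .option("startingOffsets", "earliest") // read from the beginning while testing
  .load()

// Kafka keys and values arrive as binary, so cast them to strings,
// then print every micro-batch to the console
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .start()
  .awaitTermination()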

0 votes
  1. After ssc.start(), add ssc.awaitTermination() to your code.
  2. For testing, write your code in a main object and run it in an IDE like IntelliJ (see the sketch after this list).
  3. You can use the command shell to publish messages from the Kafka console producer.
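
A bare-bones sketch of steps 1 and 2 together (the object name and the localhost:9092 broker are illustrative; adjust them to your setup):

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object KafkaStreamingApp {
  def main(args: Array[String]): Unit = {
    // local[*] gives the job the multiple threads it needs when run from an IDE
    val conf = new SparkConf().setAppName("KafkaStreamingApp").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", // assumed local broker
      ConsumerConfig.GROUP_ID_CONFIG -> "default",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("kafka_sample"), kafkaParams)
    )

    stream.foreachRDD { rdd => rdd.map(_.value).collect().foreach(println) }

    ssc.start()
    ssc.awaitTermination() // step 1: keep the driver alive
  }
}

With the app running, publish a few test messages from the Kafka console producer (step 3) and they should appear in the IDE console every two seconds.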

I have written up all these steps as a simple example in a blog post, with working code on GitHub. Please refer to: http://softwaredevelopercentral.blogspot.com/2018/10/spark-streaming-and-kafka-integration.html