I'm new to Spark and Kafka. In Spark I'm facing issues while trying to consume data from a Kafka topic, and I'm getting the error below. Can somebody please help me?

I added all the dependencies to my SBT project. build.sbt file:

name := "Sample_streaming"
     
version := "0.1"
     
scalaVersion := "2.11.12"
     
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % 2.3.2"
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "2.0.0" % Test
libraryDependencies += "com.github.harbby" % "spark-sql-kafka-0-8" % "1.0.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.16" % Test
libraryDependencies += "commons-logging" % "commons-logging" % "1.2"
dependencyOverrides ++= {
  Seq(
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
    "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
  )
}

kafka_stream.scala:

package com.spark.learning
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
     
object kafkar {
    def main(args: Array[String]) {
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)

        println("program started")

        val conf = new SparkConf().setMaster("spark://192.1xx.x.xxx:7077").setAppName("kafkar")
        println("master set")
        val ssc = new StreamingContext(conf, Seconds(2))
        println("Streaming Context set")

        val kafkaStream = KafkaUtils.createStream(ssc, "192.xxx.x.xxx:2181", "test-consumer-group", Map("demo1" -> 5))
        println("Kafka stream set")

        kafkaStream.print()
        println("Kafka stream printed")
        ssc.start
        ssc.awaitTermination()
    }
}

When I run the above with SBT, I get the following exceptions/errors:

[root@hadoop-single Sample_streaming]# sbt compile
   [info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_252)
    [info] loading project definition from /root/Sample_streaming/project
    [info] loading settings for project sample_streaming from build.sbt ...
    [info] set current project to Sample_streaming (in build file:/root/Sample_streaming/)
    [info] running com.spark.learning.kafkar 
    program started
    master set
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    20/07/16 06:00:48 INFO SparkContext: Running Spark version 2.3.2
    20/07/16 06:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    20/07/16 06:00:54 INFO SecurityManager: Changing modify acls groups to: 
    20/07/16 06:00:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
    20/07/16 06:00:57 INFO Utils: Successfully started service 'sparkDriver' on port 46733.
    20/07/16 06:00:57 INFO SparkEnv: Registering MapOutputTracker
    20/07/16 06:00:57 INFO SparkEnv: Registering BlockManagerMaster
    20/07/16 06:00:57 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    20/07/16 06:00:57 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    20/07/16 06:00:57 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-xxxxx-xxxx-xxxx
    20/07/16 06:00:57 INFO MemoryStore: MemoryStore started with capacity 415.5 MB
    20/07/16 06:00:58 INFO SparkEnv: Registering OutputCommitCoordinator
    20/07/16 06:00:59 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    20/07/16 06:00:59 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hadoop-single.accesshost.internal:4040
    20/07/16 06:01:01 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.1xx.x.xx:7077...
    20/07/16 06:01:01 INFO TransportClientFactory: Successfully created connection to /192.1xx.x.xx:7077 after 401 ms (0 ms spent in bootstraps)
    20/07/16 06:01:02 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20200716060102-0010
    20/07/16 06:01:02 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44965.
    20/07/16 06:01:02 INFO NettyBlockTransferService: Server created on hadoop-single.accesshost.internal:44965
    20/07/16 06:01:02 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    20/07/16 06:01:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:03 INFO BlockManagerMasterEndpoint: Registering block manager hadoop-single.accesshost.internal:44965 with 415.5 MB RAM, BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:03 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
    20/07/16 06:01:04 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
    Streaming Context set
    [error] (run-main-0) java.lang.NoClassDefFoundError: org/apache/spark/Logging
    [error] java.lang.NoClassDefFoundError: org/apache/spark/Logging
    [error]     at java.lang.ClassLoader.defineClass1(Native Method)
    [error]     at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    [error]     at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    [error]     at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    [error]     at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    [error]     at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    [error]     at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    [error]     at java.security.AccessController.doPrivileged(Native Method)
    [error]     at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    [error] Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
    [error]     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    [error] stack trace is suppressed; run last Compile / bgRun for the full output
    20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
    org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:181)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
        at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
    20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-executorManagement, stopping SparkContext
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-executorManagement
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-appStatus
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
     org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-shared
    java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
        at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
        at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
    20/07/16 06:01:05 INFO SparkContext: SparkContext already stopped.
    20/07/16 06:01:05 INFO SparkContext: SparkContext already stopped.
    20/07/16 06:01:05 INFO SparkUI: Stopped Spark web UI at http://hadoop-single.accesshost.internal:4040
    20/07/16 06:01:05 INFO StandaloneSchedulerBackend: Shutting down all executors
    [error] Nonzero exit code: 1
    [error] (Compile / run) Nonzero exit code: 1
    [error] Total time: 41 s, completed Jul 16, 2020 6:01:06 AM
    20/07/16 06:01:06 INFO DiskBlockManager: Shutdown hook called
    20/07/16 06:01:06 INFO ShutdownHookManager: Shutdown hook called
    20/07/16 06:01:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-xxxx
    20/07/16 06:01:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-xxxx/userFiles-xxxx

2 Answers

It looks like you are using a few extra dependencies that are redundant, and a few of them are old. Please use the code below to consume data from a Kafka topic.

Dependencies

private lazy val sparkStreamingVersion = "2.3.4"
private lazy val kafkaVersion = "2.3.0"
private lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

private lazy val sparkStreaming = "org.apache.spark" %% "spark-streaming" % sparkStreamingVersion
private lazy val sparkStreamingKafka = "org.apache.spark" %% "spark-streaming-kafka-0-10" % kafkaVersion excludeAll excludeJpountz

libraryDependencies ++= Seq(
  sparkStreaming,
  sparkStreamingKafka
)
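
For context, these definitions can all live directly in build.sbt. A minimal complete sketch (the spark-core/spark-sql entries and the Scala version are assumptions carried over from the question, not part of this answer):

name := "Sample_streaming"

version := "0.1"

scalaVersion := "2.11.12"

lazy val sparkVersion = "2.3.4"
lazy val excludeJpountz = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion,
  "org.apache.spark" %% "spark-sql"                  % sparkVersion,
  "org.apache.spark" %% "spark-streaming"            % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.0" excludeAll excludeJpountz
)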

The following should work fine:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreaming with Kafka")
    // Streaming Context with batch interval = 10 seconds
    val streamingContext = new StreamingContext(conf, Seconds(10))

    val bootStrapServers = "bootstrap_server_names"

    // Kafka Parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> bootStrapServers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("topic_name")
    
    // Direct Kafka Stream
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map{record => (record.topic(), record.value(), record.partition(), record.offset())}.print()

    streamingContext.start() // Start the computation
    streamingContext.awaitTermination() // Wait for the computation to terminate
  }

Here we use a direct Kafka stream instead of createStream. KafkaUtils.createDirectStream is a receiver-less "direct" approach introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition and accordingly defines the offset ranges to process in each batch.
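
If you want to inspect or commit the exact offset ranges that each batch processes, the stream exposes them per RDD. A minimal sketch, assuming the same stream variable and the spark-streaming-kafka-0-10 API used above:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // The offset ranges Spark decided to process in this batch
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
  // Commit the offsets back to Kafka once the batch has been handled
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}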

@param locationStrategy - Below are the possible location strategies (see the sketch after this list).

 1. PreferConsistent - This will distribute partitions evenly across available executors.
 2. PreferBrokers - If your executors are on the same hosts as your Kafka brokers, use PreferBrokers, which will prefer to schedule partitions on the Kafka leader for that partition.
 3. PreferFixed - If you have a significant skew in load among partitions, use PreferFixed. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).
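
A rough sketch of PreferFixed (the topic, partitions, and host names below are made up for illustration):

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// Pin specific partitions to specific executor hosts
val hostMap = Map(
  new TopicPartition("topic_name", 0) -> "executor-host-1",
  new TopicPartition("topic_name", 1) -> "executor-host-2"
)
val locationStrategy = LocationStrategies.PreferFixed(hostMap)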

@param consumerStrategy - Below are the possible consumer strategies (see the sketch after this list).

 1. Subscribe - Allows you to subscribe to a fixed collection of topics.
 2. SubscribePattern - Allows you to use a regex to specify the topics of interest.
 3. Assign - Allows you to specify a fixed collection of partitions.
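
For illustration, the three strategies are constructed like this (the topic names, regex pattern, and partitions are placeholders; kafkaParams is the map defined earlier):

import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to a fixed collection of topics
val subscribe = ConsumerStrategies.Subscribe[String, String](Array("topic_name"), kafkaParams)

// Subscribe to every topic matching a regex
val subscribePattern = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("topic_.*"), kafkaParams)

// Assign a fixed collection of partitions
val assign = ConsumerStrategies.Assign[String, String](
  Seq(new TopicPartition("topic_name", 0), new TopicPartition("topic_name", 1)),
  kafkaParams
)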