I'm new to Spark and Kafka. In Spark, I'm running into an issue while trying to consume data from a Kafka topic, and I get the error below. Can somebody please help me?
I added all the dependencies to my SBT project. build.sbt:
name := "Sample_streaming"
version := "0.1"
scalaVersion := "2.11.12"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.2"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.2"
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % "2.0.0" % Test
libraryDependencies += "com.github.harbby" % "spark-sql-kafka-0-8" % "1.0.1"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
// libraryDependencies += "org.slf4j" % "slf4j-log4j12" % "1.7.16" % Test
libraryDependencies += "commons-logging" % "commons-logging" % "1.2"
dependencyOverrides ++= {
  Seq(
    "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.6.7.1",
    "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7",
    "com.fasterxml.jackson.core" % "jackson-core" % "2.6.7"
  )
}
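From what I've read, I suspect the version mix is the problem: `spark-streaming-kafka` 1.6.3 is the Spark 1.x connector, and its classes extend `org.apache.spark.Logging`, which was removed in Spark 2.x. Is something like this (untested sketch, versions are my guess at a consistent set for Spark 2.3.2) what the build should look like instead?

```scala
name := "Sample_streaming"
version := "0.1"
scalaVersion := "2.11.12"

val sparkVersion = "2.3.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  // Kafka connector built for Spark 2.x (the 1.6.3 artifact targets Spark 1.x)
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)
```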
kafka_stream.scala:
package com.spark.learning

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object kafkar {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    println("program started")
    val conf = new SparkConf().setMaster("spark://192.1xx.x.xxx:7077").setAppName("kafkar")
    println("master set")
    val ssc = new StreamingContext(conf, Seconds(2))
    println("Streaming Context set")
    val kafkaStream = KafkaUtils.createStream(ssc, "192.xxx.x.xxx:2181", "test-consumer-group", Map("demo1" -> 5))
    println("Kafka stream set")
    kafkaStream.print()
    println("Kafka stream printed")
    ssc.start()
    ssc.awaitTermination()
  }
}
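If I do switch to the 0-10 connector, I understand the receiver-based `createStream` against ZooKeeper no longer exists and the consumer talks to the Kafka brokers directly. This is my untested sketch of what I believe the equivalent would be (the broker address is a placeholder I made up; topic and group id are reused from my code above):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Consumer config: points at the Kafka broker (port 9092), not ZooKeeper (2181).
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "192.xxx.x.xxx:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "test-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Direct stream over the "demo1" topic; each record is a ConsumerRecord[K, V].
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Array("demo1"), kafkaParams)
)
kafkaStream.map(record => record.value).print()
```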
When I run the project with sbt, I get the following exceptions/errors:
[root@hadoop-single Sample_streaming]# sbt compile
[info] welcome to sbt 1.3.13 (Oracle Corporation Java 1.8.0_252)
[info] loading project definition from /root/Sample_streaming/project
[info] loading settings for project sample_streaming from build.sbt ...
[info] set current project to Sample_streaming (in build file:/root/Sample_streaming/)
[info] running com.spark.learning.kafkar
program started
master set
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/07/16 06:00:48 INFO SparkContext: Running Spark version 2.3.2
20/07/16 06:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/07/16 06:00:54 INFO SecurityManager: Changing modify acls groups to:
20/07/16 06:00:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
20/07/16 06:00:57 INFO Utils: Successfully started service 'sparkDriver' on port 46733.
20/07/16 06:00:57 INFO SparkEnv: Registering MapOutputTracker
20/07/16 06:00:57 INFO SparkEnv: Registering BlockManagerMaster
20/07/16 06:00:57 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/07/16 06:00:57 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/07/16 06:00:57 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-xxxxx-xxxx-xxxx
20/07/16 06:00:57 INFO MemoryStore: MemoryStore started with capacity 415.5 MB
20/07/16 06:00:58 INFO SparkEnv: Registering OutputCommitCoordinator
20/07/16 06:00:59 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/07/16 06:00:59 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hadoop-single.accesshost.internal:4040
20/07/16 06:01:01 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.1xx.x.xx:7077...
20/07/16 06:01:01 INFO TransportClientFactory: Successfully created connection to /192.1xx.x.xx:7077 after 401 ms (0 ms spent in bootstraps)
20/07/16 06:01:02 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20200716060102-0010
20/07/16 06:01:02 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44965.
20/07/16 06:01:02 INFO NettyBlockTransferService: Server created on hadoop-single.accesshost.internal:44965
20/07/16 06:01:02 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/07/16 06:01:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
20/07/16 06:01:03 INFO BlockManagerMasterEndpoint: Registering block manager hadoop-single.accesshost.internal:44965 with 415.5 MB RAM, BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
20/07/16 06:01:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
20/07/16 06:01:03 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, hadoop-single.accesshost.internal, 44965, None)
20/07/16 06:01:04 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
Streaming Context set
[error] (run-main-0) java.lang.NoClassDefFoundError: org/apache/spark/Logging
[error] java.lang.NoClassDefFoundError: org/apache/spark/Logging
[error] at java.lang.ClassLoader.defineClass1(Native Method)
[error] at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
[error] at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
[error] at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
[error] at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
[error] at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
[error] at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
[error] at java.security.AccessController.doPrivileged(Native Method)
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
[error] Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
[error] stack trace is suppressed; run last Compile / bgRun for the full output
20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-shared, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:88)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:181)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-appStatus, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/07/16 06:01:05 ERROR Utils: uncaught error in thread spark-listener-group-executorManagement, stopping SparkContext
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-executorManagement
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply$mcJ$sp(AsyncEventQueue.scala:97)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch$1.apply(AsyncEventQueue.scala:87)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-appStatus
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/07/16 06:01:05 ERROR Utils: throw uncaught fatal error in thread spark-listener-group-shared
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1$$anonfun$run$1.apply$mcV$sp(AsyncEventQueue.scala:83)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1323)
at org.apache.spark.scheduler.AsyncEventQueue$$anon$1.run(AsyncEventQueue.scala:82)
20/07/16 06:01:05 INFO SparkContext: SparkContext already stopped.
20/07/16 06:01:05 INFO SparkContext: SparkContext already stopped.
20/07/16 06:01:05 INFO SparkUI: Stopped Spark web UI at http://hadoop-single.accesshost.internal:4040
20/07/16 06:01:05 INFO StandaloneSchedulerBackend: Shutting down all executors
[error] Nonzero exit code: 1
[error] (Compile / run) Nonzero exit code: 1
[error] Total time: 41 s, completed Jul 16, 2020 6:01:06 AM
20/07/16 06:01:06 INFO DiskBlockManager: Shutdown hook called
20/07/16 06:01:06 INFO ShutdownHookManager: Shutdown hook called
20/07/16 06:01:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-xxxx
20/07/16 06:01:06 INFO ShutdownHookManager: Deleting directory /tmp/spark-xxxx/userFiles-xxxx