0
votes

I am new to Scala/Spark development. I have created a simple streaming application that reads from a Kafka topic, built with sbt and Scala. I have the following code:

build.sbt

// sbt build definition for the kafka-streaming application.
// Requires the sbt-assembly plugin (the `assembly*` keys below come from it).

name := "kafka-streaming"

version := "1.0"

// Do not bundle the Scala library into the assembly jar.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// Resolve duplicate files found while merging dependency jars; anything not listed here
// falls through to the plugin's previous (default) strategy.
assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case PathList(pl @ _*) if pl.contains("log4j.properties") => MergeStrategy.concat
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

// still want to be able to run in sbt
// https://github.com/sbt/sbt-assembly#-provided-configuration
// `:= ... .evaluated` replaces the `<<=` operator, which is deprecated in sbt 0.13.13
// and removed in sbt 1.x.
run in Compile := Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
).evaluated

// Run the application in a forked JVM so the log4j options below take effect.
fork in run := true
javaOptions in run ++= Seq(
  "-Dlog4j.debug=true",
  "-Dlog4j.configuration=log4j.properties")

// NOTE(review): spark-core and spark-sql are not marked "provided", so `sbt assembly` will
// try to bundle them together with their transitive dependencies -- confirm whether that is
// intended, since the Spark runtime normally supplies these jars.
// NOTE(review): spark-streaming-kafka 1.6.3 and sparklint-spark162 appear to target the
// Spark 1.6 line while the other Spark modules are 2.4.0 -- verify that these versions are
// compatible with each other and with the cluster.
libraryDependencies ++= Seq(
  "com.groupon.sparklint" %% "sparklint-spark162" % "1.0.4" excludeAll (
    ExclusionRule(organization = "org.apache.spark")
    ),
  "org.apache.spark" %% "spark-core" % "2.4.0",
  "org.apache.spark" %% "spark-sql" % "2.4.0",
  "org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.3"
)

WeatherDataStream.scala

package com.supergloo
import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Stream from Kafka.
  *
  * Creates a Spark Streaming context with a 2-second batch interval, subscribes to a
  * hard-coded Kafka topic through `KafkaUtils.createDirectStream`, prints a sample of each
  * batch and then blocks until the streaming context terminates.
  */
object WeatherDataStream {

  val localLogger: Logger = Logger.getLogger("WeatherDataStream")

  /**
    * Application entry point.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    // update
    // val checkpointDir = "./tmp"

    val sparkConf = new SparkConf().setAppName("Raw Weather")
    // Only applies when no master was supplied externally (e.g. via spark-submit --master).
    sparkConf.setIfMissing("spark.master", "local[5]")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val kafkaTopicRaw = "spark-topic"
    // Fixed typo: the original read "127.0.01:9092".
    val kafkaBroker = "127.0.0.1:9092"

    val topics: Set[String] = kafkaTopicRaw.split(",").map(_.trim).toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> kafkaBroker)

    localLogger.info(s"connecting to brokers: $kafkaBroker")
    localLogger.info(s"kafkaParams: $kafkaParams")
    localLogger.info(s"topics: $topics")

    val rawWeatherStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    localLogger.info(s"Manaaaaaaaaaf  --->>>: $rawWeatherStream")

    // A streaming graph needs at least one output operation; without it ssc.start()
    // fails with "No output operations registered, so nothing to execute".
    rawWeatherStream.print()

    try {
      //Kick off
      ssc.start()

      ssc.awaitTermination()
    } finally {
      // Release the streaming context even when start/awaitTermination throws.
      ssc.stop()
    }
  }
}

I created the jar file using the command

sbt package

and ran the application using the command

./spark-submit --master spark://myserver:7077 --class com.supergloo.WeatherDataStream /home/Manaf/kafka-streaming_2.11-1.0.jar

But I got an error like this:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
        at com.supergloo.WeatherDataStream$.main(WeatherDataStream.scala:37)
        at com.supergloo.WeatherDataStream.main(WeatherDataStream.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

Based on what I found on Stack Overflow, I got the idea of building the jar with the assembly command instead:

sbt assembly

But I got the errors below when executing the assembly command:

[error] 153 errors were encountered during merge
[trace] Stack trace suppressed: run last *:assembly for the full output.
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.apache.arrow\arrow-vector\jars\arrow-vector-0.10.0.jar:git.properties
[error] C:\Users\amanaf\.ivy2\cache\org.apache.arrow\arrow-format\jars\arrow-format-0.10.0.jar:git.properties
[error] C:\Users\amanaf\.ivy2\cache\org.apache.arrow\arrow-memory\jars\arrow-memory-0.10.0.jar:git.properties
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Inject.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Inject.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Named.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Named.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Provider.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Provider.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Qualifier.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Qualifier.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Scope.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Scope.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\javax.inject\javax.inject\jars\javax.inject-1.jar:javax/inject/Singleton.class
[error] C:\Users\amanaf\.ivy2\cache\org.glassfish.hk2.external\javax.inject\jars\javax.inject-2.4.0-b34.jar:javax/inject/Singleton.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4BlockInputStream.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4BlockInputStream.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4BlockOutputStream.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4BlockOutputStream.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4Compressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4Compressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4Constants.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4Constants.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4Factory.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4Factory.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4FastDecompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4FastDecompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJNICompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJNICompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor$HashTable.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor$HashTable.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaSafeCompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor$HashTable.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor$HashTable.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4HCJavaUnsafeCompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNI.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNI.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNICompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNICompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNIFastDecompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNIFastDecompressor.class
[error] deduplicate: different file contents found in the following:
[error] C:\Users\amanaf\.ivy2\cache\org.lz4\lz4-java\jars\lz4-java-1.4.0.jar:net/jpountz/lz4/LZ4JNISafeDecompressor.class
[error] C:\Users\amanaf\.ivy2\cache\net.jpountz.lz4\lz4\jars\lz4-1.2.0.jar:net/jpountz/lz4/LZ4JNISafeDecompressor.class
2

2 Answers

1
votes

This issue is related to library versions. I updated my build.sbt like this:

// sbt build definition for the kafka-streaming application.
// Requires the sbt-assembly plugin (the `assembly*` keys below come from it).

name := "kafka-streaming"

version := "1.0"

// Do not bundle the Scala library into the assembly jar.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// Resolve duplicate files found while merging dependency jars; anything not listed here
// falls through to the plugin's previous (default) strategy.
assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case PathList(pl @ _*) if pl.contains("log4j.properties") => MergeStrategy.concat
  case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

scalaVersion := "2.11.8"

resolvers += "jitpack" at "https://jitpack.io"

// still want to be able to run in sbt
// https://github.com/sbt/sbt-assembly#-provided-configuration
// `:= ... .evaluated` replaces the `<<=` operator, which is deprecated in sbt 0.13.13
// and removed in sbt 1.x.
run in Compile := Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
).evaluated

// Run the application in a forked JVM so the log4j options below take effect.
fork in run := true
javaOptions in run ++= Seq(
  "-Dlog4j.debug=true",
  "-Dlog4j.configuration=log4j.properties")


// All Spark modules share one version (1.6.2). The core/sql/streaming modules are
// "provided", so they stay out of the assembly jar; the Kafka and Cassandra connectors
// are not, so they are bundled into it.
libraryDependencies ++= Seq(
  "com.groupon.sparklint" %% "sparklint-spark162" % "1.0.4" excludeAll (
    ExclusionRule(organization = "org.apache.spark")
    ),
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-sql" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.6.2",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.6.0"
)

Now the issue is resolved.

0
votes

This is basic code to ingest messages from Kafka into Spark Streaming and do a word-frequency count. The code is customized for a local machine.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010._

 /**
   * Word-frequency count over messages read from a Kafka topic with the
   * spark-streaming-kafka-0-10 direct stream API.
   *
   * Master, bind address, topic, broker and consumer group are hard-coded for a local setup.
   */
 object WordFreqCount {

   /**
     * Application entry point.
     *
     * @param args command-line arguments (not used)
     */
   def main(args: Array[String]): Unit = {
     println("Start")
     val conf = new SparkConf()
       .setMaster("local[*]")
       .setAppName("KafkaReceiver")
       .set("spark.driver.bindAddress", "127.0.0.1")
     println("conf created")

     val spark = SparkSession.builder().config(conf).getOrCreate()
     val sc = spark.sparkContext
     // One micro-batch every 10 seconds.
     val ssc = new StreamingContext(sc, Seconds(10))
     println("ssc created")

     val topics = "wctopic"
     val brokers = "127.0.0.1:9092"
     val groupId = "wcgroup"
     val topicsSet = topics.split(",").toSet
     val kafkaParams = Map[String, Object](
       "bootstrap.servers" -> brokers,
       "key.deserializer" -> classOf[StringDeserializer],
       "value.deserializer" -> classOf[StringDeserializer],
       "group.id" -> groupId,
       "auto.offset.reset" -> "latest",
       "enable.auto.commit" -> (false: java.lang.Boolean)
     )
     val messages = KafkaUtils.createDirectStream[String, String](
       ssc,
       PreferConsistent,
       Subscribe[String, String](topicsSet, kafkaParams))

     // Get the lines, split them into words, count the words and print
     val lines = messages.map(_.value)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
     wordCounts.print()
     //val kafkaStream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "wcgroup",Map("wctopic" -> 1))

     // Print the received data as (key, value) pairs. ConsumerRecord itself is not
     // serializable, so calling print() directly on `messages` fails once records arrive.
     messages.map(record => (record.key, record.value)).print()

     ssc.start()
     ssc.awaitTermination()
     println("End")
   }

 }