0
votes

I have a basic Spark - Kafka code, I try to run following code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

import java.util.regex.Pattern
import java.util.regex.Matcher
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import Utilities._
object WordCount {
  def main(args: Array[String]): Unit = {

    val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(1))

    setupLogging()

    // Construct a regular expression (regex) to extract fields from raw Apache log lines
    val pattern = apacheLogPattern()

    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    // List of topics you want to listen for from Kafka
    val topics = List("testLogs").toSet
    // Create our Kafka stream, which will contain (topic,message) pairs. We tack a
    // map(_._2) at the end in order to only get the messages, which contain individual
    // lines of data.
    val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics).map(_._2)

    // Extract the request field from each log line
    val requests = lines.map(x => {val matcher:Matcher = pattern.matcher(x); if (matcher.matches()) matcher.group(5)})

    // Extract the URL from the request
    val urls = requests.map(x => {val arr = x.toString().split(" "); if (arr.size == 3) arr(1) else "[error]"})

    // Reduce by URL over a 5-minute window sliding every second
    val urlCounts = urls.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(300), Seconds(1))

    // Sort and print the results
    val sortedResults = urlCounts.transform(rdd => rdd.sortBy(x => x._2, false))
    sortedResults.print()

    // Kick it off
    ssc.checkpoint("/home/")
    ssc.start()
    ssc.awaitTermination()

  }


}

I am using IntelliJ IDE, and create scala project by using sbt. Details of build.sbt file is as follow:

name := "Sample"

version := "1.0"

organization := "com.sundogsoftware"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.4.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.1",
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

)

However, when I try to build the code, it creates following error:

Error:scalac: missing or invalid dependency detected while loading class file 'StreamingContext.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'StreamingContext.class' was compiled against an incompatible version of org.apache.spark.

Error:scalac: missing or invalid dependency detected while loading class file 'DStream.class'. Could not access type Logging in package org.apache.spark, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with -Ylog-classpath to see the problematic classpath.) A full rebuild may help if 'DStream.class' was compiled against an incompatible version of org.apache.spark.

1

1 Answers

1
votes

When using different Spark libraries together the versions of all libs should always match.

Also, the version of kafka you use matters also, so should be for example: spark-streaming-kafka-0-10_2.11

...
scalaVersion := "2.11.8"
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10_2.11" % sparkVersion,
  "org.apache.hadoop" % "hadoop-hdfs" % "2.6.0"

)

This is a useful site if you need to check the exact dependencies you should use: https://search.maven.org/