2 votes

As the title suggests, I'm having trouble submitting a Spark job to a Spark cluster running on Docker.

I wrote a very simple Spark job in Scala that subscribes to a Kafka server, arranges some data, and stores it in an Elasticsearch database. Kafka and Elasticsearch are already running in Docker.

Everything works perfectly when I run the Spark job from my IDE in my dev environment (Windows / IntelliJ).

Then (and I'm not a Java guy at all), I added a Spark cluster following these instructions: https://github.com/big-data-europe/docker-spark

I created a cluster consisting of a master and a worker, and it looks healthy when I consult its dashboard.

Now, this is my job, written in Scala:

import java.io.Serializable

import org.apache.commons.codec.StringDecoder
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark
import org.apache.spark.SparkConf
import org.elasticsearch.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.parsing.json.JSON

object KafkaConsumer {
  def main(args: Array[String]): Unit = {

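    // Spark configuration; "es.index.auto.create" lets the connector create the target index if it is missing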
    val sc = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Elastic Search Indexer App")

    sc.set("es.index.auto.create", "true")

    val elasticResource = "iot/demo"
    val ssc = new StreamingContext(sc, Seconds(10))

    //ssc.checkpoint("./checkpoint")

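    // Kafka consumer settings; "kafka:9092" is the address of the broker running in Docker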
    val kafkaParams = Map(
      "bootstrap.servers" -> "kafka:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      "group.id" -> "group0"
    )

    val topics = List("test")
    val stream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

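    // Wrap each Kafka record (key, timestamp, value) in a small case class before indexing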
    case class message(key: String, timestamp: Long, payload: Object)
    val rdds = stream.map(record => message(record.key, record.timestamp, record.value))

    val es_config: scala.collection.mutable.Map[String, String] =
      scala.collection.mutable.Map(
        "pushdown" -> "true",
        "es.nodes" -> "http://docker-host",
        "es.nodes.wan.only" -> "true",
        "es.resource" -> elasticResource,
        "es.ingest.pipeline" -> "iot-test-pipeline"
      )


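    // For every micro-batch, index the records into Elasticsearch and also print them for debugging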
    rdds.foreachRDD { rdd =>
      rdd.saveToEs(es_config)
      rdd.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

To submit this to the cluster I did:

  • With "sbt-assembly" plugin, I created a fat jar file with all dependencies.
  • Define an assembly strategy in build.sbt to avoid deduplicate errors on merging ...
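
The strategy is something along these lines; this is only a sketch of the usual sbt-assembly pattern, and the exact cases are an assumption (they depend on which files actually collide in your dependency set):

// build.sbt (sbt-assembly 0.14.x syntax)
assemblyMergeStrategy in assembly := {
  // META-INF signature/manifest files commonly collide across jars; drop them in the fat jar
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // Concatenate Typesafe config files instead of arbitrarily picking one
  case "reference.conf"              => MergeStrategy.concat
  // Crude fallback: take the first occurrence of anything else
  case x                             => MergeStrategy.first
}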

Then submit with:

./spark-submit.cmd --class KafkaConsumer --master spark://docker-host:7077 /c/Users/shams/Documents/Appunti/iot-demo-app/spark-streaming/target/scala-2.11/spark-streaming-assembly-1.0.jar

BUT I get this error:

19/02/27 11:18:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.io.IOException: No FileSystem for scheme: C
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.spark.util.Utils$.getHadoopFileSystem(Utils.scala:1897)
    at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:694)
    at org.apache.spark.deploy.DependencyUtils$.downloadFile(DependencyUtils.scala:135)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$7.apply(SparkSubmit.scala:416)
    at org.apache.spark.deploy.SparkSubmit$$anonfun$doPrepareSubmitEnvironment$7.apply(SparkSubmit.scala:416)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.deploy.SparkSubmit$.doPrepareSubmitEnvironment(SparkSubmit.scala:415)
    at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:250)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:171)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

After a day of trying I have not solved it, and I cannot understand where my job is trying to access a certain volume, as the error seems to suggest.

Could it be related to the warning message? And how should I edit my script to avoid that problem?

Thanks in advance.

UPDATE:

The problem does not seem to be related to my code, because I tried to submit a simple hello-world app compiled in the same way, and I get the same issue.
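
For reference, the hello-world app was nothing more than something like this (a minimal sketch; the object name and the trivial computation are just illustrative):

import org.apache.spark.{SparkConf, SparkContext}

object HelloWorld {
  def main(args: Array[String]): Unit = {
    // Minimal job: count 100 numbers on the cluster and print the result
    val conf = new SparkConf().setAppName("Hello World App")
    val sc = new SparkContext(conf)
    val count = sc.parallelize(1 to 100).count()
    println(s"Counted $count elements")
    sc.stop()
  }
}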


1 Answer

0 votes

After many attempts and much research, I have come to the conclusion that the problem could be that I'm using the Windows version of spark-submit from my PC to submit the job.

I could not fully understand why, but for now, by moving the jar directly onto the master and worker nodes, I was able to submit it from there.

First, copy the jar onto the container:

docker cp spark-streaming-assembly-1.0.jar 21b43cb2e698:/spark/bin

Then I execute (in the /spark/bin folder):

./spark-submit --class KafkaConsumer --deploy-mode cluster --master spark://spark-master:7077 spark-streaming-assembly-1.0.jar

This is the workaround I have found for the moment.