
Hi, I am trying to generate the output of the Salt Examples, but without using Docker as described in its documentation. I found the Scala code that generates the output, Main.scala, and modified it into a more convenient form,

package BinExTest
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row

import software.uncharted.salt.core.projection.numeric._
import software.uncharted.salt.core.generation.request._
import software.uncharted.salt.core.generation.Series
import software.uncharted.salt.core.generation.TileGenerator
import software.uncharted.salt.core.generation.output.SeriesData
import software.uncharted.salt.core.analytic.numeric._

import java.io._

import scala.util.parsing.json.JSONObject

object Main {

  // Defines the tile size in both x and y bin dimensions
  val tileSize = 256

  // Defines the output layer name
  val layerName = "pickups"

  // Serializes the tile's Double bin values into a byte array,
  // each value encoded as its 64-bit (long) bit pattern in little-endian order
  def createByteBuffer(tile: SeriesData[(Int, Int, Int), (Int, Int), Double, (Double, Double)]): Array[Byte] = {
    val byteArray = new Array[Byte](tileSize * tileSize * 8)
    var j = 0
    tile.bins.foreach(b => {
      val data = java.lang.Double.doubleToLongBits(b)
      for (i <- 0 to 7) {
        byteArray(j) = ((data >> (i * 8)) & 0xff).asInstanceOf[Byte]
        j += 1
      }
    })
    byteArray
  }

  def main(args: Array[String]): Unit = {

    val jarFile = "/home/kesava/Studies/BinExTest/BinExTest.jar"
    val inputPath = "/home/kesava/Downloads/taxi_micro.csv"
    val outputPath = "/home/kesava/SoftWares/salt/salt-examples/bin-example/Output"

    val conf = new SparkConf().setAppName("salt-bin-example").setJars(Array(jarFile))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(s"file://$inputPath")
      .registerTempTable("taxi_micro")

    // Construct an RDD of Rows containing only the fields we need. Cache the result
    val input = sqlContext.sql("select pickup_lon, pickup_lat from taxi_micro")
      .rdd.cache()

    // Given an input row, return pickup longitude, latitude as a tuple
    val pickupExtractor = (r: Row) => {
      if (r.isNullAt(0) || r.isNullAt(1)) {
        None
      } else {
        Some((r.getDouble(0), r.getDouble(1)))
      }
    }

    // Tile Generator object, which houses the generation logic
    val gen = TileGenerator(sc)

    // Break levels into batches. Process several higher levels at once because the
    // number of tile outputs is quite low. Lower levels done individually due to high tile counts.
    val levelBatches = List(List(0, 1, 2, 3, 4, 5, 6, 7, 8), List(9, 10, 11), List(12), List(13), List(14))

    // Iterate over sets of levels to generate.
    val levelMeta = levelBatches.map(level => {

      println("------------------------------")
      println(s"Generating level $level")
      println("------------------------------")

      // Construct the definition of the tiling jobs: pickups
      val pickups = new Series((tileSize - 1, tileSize - 1),
        pickupExtractor,
        new MercatorProjection(level),
        (r: Row) => Some(1),
        CountAggregator,
        Some(MinMaxAggregator))

      // Create a request for all tiles on these levels, generate
      val request = new TileLevelRequest(level, (coord: (Int, Int, Int)) => coord._1)
      val rdd = gen.generate(input, pickups, request)

      // Translate RDD of Tiles to RDD of (coordinate,byte array), collect to master for serialization
      val output = rdd
        .map(s => pickups(s).get)
        .map(tile => {
          // Return tuples of tile coordinate, byte array
          (tile.coords, createByteBuffer(tile))
        })
        .collect()

      // Save byte files to local filesystem
      output.foreach(tile => {
        val coord = tile._1
        val byteArray = tile._2
        val limit = (1 << coord._1) - 1
        // Use standard TMS path structure and file naming
        val file = new File(s"$outputPath/$layerName/${coord._1}/${coord._2}/${limit - coord._3}.bins")
        file.getParentFile.mkdirs()
        val output = new FileOutputStream(file)
        output.write(byteArray)
        output.close()
      })

      // Create map from each level to min / max values.
      rdd
        .map(s => pickups(s).get)
        .map(t => (t.coords._1.toString, t.tileMeta.get))
        .reduceByKey((l, r) => {
          (Math.min(l._1, r._1), Math.max(l._2, r._2))
        })
        .mapValues(minMax => {
          JSONObject(Map(
            "min" -> minMax._1,
            "max" -> minMax._2
          ))
        })
        .collect()
        .toMap
    })

    // Flatten array of maps into a single map
    val levelInfoJSON = JSONObject(levelMeta.reduce(_ ++ _)).toString()
    // Save level metadata to filesystem
    val pw = new PrintWriter(s"$outputPath/$layerName/meta.json")
    pw.write(levelInfoJSON)
    pw.close()

  }
}

I created a separate folder for this Scala file, with a lib folder inside it containing the required jars, and compiled it with scalac as follows,

scalac -cp "lib/salt.jar:lib/spark.jar" Main.scala

This ran successfully and generated classes under a folder BinExTest.
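
For reference, classes generated this way can be packed into a jar (the BinExTest.jar that appears in the updates below) with the standard jar tool, for example,

jar cf BinExTest.jar BinExTest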

Now, the project's build.gradle had the following lines, from which I identified the command that would generate the output dataset,

task run(overwrite: true, type: Exec, dependsOn: [assemble]) {
  executable = 'spark-submit'
  args = ["--class","software.uncharted.salt.examples.bin.Main","/opt/salt/build/libs/salt-bin-example-${version}.jar", "/opt/data/taxi_one_day.csv", "/opt/output"]
}
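
In other words, the general shape of the command is the main class passed to --class, followed by the application jar, followed by any program arguments (the angle-bracket names below are just placeholders),

spark-submit --class <main-class> <application-jar> [application arguments]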

Based on this, I put together the following command,

spark-submit --class BinExTest.Main lib/salt.jar

When I do this, I get the following error,

java.lang.ClassNotFoundException: Main.BinExTest
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:278)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:174)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can somebody help me out with this? I am completely new to this and came this far just by exploration.

[Update 1]

Following YoYo's suggestion,

spark-submit --class BinExTest.Main --jars "BinExTest.jar" "lib/salt.jar"

The ClassNotFoundException went away, but it generated a new error, as follows,

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 6, localhost): java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
    at BinExTest.Main$.createByteBuffer(Main.scala:29)
    at BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:101)
    at BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:99)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Any idea what's going on?

[Update 2]

Building Spark from source with Scala 2.11 support solved my previous issue. However, I got a new error,

6/05/10 18:39:15 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost): java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class
    at software.uncharted.salt.core.util.SparseArray.<init>(SparseArray.scala:37)
    at software.uncharted.salt.core.util.SparseArray.<init>(SparseArray.scala:57)
    at software.uncharted.salt.core.generation.rdd.RDDSeriesWrapper.makeBins(RDDTileGenerator.scala:224)
    at software.uncharted.salt.core.generation.rdd.RDDTileGeneratorCombiner.createCombiner(RDDTileGenerator.scala:128)
    at software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100)
    at software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186)
    at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:148)
    at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

Is this because Scala 2.11 does not have the mentioned class?

[Final Update]

Adding the Scala 2.10 library to the spark-submit jars did the trick.

spark-submit --class "BinExTest.Main" --jars "BinExTest.jar,lib/scala210.jar" "lib/salt.jar"


1 Answer


For a Spark job to run, it needs to replicate its code over the different nodes that make up your Spark cluster. It does that by literally copying the jar file over to the other nodes.

That means you need to make sure your class files are packaged in a .jar file. In my typical solutions, I would build an uber jar that packages the class files and the dependent jar files together into a single .jar file, using the Maven Shade plugin. That doesn't have to be your solution, but at the very least you should build a .jar file out of your generated classes.

To manually provide additional jar files, you will need to add them using the --jars option, which expects a comma-delimited list.
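
For example, with hypothetical file names (your-application.jar being the jar built from your classes, and the dependencies listed under --jars), the submission would look something like,

spark-submit --class your.package.Main --jars "dependency1.jar,dependency2.jar" your-application.jar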

Update 1

Actually, even for me there is a lot of confusion about all the available options, specifically regarding the jar files, how they are distributed, and how they modify the classpath in Spark. See another topic I just posted.

Update 2

The second part of your question is already answered on another thread.