Hi, I am trying to generate the output of the Salt examples without using Docker, which is the approach described in its documentation. I found the Scala code that generates the output, Main.scala, and modified it into a more convenient version:
package BinExTest
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import software.uncharted.salt.core.projection.numeric._
import software.uncharted.salt.core.generation.request._
import software.uncharted.salt.core.generation.Series
import software.uncharted.salt.core.generation.TileGenerator
import software.uncharted.salt.core.generation.output.SeriesData
import software.uncharted.salt.core.analytic.numeric._
import java.io._
import scala.util.parsing.json.JSONObject
/**
 * Spark driver that bins taxi pickup locations into count tiles and writes them,
 * together with per-level min/max metadata, to the local filesystem.
 *
 * Output layout under the output directory:
 *   - `<layerName>/<level>/<x>/<y>.bins` : one file per tile, 64-bit little-endian values
 *   - `<layerName>/meta.json`            : map from level to its min / max values
 */
object Main {

  // Defines the tile size in both x and y bin dimensions
  val tileSize = 256

  // Defines the output layer name
  val layerName = "pickups"

  /**
   * Encodes the bins of a tile as raw bytes.
   *
   * Each bin value is converted with `Double.doubleToLongBits` and written as
   * 8 bytes, least-significant byte first (little-endian). The result is always
   * `tileSize * tileSize * 8` bytes long; positions not covered by a bin stay zero.
   *
   * @param tile the generated tile whose bins are encoded
   * @return the encoded byte array
   */
  def createByteBuffer(tile: SeriesData[(Int, Int, Int), (Int, Int), Double, (Double, Double)]): Array[Byte] = {
    val buffer = java.nio.ByteBuffer
      .allocate(tileSize * tileSize * 8)
      .order(java.nio.ByteOrder.LITTLE_ENDIAN)
    // putLong on the canonical long bits (rather than putDouble) keeps the exact byte layout
    // of the manual shift-and-mask encoding this replaces.
    tile.bins.foreach(b => buffer.putLong(java.lang.Double.doubleToLongBits(b)))
    buffer.array()
  }

  /**
   * Entry point.
   *
   * @param args optional overrides: `args(0)` is the input CSV path and `args(1)` is the
   *             output directory. When absent, the built-in default paths are used.
   */
  def main(args: Array[String]): Unit = {
    val jarFile = "/home/kesava/Studies/BinExTest/BinExTest.jar"
    val inputPath = args.lift(0).getOrElse("/home/kesava/Downloads/taxi_micro.csv")
    val outputPath = args.lift(1).getOrElse("/home/kesava/SoftWares/salt/salt-examples/bin-example/Output")

    val conf = new SparkConf().setAppName("salt-bin-example").setJars(Array(jarFile))
    val sc = new SparkContext(conf)

    // Always stop the context, even when a job fails part-way through.
    try {
      val sqlContext = new SQLContext(sc)

      sqlContext.read.format("com.databricks.spark.csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(s"file://$inputPath")
        .registerTempTable("taxi_micro")

      // Construct an RDD of Rows containing only the fields we need. Cache the result
      val input = sqlContext.sql("select pickup_lon, pickup_lat from taxi_micro")
        .rdd.cache()

      // Given an input row, return pickup longitude, latitude as a tuple
      val pickupExtractor = (r: Row) => {
        if (r.isNullAt(0) || r.isNullAt(1)) {
          None
        } else {
          Some((r.getDouble(0), r.getDouble(1)))
        }
      }

      // Tile Generator object, which houses the generation logic
      val gen = TileGenerator(sc)

      // Break levels into batches. Process several higher levels at once because the
      // number of tile outputs is quite low. Lower levels done individually due to high tile counts.
      val levelBatches = List(List(0, 1, 2, 3, 4, 5, 6, 7, 8), List(9, 10, 11), List(12), List(13), List(14))

      // Iterate over sets of levels to generate; each batch yields a map of level -> min/max JSON.
      val levelMeta = levelBatches.map(levels => {
        println("------------------------------")
        println(s"Generating level $levels")
        println("------------------------------")

        // Construct the definition of the tiling jobs: pickups
        val pickups = new Series((tileSize - 1, tileSize - 1),
          pickupExtractor,
          new MercatorProjection(levels),
          (r: Row) => Some(1),
          CountAggregator,
          Some(MinMaxAggregator))

        // Create a request for all tiles on these levels, generate
        val request = new TileLevelRequest(levels, (coord: (Int, Int, Int)) => coord._1)

        // The tiles feed two separate actions below (tile files and level metadata);
        // cache them so the tiling job is not executed twice per batch.
        val tiles = gen.generate(input, pickups, request)
          .map(s => pickups(s).get)
          .cache()

        // Translate tiles to (coordinate, byte array), collect to master and save to local filesystem
        tiles
          .map(tile => (tile.coords, createByteBuffer(tile)))
          .collect()
          .foreach { case (coord, byteArray) =>
            val limit = (1 << coord._1) - 1
            // Use standard TMS path structure and file naming
            val file = new File(s"$outputPath/$layerName/${coord._1}/${coord._2}/${limit - coord._3}.bins")
            file.getParentFile.mkdirs()
            val out = new FileOutputStream(file)
            try {
              out.write(byteArray)
            } finally {
              out.close()
            }
          }

        // Create map from each level to min / max values.
        val meta = tiles
          .map(t => (t.coords._1.toString, t.tileMeta.get))
          .reduceByKey((l, r) => {
            (Math.min(l._1, r._1), Math.max(l._2, r._2))
          })
          .mapValues(minMax => {
            JSONObject(Map(
              "min" -> minMax._1,
              "max" -> minMax._2
            ))
          })
          .collect()
          .toMap

        tiles.unpersist()
        meta
      })

      // Flatten list of maps into a single map
      val levelInfoJSON = JSONObject(levelMeta.foldLeft(Map.empty[String, Any])(_ ++ _)).toString()

      // Save level metadata to filesystem; make sure the layer directory exists even if no tile was written
      val metaFile = new File(s"$outputPath/$layerName/meta.json")
      metaFile.getParentFile.mkdirs()
      val pw = new PrintWriter(metaFile)
      try {
        pw.write(levelInfoJSON)
      } finally {
        pw.close()
      }
    } finally {
      sc.stop()
    }
  }
}
I created a separate folder for this Scala file, with a subfolder named lib that holds the required jars, and compiled it with scalac as follows:
scalac -cp "lib/salt.jar:lib/spark.jar" Main.scala
This ran successfully and generated classes under a folder BinExTest.
The project's build.gradle contains the following lines, from which I worked out the command used to generate the output dataset:
// Gradle Exec task: depends on `assemble`, then launches the example via spark-submit.
task run(overwrite: true, type: Exec, dependsOn: [assemble]) {
executable = 'spark-submit'
// `--class` names the main class; it is followed by the application jar and two program
// arguments (presumably the input CSV path and the output directory — confirm against Main).
args = ["--class","software.uncharted.salt.examples.bin.Main","/opt/salt/build/libs/salt-bin-example-${version}.jar", "/opt/data/taxi_one_day.csv", "/opt/output"]
}
Based on this, I put together the following command:
spark-submit --class BinExTest.Main lib/salt.jar
When I do this, I get the following error,
java.lang.ClassNotFoundException: Main.BinExTest at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:278) at org.apache.spark.util.Utils$.classForName(Utils.scala:174) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:689) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Can somebody help me out with this? I am completely new to this and have come this far just by exploration.
[Update 1]
Following YoYo's suggestion, I ran:
spark-submit --class BinExTest.Main --jars "BinExTest.jar" "lib/salt.jar"
The ClassNotFoundException went away, but I now get a new error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 6, localhost): java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef; at BinExTest.Main$.createByteBuffer(Main.scala:29) at BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:101) at BinExTest.Main$$anonfun$2$$anonfun$6.apply(Main.scala:99) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at 
java.lang.Thread.run(Thread.java:745)
Any idea what's going on?
[Update 2]
Building Spark from source with Scala 2.11 support solved my previous issue. However, I then got a new error:
6/05/10 18:39:15 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost): java.lang.NoClassDefFoundError: scala/collection/GenTraversableOnce$class at software.uncharted.salt.core.util.SparseArray.(SparseArray.scala:37) at software.uncharted.salt.core.util.SparseArray.(SparseArray.scala:57) at software.uncharted.salt.core.generation.rdd.RDDSeriesWrapper.makeBins(RDDTileGenerator.scala:224) at software.uncharted.salt.core.generation.rdd.RDDTileGeneratorCombiner.createCombiner(RDDTileGenerator.scala:128) at software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100) at software.uncharted.salt.core.generation.rdd.RDDTileGenerator$$anonfun$3.apply(RDDTileGenerator.scala:100) at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:187) at org.apache.spark.util.collection.ExternalSorter$$anonfun$5.apply(ExternalSorter.scala:186) at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:148) at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: 
java.lang.ClassNotFoundException: scala.collection.GenTraversableOnce$class at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
Is this because Scala 2.11 does not have the mentioned class?
[Final Update]
Adding the Scala 2.10 library jar to the spark-submit command did the trick:
spark-submit --class "BinExTest.Main" --jars "BinExTest.jar,lib/scala210.jar" "lib/salt.jar"