I'm using Databricks on Azure, and part of my process includes using TwoSigma's Flint. I uploaded the library through the Databricks Libraries UI, and I am able to run the example code below in notebooks in the Databricks workspace.
The problem arises when I try to use databricks-connect. Everything normally works, but when I try to use external libraries, including Flint, the following code, run under spark-shell --packages 'com.twosigma:flint:0.6.0', produces the error below.
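Concretely, this is how I launch the shell (the Maven coordinates are the ones for the Flint release I installed in the workspace):

```shell
# Start spark-shell with Flint resolved from Maven Central
spark-shell --packages 'com.twosigma:flint:0.6.0'
```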
import org.apache.spark.sql.functions._
import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
import spark.implicits._

// Small DataFrame with a "time" column parsed as a UTC timestamp
val df = Seq(("2018-08-20", 1.0), ("2018-08-20", 2.0), ("2018-08-21", 3.0))
  .toDF("time", "number")
  .withColumn("time", from_utc_timestamp($"time", "UTC"))

// Build a TimeSeriesRDD; the input is unsorted and timestamps have day resolution
val tsRdd = TimeSeriesRDD.fromDF(df)(isSorted = false, timeUnit = DAYS)

// Group rows that share the same timestamp (cycle)
val results = tsRdd.groupByCycle()
results.toDF.show
The error is as follows:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 78.0 failed 4 times, most recent failure: Lost task 2.3 in stage 78.0 (TID 2164, 10.139.64.7, executor 0): java.lang.ClassCastException: org.apache.spark.sql.types.StructField cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
at com.twosigma.flint.timeseries.TimeSeriesStore$.getInternalRowConverter(TimeSeriesStore.scala:108)
at com.twosigma.flint.timeseries.TimeSeriesStore$$anonfun$2.apply(TimeSeriesStore.scala:53)
at com.twosigma.flint.timeseries.TimeSeriesStore$$anonfun$2.apply(TimeSeriesStore.scala:52)
at org.apache.spark.rdd.RDD$client1f520962c6$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:817)
at org.apache.spark.rdd.RDD$client1f520962c6$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:817)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Am I specifying the dependencies the wrong way?
--packages option or with the --jars option in the spark-shell command. - tmrlvi