1
votes

I'm using databricks on Azure, and part of my process include using TwoSigma's Flint. I upload the library to the databricks libraries, and I am able to run the example code below in notebooks on the databricks workspace.

The problem arises when I try to use databricks-connect. While normally everything works, when trying to use external libraries, including Flint, the following code, running under spark-shell --packages 'com.twosigma:flint:0.6.0', produces the error below.

import org.apache.spark.sql.functions._
import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._
import spark.implicits._
val df = Seq(("2018-08-20", 1.0), ("2018-08-20", 2.0), ("2018-08-21", 3.0)).toDF("time", "number").withColumn("time", from_utc_timestamp($"time", "UTC"))
val tsRdd = TimeSeriesRDD.fromDF(df)(isSorted=false, timeUnit=DAYS)
val results = tsRdd.groupByCycle()
results.toDF.show

The error is as follows:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 78.0 failed 4 times, most recent failure: Lost task 2.3 in stage 78.0 (TID 2164, 10.139.64.7, executor 0): java.lang.ClassCastException: org.apache.spark.sql.types.StructField cannot be cast to java.lang.Integer
        at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:101)
        at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
        at com.twosigma.flint.timeseries.TimeSeriesStore$.getInternalRowConverter(TimeSeriesStore.scala:108)
        at com.twosigma.flint.timeseries.TimeSeriesStore$$anonfun$2.apply(TimeSeriesStore.scala:53)
        at com.twosigma.flint.timeseries.TimeSeriesStore$$anonfun$2.apply(TimeSeriesStore.scala:52)
        at org.apache.spark.rdd.RDD$client1f520962c6$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:817)
        at org.apache.spark.rdd.RDD$client1f520962c6$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:817)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:340)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:304)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.doRunTask(Task.scala:139)
        at org.apache.spark.scheduler.Task.run(Task.scala:112)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1526)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:503)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Am I specifying the dependencies the wrong way?

1
Is the TwoSigma library installed on your local machine? - Raphael K
I tried loading it using either --packages option or with --jar option in the spark-shell command. - tmrlvi
@tmrlvi which version of spark are you using? thus it is working perfectly fine with spark 2.+. - Mahesh Gupta
@MaheshGupta Which version of Databricks runtime did you use? I used 5.5, which implies spark 2.4.3, scala 2.11 and databricks-connect 5.5.1. - tmrlvi
@tmrlvi I am using community version of 6.1 (includes Apache Spark 2.4.4, Scala 2.11) - Mahesh Gupta

1 Answers

-1
votes

To specify dependencies you'll need to include them as JAR files that get shared across the cluster. From the docs:

Typically your main class or Python file will have other dependency JARs and files. You can add such dependency JARs and files by calling sparkContext.addJar("path-to-the-jar") or sparkContext.addPyFile("path-to-the-file"). You can also add Egg files and zip files with the addPyFile() interface. Every time you run the code in your IDE, the dependency JARs and files are installed on the cluster.

Here's an example in Scala (also from the docs):

package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.parquet("/tmp/x").show()

    println("Running simple UDF query...")

    // Adding external library to project
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

It's also worth noting that the DBR version running on the cluster and the DB Connect version running on your local machine need to be the same.