
How can I access GeoMesa's UDFs in the Spark Scala DataFrame (non-textual) API?

How can I make the SQL UDFs that are available in the textual spark-sql API also available in the Scala DataFrame DSL? I.e., how can I write, instead of this expression

spark.sql("select st_asText(st_bufferPoint(geom,10)) from chicago where case_number = 1")

something similar to

df.select(st_asText(st_bufferPoint('geom, 10))).filter('case_number === 1)

How can I register GeoMesa's UDFs so that they are available not only in SQL text mode? SQLTypes.init(spark.sqlContext) from https://github.com/locationtech/geomesa/blob/f13d251f4d8ad68f4339b871a3283e43c39ad428/geomesa-spark/geomesa-spark-sql/src/main/scala/org/apache/spark/sql/SQLTypes.scala#L59-L66 only seems to register them for textual SQL expressions.

I am already importing

import org.apache.spark.sql.functions._

so these functions

https://github.com/locationtech/geomesa/blob/828822dabccb6062118e36c58df8c3a7fa79b75b/geomesa-spark/geomesa-spark-sql/src/main/scala/org/apache/spark/sql/SQLSpatialFunctions.scala#L31-L41

should be available.


2 Answers


You can use the udf function from org.apache.spark.sql.functions, which you're already importing, e.g.

val myUdf = udf((x: String) => doSomethingWithX(x))

You can then use myUdf in the DSL, as in df.select(myUdf($"field")).
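A minimal sketch combining both ideas, assuming Spark 2.x or later in local mode: registering a function through spark.udf.register returns a UserDefinedFunction that can be applied directly in the DSL, and the same name also becomes visible to spark.sql and to callUDF. The function name shout, the object UdfBothWays and the sample data are hypothetical, chosen only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.callUDF

// Hypothetical demo object: registers one UDF and calls it three ways.
object UdfBothWays {
  // Returns the results from SQL text, the DSL object, and callUDF-by-name.
  def run(): (Seq[String], Seq[String], Seq[String]) = {
    val spark = SparkSession.builder()
      .appName("udfBothWays")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    try {
      val df = Seq("abc", "xyz").toDF("str")
      df.createOrReplaceTempView("words")

      // register makes "shout" known to the session by name AND returns a
      // UserDefinedFunction that can be applied to Columns in the DSL.
      val shout = spark.udf.register("shout", (s: String) => s.toUpperCase + "!")

      // 1. Textual SQL
      val viaSql = spark.sql("select shout(str) from words")
        .collect().map(_.getString(0)).toSeq
      // 2. DSL, using the returned UserDefinedFunction directly
      val viaDsl = df.select(shout($"str"))
        .collect().map(_.getString(0)).toSeq
      // 3. DSL, looking the function up by its registered name
      val viaName = df.select(callUDF("shout", $"str"))
        .collect().map(_.getString(0)).toSeq

      (viaSql, viaDsl, viaName)
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

All three variants should yield the same values; the by-name form is the one that also works for functions registered elsewhere (for example by a library), where you never get hold of the UserDefinedFunction object.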


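Take a look at the callUDF function from org.apache.spark.sql.functions. It calls a function by its registered name, so functions the session knows by name (including built-ins such as length and substring, as below) can be used from the DSL.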

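import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{callUDF, lit}

val spark = SparkSession.builder()
  .appName("callUDF")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df = spark.createDataset(List("abcde", "bcdef", "cdefg")).toDF("str")
df.createTempView("view")

// Textual SQL: function names are resolved against the session's registry
spark.sql("select length(substring(str, 2, 3)) from view").show()
// DSL: callUDF resolves the same names, so both queries give the same result
df.select(callUDF("length", callUDF("substring", $"str", lit(2), lit(3)))).show()

spark.stop()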

Tested with Spark 2.1
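Applied to the question, one option is a small set of Column-returning wrappers around callUDF, so GeoMesa's SQL functions read like ordinary DSL functions. This is a sketch under stated assumptions: it assumes SQLTypes.init(spark.sqlContext) has already registered st_asText and st_bufferPoint with the session (callUDF only looks the name up when the query is analyzed), and the object GeoDsl and its method signatures are hypothetical, not part of GeoMesa's API.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{callUDF, lit}

// Hypothetical helper object, not part of GeoMesa. Each method only builds an
// unresolved call by name; Spark resolves it against the session's function
// registry at analysis time, so GeoMesa's SQLTypes.init must have run first.
object GeoDsl {
  def st_asText(geom: Column): Column =
    callUDF("st_asText", geom)

  // The buffer distance is passed to the registered function as a literal column.
  def st_bufferPoint(point: Column, distance: Double): Column =
    callUDF("st_bufferPoint", point, lit(distance))
}
```

With import GeoDsl._ and spark.implicits._ in scope, the query from the question could then be written as df.filter($"case_number" === 1).select(st_asText(st_bufferPoint($"geom", 10.0))), filtering before projecting. On Spark 3.2 and later, callUDF is deprecated in favor of call_udf, which takes the same arguments.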