I start with the necessary imports:
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types._
import spark.implicits._
Then I define some lat/long points:
val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)
val york = (4.0, 4.0)
I then create a Spark DataFrame like this and check that it works:
val exampleDF = Seq((List(london,suburbia),List(southampton,york)),
The DataFrame has the following schema:
DataFrame = [AR1: array<struct<_1:double,_2:double>>, AR2: array<struct<_1:double,_2:double>>]
I create a function that produces every combination of points from the two arrays:
// function to do what I want: pair each point in x with each point in y
val latlongexplode = (x: Array[(Double, Double)], y: Array[(Double, Double)]) => {
  for (a <- x; b <- y) yield (a, b)
}
I check that the function works
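The check itself is not shown here. A minimal sketch of what it could look like with plain Scala arrays and no Spark involved is below; the name `pairs` is hypothetical and used only for illustration:

```scala
// Hypothetical check of the function on plain Scala arrays (no Spark involved).
val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)
val york = (4.0, 4.0)

// Same function as above: pairs each point in x with each point in y.
val latlongexplode = (x: Array[(Double, Double)], y: Array[(Double, Double)]) => {
  for (a <- x; b <- y) yield (a, b)
}

// `pairs` is an illustrative name, not part of the original code.
val pairs = latlongexplode(Array(london, suburbia), Array(southampton, york))
pairs.foreach(println)
```

With two points on each side this yields four pairs.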
and it does. However, after I create a UDF out of this function:
// declare function into a Spark UDF
val latlongexplodeUDF = udf (latlongexplode)
when I try to use it on the DataFrame I created above like this:
exampleDF.withColumn("latlongexplode", latlongexplodeUDF($"AR1",$"AR2")).show(false)
I get a really long stack trace, which basically boils down to:
java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Lscala.Tuple2;
org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$f$3(ScalaUDF.scala:121)
org.apache.spark.sql.catalyst.expressions.ScalaUDF.eval(ScalaUDF.scala:1063)
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:151)
org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:50)
org.apache.spark.sql.catalyst.expressions.InterpretedProjection.apply(Projection.scala:32)
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
How can I get this UDF to work in Scala Spark? (I'm using Spark 2.4 at the moment, if that helps.)
EDIT: It could be that the way I construct my example DataFrame is the issue. My actual data, though, is an array (of unknown size) of lat/long tuples in each column.
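One possible direction, offered as a sketch and not a confirmed fix: the exception suggests that Spark passes an `array<struct<...>>` column to a Scala UDF as a `WrappedArray` (a `Seq`) of `Row` objects, not as `Array[(Double, Double)]`. Under that assumption, the UDF could take `Seq[Row]` and read the two doubles by position. The local `SparkSession` settings, the single-row DataFrame, and the name `result` are assumptions made only to keep the example self-contained:

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.udf

// Assumption: a local session just for this sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[1]").appName("latlong-sketch").getOrCreate()
import spark.implicits._

val london = (1.0, 1.0)
val suburbia = (2.0, 2.0)
val southampton = (3.0, 3.0)
val york = (4.0, 4.0)

// Assumption: a single example row; the columns are array<struct<_1:double,_2:double>>.
val exampleDF = Seq((List(london, suburbia), List(southampton, york))).toDF("AR1", "AR2")

// Struct elements are read as Row, so the fields are pulled out by position.
val latlongexplodeUDF = udf((x: Seq[Row], y: Seq[Row]) =>
  for (a <- x; b <- y)
    yield ((a.getDouble(0), a.getDouble(1)), (b.getDouble(0), b.getDouble(1)))
)

val withPairs = exampleDF.withColumn("latlongexplode", latlongexplodeUDF($"AR1", $"AR2"))
withPairs.show(false)

// `result` is an illustrative name: the combined pairs for the single example row.
val result = withPairs.select("latlongexplode").head.getSeq[Row](0)
```

Returning a `Seq` of nested tuples lets Spark derive the result column as an array of structs, so arrays of any length on either side should be handled the same way.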