
I have a very complex DataFrame in Spark. I'm trying to use a UDF that takes two columns and applies a function to the corresponding rows of both columns at the same time.

Each column has the following identical schema:

root
 |-- A: array (nullable = true)
 |    |-- element: double (containsNull = true)

In some cases the array will be empty; in other cases it will have elements, and the count will vary.

When I run .dtypes on a column I get:

test: Array[(String, String)] = Array((A,ArrayType(DoubleType,true)))

When I run take(1) on one of the columns I get:

Array[org.apache.spark.sql.Row] = Array([WrappedArray(1234, 4567, 789, 1346)])

When I simply run a select on a column I get:

org.apache.spark.sql.DataFrame = [A: array<double>]

My goal is to run a function over the corresponding elements of the two arrays (A(0) with B(0), A(1) with B(1), and so on).

def inRange = udf((A: ???, B: ???) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

I'm calling the UDF like this:

df.withColumn("coolFunction", inRange(df("A"), df("B")))

2 Answers


You can define your udf function with collection.mutable.WrappedArray[Double] parameters, since that is the concrete type Spark passes in for array<double> columns:

def inRange = udf((A: collection.mutable.WrappedArray[Double], B: collection.mutable.WrappedArray[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

Or you can use one of the parent types of WrappedArray, such as IndexedSeq:

def inRange = udf((A: collection.mutable.IndexedSeq[Double], B: collection.mutable.IndexedSeq[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})

Or simply Seq, which is the most common choice:

def inRange = udf((A: Seq[Double], B: Seq[Double]) => {
  //iterate over the array and run coolFunction(A(0),B(0))
})
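Putting it together, here is a minimal runnable sketch of the Seq-based variant. Note that coolFunction and the sample data are hypothetical stand-ins (the question never shows coolFunction's body); the element-wise pairing is done with zip, which also handles empty arrays naturally by producing an empty result.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object InRangeExample {
  // Hypothetical per-element check; substitute your real coolFunction here.
  def coolFunction(a: Double, b: Double): Boolean = math.abs(a - b) < 100.0

  // Pairs up corresponding elements of the two arrays and applies
  // coolFunction to each pair; empty inputs yield an empty result.
  def inRangePairs(a: Seq[Double], b: Seq[Double]): Seq[Boolean] =
    a.zip(b).map { case (x, y) => coolFunction(x, y) }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("inRange")
      .getOrCreate()
    import spark.implicits._

    // Sample data mirroring the question's schema: two array<double> columns.
    val df = Seq(
      (Seq(1234.0, 4567.0), Seq(1200.0, 9999.0)),
      (Seq.empty[Double], Seq.empty[Double]) // empty arrays are handled too
    ).toDF("A", "B")

    val inRange = udf((a: Seq[Double], b: Seq[Double]) => inRangePairs(a, b))

    df.withColumn("coolFunction", inRange($"A", $"B")).show(false)
    spark.stop()
  }
}
```

If the two arrays can differ in length, zip silently truncates to the shorter one; use zipAll or an explicit length check if that case should be an error instead.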

Should be:

def inRange = udf((A: Seq[Double], B: Seq[Double]) => {
    //iterate over the array and run coolFunction(A(0),B(0))
})

Reference: https://spark.apache.org/docs/latest/sql-programming-guide.html#data-types