I would like to define a UDF to filter a DataFrame in Spark. The DataFrame has an array column, and I want to filter the elements of that array in each row.

Example: keep only the elements that start with 'Z', and remove every array element that does not start with 'Z'.

Original Data
+---+--------------+
| _1|            _2|
+---+--------------+
|id1|[AA,BB,CC,Z12]|
|id2| [AA,ZA,CC,Z3]|
|id2| [Z2,XX,CC,A2]|
+---+--------------+
Expected result
+---+-----------+
| _1| _2        |
+---+-----------+
|id1| [Z12]     |
|id2| [ZA,Z3]   |
|id2| [Z2]      |
+---+-----------+
Current result
+---+--------------+
| _1| _2           |
+---+--------------+
|id1| []           |
|id2| []           |
|id2| [Z2,XX,CC,A2]|
+---+--------------+

Current Code

import org.apache.spark.sql.functions.udf

def filterArray = udf((recs: Seq[String]) => {
    recs.filter(_.startsWith("Z"))
})

val rawData = Seq(("id1",Array("AA,BB,CC,Z12")),("id2",Array("AA,ZA,CC,Z3")),("id2",Array("AA,XX,CC,A2")))
var test = spark.createDataFrame(rawData)
test.show(4)
test = test.withColumn("_2", filterArray(test("_2")))
test.show(4)
Is there a problem with the solution you gave? - Chitral Verma

1 Answer

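The problem is that each of your arrays contains just one element: Array("AA,BB,CC,Z12") holds a single comma-separated string, not four strings. The filter therefore tests that whole string, so each row either keeps it intact or ends up empty. Split the string first, then filter: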

def filterArray = udf((recs: Seq[String]) =>{
  recs.flatMap(_.split(",")).filter(_.startsWith("Z"))
})

With the data from your code, this gives:

+---+--------+
| _1|      _2|
+---+--------+
|id1|   [Z12]|
|id2|[ZA, Z3]|
|id2|      []|
+---+--------+
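
To see why the original UDF returned either an empty array or the untouched string, here is a minimal sketch of the same logic on plain Scala collections, with no Spark involved. The value names are made up for illustration, and the "Z2,XX,CC,A2" string is taken from the tables in the question rather than from the posted code.

```scala
// What Array("AA,BB,CC,Z12") really holds: ONE string, not four.
val oneElement = Seq("AA,BB,CC,Z12")
// A single-string row whose text happens to start with "Z".
val startsWithZ = Seq("Z2,XX,CC,A2")

// Original UDF body: the test runs against the whole comma-separated string.
val emptied = oneElement.filter(_.startsWith("Z"))     // nothing survives
val keptWhole = startsWithZ.filter(_.startsWith("Z"))  // the whole string survives

// Fixed UDF body: split each string into its parts first, then filter the parts.
val fixed = oneElement.flatMap(_.split(",")).filter(_.startsWith("Z"))

println(emptied)    // List()
println(keptWhole)  // List(Z2,XX,CC,A2)
println(fixed)      // List(Z12)
```

That matches the two kinds of rows in the "Current result" table: all-or-nothing, depending only on the first character of the single string.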

Alternatively, you can keep your current UDF unchanged if you define the data as real multi-element arrays:

val rawData = Seq(
  ("id1", Array("AA", "BB", "CC", "Z12")),
  ("id2", Array("AA", "ZA", "CC", "Z3")),
  ("id2", Array("AA", "XX", "CC", "A2"))
)
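
Once the column holds real multi-element arrays, a UDF may not be needed at all. The sketch below assumes Spark 3.0 or later, where org.apache.spark.sql.functions.filter accepts a Column => Column lambda; the local SparkSession settings and the app name are placeholders of mine, not something from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, filter}

// Assumption: a local session for experimenting; in spark-shell getOrCreate()
// simply returns the session that already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("filter-array-sketch") // hypothetical name
  .getOrCreate()

val rawData = Seq(
  ("id1", Array("AA", "BB", "CC", "Z12")),
  ("id2", Array("AA", "ZA", "CC", "Z3")),
  ("id2", Array("AA", "XX", "CC", "A2"))
)
val df = spark.createDataFrame(rawData) // columns are named _1 and _2

// Built-in higher-order function: keep only the array elements starting with "Z".
val result = df.withColumn("_2", filter(col("_2"), x => x.startsWith("Z")))
result.show()
```

Built-in functions stay visible to the Catalyst optimizer, whereas a UDF is a black box to it, so this form is usually worth trying first. On Spark 2.4 the same idea should be expressible through the SQL higher-order function, for example expr("filter(_2, x -> x like 'Z%')"), though I have not run that variant here.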