0 votes

I want to filter a Spark sql.DataFrame, keeping only the wanted array elements, without knowing the whole schema beforehand (I don't want to hardcode it). Schema:

root
 |-- callstartcelllabel: string (nullable = true)
 |-- calltargetcelllabel: string (nullable = true)
 |-- measurements: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- enodeb: string (nullable = true)
 |    |    |-- label: string (nullable = true)
 |    |    |-- ltecelloid: long (nullable = true)
 |-- networkcode: long (nullable = true)
 |-- ocode: long (nullable = true)
 |-- startcelllabel: string (nullable = true)
 |-- startcelloid: long (nullable = true)
 |-- targetcelllabel: string (nullable = true)
 |-- targetcelloid: long (nullable = true)
 |-- timestamp: long (nullable = true)

I want the whole root row with only the particular measurements that pass the filter, and the row must contain at least one measurement after filtering.

I have a dataframe of this root, and I have a dataframe of filtering values (one column).

So, for example: I would only know that my root contains a measurements array, and that this array contains labels. I want the whole root with all the measurements whose label is in ("label1", "label2").

My last trial with explode and collect_list leads to: "grouping expressions sequence is empty, and 'callstartcelllabel' is not an aggregate function"... Is it even possible to generalize such a filtering case? I don't know yet what such a generic UDAF should look like.

I'm new to Spark.

EDIT:

The current solution I've come to is:

explode the array -> filter out the rows with unwanted array members -> group by everything except the array members -> agg(collect_list(col("measurements")))

Would it be faster to do it with a UDF? I can't figure out how to write a generic UDF that filters a generic array, knowing only the filtering values...
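For reference, a minimal sketch of that explode/filter/groupBy/collect_list pipeline. Only the array column name ("measurements") and the element field ("label") are assumed; the other grouping columns are discovered from the schema at runtime. `labelsDf` and its column name `label` are placeholders for the one-column dataframe of filtering values:

```scala
import org.apache.spark.sql.functions._

// All columns except the array become grouping keys, so nothing else is hardcoded.
val otherCols = df.columns.filter(_ != "measurements").map(col)

val filtered = df
  .withColumn("m", explode(col("measurements")))           // one row per array element
  .join(labelsDf, col("m")("label") === labelsDf("label")) // keep only wanted labels
  .groupBy(otherCols: _*)                                  // group back by everything else
  .agg(collect_list(col("m")).as("measurements"))          // rebuild the filtered array
```

The inner join drops rows whose filtered array would be empty, which matches the "must contain at least one measurement after filtering" requirement.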

please share example data instead of schema - mtoto

2 Answers

0 votes
case class Test(a: Int, b: Int) // case class used to build a struct column
val df = List((1,2,Test(1,2)),(2,3,Test(3,4)),(4,2,Test(5,6))).toDF("name","rank","array")
 +----+----+------+
 |name|rank| array|
 +----+----+------+
 |   1|   2|[1, 2]|
 |   2|   3|[3, 4]|
 |   4|   2|[5, 6]|
 +----+----+------+
df.printSchema
 //dataFrame structure look like this 
 root
|-- name: integer (nullable = false)
|-- rank: integer (nullable = false)
|-- array: struct (nullable = true)
|    |-- a: integer (nullable = false)
|    |-- b: integer (nullable = false)

df.filter(df("array")("a")>1).show
//after filter on dataFrame on specified condition
 +----+----+------+
 |name|rank| array|
 +----+----+------+
 |   2|   3|[3, 4]|
 |   4|   2|[5, 6]|
 +----+----+------+

//Above code help you to understand the Scenario

//use this piece of code -- but note that in your schema measurements is an array,
//so measurements.label is an array<string>; use array_contains instead of ===:
import org.apache.spark.sql.functions.array_contains
df.filter(array_contains(df("measurements")("label"), "label1") ||
          array_contains(df("measurements")("label"), "label2")).show
0 votes
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

case class Test(a: Int, b: Int) // same case class as in the previous answer
val df = Seq((1,2,Array(Test(1,2),Test(5,6))),(1,3,Array(Test(1,2),Test(5,3))),(10,11,Array(Test(1,6)))).toDF("name","rank","array")
 df.show
+----+----+----------------+
|name|rank|           array|
+----+----+----------------+
|   1|   2|[[1, 2], [5, 6]]|
|   1|   3|[[1, 2], [5, 3]]|
|  10|  11|        [[1, 6]]|
+----+----+----------------+
// UDF that receives each array of structs as a WrappedArray[Row],
// converts the Rows to tuples, and keeps only the elements with a > 1
def test = {
  udf((a: scala.collection.mutable.WrappedArray[Row]) => {
    val b = a.toArray.map(x => (x.getInt(0), x.getInt(1)))
    b.filter(y => y._1 > 1)
  })
}
df.withColumn("array", test(df("array"))).show
+----+----+--------+
|name|rank|   array|
+----+----+--------+
|   1|   2|[[5, 6]]|
|   1|   3|[[5, 3]]|
|  10|  11|      []|
+----+----+--------+
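If you are on Spark 2.4 or later, the same element-level filtering can be done without a UDF via the built-in `filter` higher-order function. A sketch against the asker's schema, with the wanted labels hardcoded for illustration (in practice they would come from the filtering-values dataframe):

```scala
import org.apache.spark.sql.functions.expr

// Keep only array elements whose label matches, then drop rows
// whose measurements array is left empty.
val result = df
  .withColumn("measurements",
    expr("filter(measurements, m -> m.label IN ('label1', 'label2'))"))
  .where(expr("size(measurements) > 0"))
```

This stays inside Catalyst (no row deserialization into Scala objects), so it is typically faster than an equivalent UDF.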