
I have a use case where I intend to group by key(s) while aggregating over column(s). I am using a Dataset and tried to achieve these operations with groupBy and agg. For example, take the following scenario:

case class Result(deptId: String, locations: Seq[String])
case class Department(deptId: String, location: String)

// using Spark 2.0.2
// I have a Dataset `ds` of type Department

+-------+--------------------+
|deptId |      location      |
+-------+--------------------+
|     d1|delhi               |            
|     d1|mumbai              |
|    dp2|calcutta            |
|    dp2|hyderabad           |       
+-------+--------------------+

I intended to convert it to

// Dataset `result` of type Result

+-------+--------------------+
|deptId |      locations     |
+-------+--------------------+
|     d1|[delhi,mumbai]      |            
|    dp2|[calcutta,hyderabad]|            
+-------+--------------------+

For this, I searched on Stack Overflow and found the following:

import org.apache.spark.sql.functions.{collect_list, udf}

val flatten = udf((xs: Seq[Seq[String]]) => xs.flatten)

val result = ds.groupBy("deptId")
               .agg(flatten(collect_list("location")).as("locations"))

The above seemed pretty neat to me.

  1. But before searching for the above, I first checked whether Dataset has an inbuilt reduceByKey like an RDD does. I couldn't find one, so I opted for the above. However, I read the article groupByKey vs reduceByKey and learned that reduceByKey shuffles less data and is more efficient, which is my first reason to ask the question: should I opt for an RDD in my scenario? (A rough sketch of what I imagine the RDD version would look like follows this list.)
  2. The reason I initially went for Dataset was solely the enforcement of types, i.e. each row being of type Department. But as my result has an entirely different schema, should I bother with type safety? I tried result.as[Result], but that doesn't seem to do any compile-time type check. Another reason I chose Dataset is that I'll pass the result Dataset to some other function, and having a structure makes the code easy to maintain. Also, the case class can be highly nested, and I cannot imagine maintaining that nesting in a pair RDD while writing reduce/map operations.
  3. Another thing I'm unsure about is the use of a udf. I came across posts where people said they would prefer converting the Dataset to an RDD rather than using a udf for complex aggregations/groupBy.
  4. I also googled around a bit and saw posts/articles saying that Dataset has the overhead of type checking, but that in higher Spark versions it performs better compared to RDD. Again, I'm not sure whether I should switch back to RDD.
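
For reference, here is roughly what I imagine the RDD-based version from point 1 would look like (an untested sketch of my own, reusing the case classes above):

// untested sketch: the RDD route I'm considering, reusing the case classes above
val resultRdd = ds.rdd
  .map(d => (d.deptId, Seq(d.location)))   // pair RDD keyed by deptId
  .reduceByKey(_ ++ _)                     // concatenate the per-key location lists
  .map { case (deptId, locations) => Result(deptId, locations) }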

PS: please forgive me if I have used some terms incorrectly.


1 Answer


To answer some of your questions:

  • groupBy + agg is not groupByKey - DataFrame / Dataset groupBy behaviour/optimization - in the general case. There are specific cases where it might behave like one; collect_list is one of them.
  • reduceByKey is not better than RDD-style groupByKey when groupByKey-like logic is required - Be Smart About groupByKey - and in fact it is almost always worse.

  • There is an important trade-off between static type checking and performance in Spark's Dataset - Spark 2.0 Dataset vs DataFrame

  • The linked post specifically advises against using UserDefinedAggregateFunction (not UserDefinedFunction) because of excessive copying of data - Spark UDAF with ArrayType as bufferSchema performance issues

  • You don't even need a UserDefinedFunction, as flattening is not required in your case:

    import spark.implicits._                            // implicits from the active SparkSession
    import org.apache.spark.sql.functions.collect_list

    val df = Seq[Department]().toDF

    df.groupBy("deptId").agg(collect_list("location").as("locations"))
    

    And this is what you should go for.

    A statically typed equivalent would be

    val ds = Seq[Department]().toDS
    
    ds
      .groupByKey(_.deptId)
      .mapGroups { case (deptId, xs) => Result(deptId, xs.map(_.location).toSeq) }
    

    which is considerably more expensive than the DataFrame option.
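
    If you still want a typed Dataset[Result] at the end, you can take the DataFrame result above and cast it with as[Result] (a sketch reusing the question's case classes; as you observed, the check happens when the plan is analyzed at runtime, not at compile time):

    // sketch: untyped aggregation first, then a typed view of the result
    val result = df
      .groupBy("deptId")
      .agg(collect_list("location").as("locations"))
      .as[Result]   // verified against Result's encoder at analysis time, not compile time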