
I have a DataFrame as shown below, and I am trying to get, for each user (grouping by name), the key with the maximum summed value.

+-----+-----------------------------+
|name |nt_set                       |
+-----+-----------------------------+
|Bob  |[av:27.0, bcd:29.0, abc:25.0]|
|Alice|[abc:95.0, bcd:55.0]         |
|Bob  |[abc:95.0, bcd:70.0]         |
|Alice|[abc:125.0, bcd:90.0]        |
+-----+-----------------------------+

Below is the UDF I am using to get the max(sum) per user:

val maxfunc = udf((arr: Array[String]) => {
  val step1 = arr
    .map(x => (x.split(":", -1)(0), x.split(":", -1)(1)))
    .groupBy(_._1)
    .mapValues(arr => arr.map(_._2.toInt).sum)
    .maxBy(_._2)
  val result = step1._1 + ":" + step1._2
  result
})

And when I run the UDF, it throws the error below:

 val c6 = c5.withColumn("max_nt", maxfunc(col("nt_set"))).show(false)

Error: Failed to execute user defined function($anonfun$1: (array) =>string)

How do I achieve this in a more performant way? I need to run this on a larger dataset.

The expected result is:

+-----+-----------------------------+
|name |max_nt                       |
+-----+-----------------------------+
|Bob  |abc:120.0                    |
|Alice|abc:220.0                    |
+-----+-----------------------------+

2 Answers

Answer 1

From what I understand of what you are trying to do, your example is wrong. Alice's bcd fields only sum to 145 whereas her abc fields sum to 220. So abc should be selected for her as well. If I am wrong, then I misunderstood your problem.
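
For instance, a quick sanity check of Alice's rows in plain Scala (the values are just copied from the sample data above):

// sum Alice's values per key
val alice = Seq("abc:95.0", "bcd:55.0", "abc:125.0", "bcd:90.0")
alice.map(_.split(":")).groupBy(_(0)).mapValues(_.map(_(1).toDouble).sum)
// Map(abc -> 220.0, bcd -> 145.0)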

Anyway, you don't need a udf to do what you want. Let's generate your data:

// outside the spark-shell you also need: import spark.implicits._ (for .toDF)
val df = sc.parallelize(Seq(
    ("Bob", Array("av:27.0", "bcd:29.0", "abc:25.0")),
    ("Alice", Array("abc:95.0", "bcd:55.0")),
    ("Bob", Array("abc:95.0", "bcd:70.0")),
    ("Alice", Array("abc:125.0", "bcd:90.0"))))
  .toDF("name", "nt_set")

Then, one way to go is to explode nt_set into a column nt that only contains one string/value pair.

df.withColumn("nt", explode('nt_set))
  //then we split the string and the value
  .withColumn("nt_string", split('nt, ":")(0))
  .withColumn("nt_value", split('nt, ":")(1).cast("int"))
  //then we sum the values by name and "string"
  .groupBy("name", "nt_string")
  .agg(sum('nt_value) as "nt_value")
  /* then we build a struct with the value first to be able to select
     the nt field with max value while keeping the corresponding string */
  .withColumn("nt", struct('nt_value, 'nt_string))
  .groupBy("name")
  .agg(max('nt) as "nt")
  // And we rebuild the "nt" column.
  .withColumn("max_nt", concat_ws(":", $"nt.nt_string", $"nt.nt_value"))
  .drop("nt").show(false)

+-----+---------+
|name |max_nt   |
+-----+---------+
|Bob  |abc:120.0|
|Alice|abc:220.0|
+-----+---------+
Answer 2

The core logic of your maxfunc is correct, except that it needs to handle the array column produced after groupBy/collect_list, which the UDF receives as a nested Seq collection (not an Array[String]):

val df = Seq(
  ("Bob", Seq("av:27.0", "bcd:29.0", "abc:25.0")),
  ("Alice", Seq("abc:95.0", "bcd:55.0")),
  ("Zack", Seq()),
  ("Bob", Seq("abc:50.0", null)),
  ("Bob", Seq("abc:95.0", "bcd:70.0")),
  ("Alice", Seq("abc:125.0", "bcd:90.0"))
).toDF("name", "nt_set")

import org.apache.spark.sql.functions._

val maxfunc = udf( (ss: Seq[Seq[String]]) => {
  // Flatten the collected arrays, skip null entries, and sum the values per key
  val groupedSeq: Map[String, Double] = ss.flatMap(identity).
    collect{ case x if x != null => (x.split(":")(0), x.split(":")(1)) }.
    groupBy(_._1).mapValues(_.map(_._2.toDouble).sum)

  // Users with no entries (e.g. only empty arrays) get a sentinel value;
  // otherwise return the (key, sum) pair with the maximum sum
  groupedSeq match {
    case x if x == Map.empty[String, Double] => ("", -999.0)
    case _ => groupedSeq.maxBy(_._2)
  }
} )

df.groupBy("name").agg(collect_list("nt_set").as("arr_nt")).
  withColumn("max_nt", maxfunc($"arr_nt")).
  select($"name", $"max_nt._1".as("max_key"), $"max_nt._2".as("max_val")).
  show
// +-----+-------+-------+
// | name|max_key|max_val|
// +-----+-------+-------+
// | Zack|       | -999.0|
// |  Bob|    abc|  170.0|
// |Alice|    abc|  220.0|
// +-----+-------+-------+
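
If you need the single key:value string from the question's expected output rather than two columns, one possible follow-up (just a sketch reusing the same aggregation; the concat_ws call and the column alias are the only additions) is:

df.groupBy("name").agg(collect_list("nt_set").as("arr_nt")).
  withColumn("max_nt", maxfunc($"arr_nt")).
  select($"name", concat_ws(":", $"max_nt._1", $"max_nt._2").as("max_nt")).
  show
// +-----+---------+
// | name|   max_nt|
// +-----+---------+
// | Zack|  :-999.0|
// |  Bob|abc:170.0|
// |Alice|abc:220.0|
// +-----+---------+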