0 votes

I am trying to group the dataset below by the column "id" and sum the arrays in the column "values" element-wise. How do I do this in Spark using Scala?

Input: (dataset of 2 columns, column1 of type String and column2 of type Array[Int])

| id | values        |
|----|---------------|
| A  | [12,61,23,43] |
| A  | [43,11,24,45] |
| B  | [32,12,53,21] |
| C  | [11,12,13,14] |
| C  | [43,52,12,52] |
| B  | [33,21,15,24] |

Expected Output: (dataset or dataframe)

| id | values        |
|----|---------------|
| A  | [55,72,47,88] |
| B  | [65,33,68,45] |
| C  | [54,64,25,66] |

Note: The result has to be flexible and dynamic. That is, the solution should still hold good even if there are thousands of columns or the file is several TBs or PBs in size.
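
For reference, a minimal sketch of how the sample input above could be built for experimentation; the session setup and the variable name df here are just illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sum-arrays-by-id").master("local[*]").getOrCreate()
import spark.implicits._

// The sample rows from the tables above, as a DataFrame with a String id
// column and an Array[Int] values column
val df = Seq(
  ("A", Array(12, 61, 23, 43)),
  ("A", Array(43, 11, 24, 45)),
  ("B", Array(32, 12, 53, 21)),
  ("C", Array(11, 12, 13, 14)),
  ("C", Array(43, 52, 12, 52)),
  ("B", Array(33, 21, 15, 24))
).toDF("id", "values")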

Comment (2 votes): something like sqlDF.groupBy("id").sum("values").as("sum_values").select("id", "sum_values") should work for you. (Ilya Brodezki)

2 Answers

2 votes

I'm a little unsure about what you mean when you say it has to be flexible, but off the top of my head I can think of a couple of ways. The first (and in my opinion the prettiest) one uses a UDF:

// Imports used by both snippets in this answer
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{collect_list, sum, udf}
import spark.implicits._

// Creating a small test example
val testDF = spark.sparkContext.parallelize(Seq(("a", Seq(1,2,3)), ("a", Seq(4,5,6)), ("b", Seq(1,3,4)))).toDF("id", "arr")
// UDF that transposes the collected arrays and sums each position element-wise
val sum_arr = udf((list: Seq[Seq[Int]]) => list.transpose.map(arr => arr.sum))

testDF
  .groupBy('id)
  .agg(sum_arr(collect_list('arr)) as "summed_values")
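
Running that on the small testDF should give something along these lines (the result below is hand-computed from testDF rather than captured from an actual run):

testDF
  .groupBy('id)
  .agg(sum_arr(collect_list('arr)) as "summed_values")
  .show(false)
// Hand-computed expectation (row order may differ):
// +---+-------------+
// |id |summed_values|
// +---+-------------+
// |a  |[5, 7, 9]    |
// |b  |[1, 3, 4]    |
// +---+-------------+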

If you have billions of identical ids, however, the collect_list will of course be a problem. In that case you could do something like this:

testDF
  // explode each array into one (id, array index, element) row per element
  .flatMap { case Row(id: String, list: Seq[Int]) =>
    list.indices.map(index => (id, index, list(index)))
  }
  .toDF("id", "arr_index", "arr_element")
  // sum each array position separately, then gather the sums back per id
  .groupBy('id, 'arr_index)
  .agg(sum("arr_element") as "sum")
  .groupBy('id)
  // note: collect_list gives no ordering guarantee, so the sums may not come
  // back in arr_index order; see the sketch below for an order-safe variant
  .agg(collect_list('sum) as "summed_values")
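
If the sums must line up with the original array positions, here is a sketch of one way to make the ordering explicit: it uses Spark's built-in posexplode instead of the manual flatMap, and collects (arr_index, sum) structs that can be sorted back into index order. The intermediate column names are just illustrative.

import org.apache.spark.sql.functions.{collect_list, posexplode, sort_array, struct, sum}

testDF
  // one row per array element, together with its position in the array
  .select('id, posexplode('arr) as Seq("arr_index", "arr_element"))
  .groupBy('id, 'arr_index)
  .agg(sum('arr_element) as "sum")
  .groupBy('id)
  // sort_array orders the structs by their first field (arr_index), so the
  // extracted sums line up with the original array positions
  .agg(sort_array(collect_list(struct('arr_index, 'sum))) as "index_sum_pairs")
  .select('id, $"index_sum_pairs.sum" as "summed_values")
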
0 votes

The below single-line solution worked for me (here n is the number of elements in each array):

import org.apache.spark.sql.functions.{array, col, sum}
ds.groupBy("Country").agg(array((0 until n).map(i => sum(col("Values").getItem(i))): _*) as "Values")
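
Adapted to the question's schema as a sketch, assuming the input DataFrame is called df, its columns are id and values, and every array has exactly 4 elements as in the sample data. Since n must be known up front, this approach only works when all arrays share the same fixed length.

import org.apache.spark.sql.functions.{array, col, sum}

// n is the fixed length of the arrays; 4 matches the sample data in the question
val n = 4
val summed = df.groupBy("id")
  .agg(array((0 until n).map(i => sum(col("values").getItem(i))): _*) as "values")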