2 votes

I have a dataframe with columns col1, col2, col3. col1 and col2 are strings; col3 is a Map[String,String] with the schema below:

 |-- col3: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

I have grouped by col1 and col2, aggregated with collect_list to get an array of maps, and stored the result in col4:

 df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))

 |-- col4: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

However, I would like col4 to be a single map with all the maps combined. Currently I have:

[[a->a1,b->b1],[c->c1]]

Expected output

[a->a1,b->b1,c->c1]

Would using a UDF be the right way to do this?

Any help is appreciated. Thanks.
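For reference, a minimal way to reproduce the input described above (the sample values here are made up):

import org.apache.spark.sql.functions.collect_list
import spark.implicits._

val df = Seq(
  ("x", "y", Map("a" -> "a1", "b" -> "b1")),
  ("x", "y", Map("c" -> "c1"))
).toDF("col1", "col2", "col3")

df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4")).show(false)
// col4: [[a -> a1, b -> b1], [c -> c1]]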

Comment from UninformedUser: you can create a user defined aggregate function in that case: docs.databricks.com/spark/latest/spark-sql/udaf-scala.html
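Following that suggestion, a minimal sketch of such an aggregate using the Spark 3.0+ Aggregator / functions.udaf API (rather than the older UserDefinedAggregateFunction); the object name MergeMaps is made up:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.{col, udaf}
import org.apache.spark.sql.Encoder

// Folds every row's map into one merged map; later rows win on duplicate keys.
object MergeMaps extends Aggregator[Map[String, String], Map[String, String], Map[String, String]] {
  def zero: Map[String, String] = Map.empty
  def reduce(acc: Map[String, String], row: Map[String, String]): Map[String, String] = acc ++ row
  def merge(a: Map[String, String], b: Map[String, String]): Map[String, String] = a ++ b
  def finish(acc: Map[String, String]): Map[String, String] = acc
  def bufferEncoder: Encoder[Map[String, String]] = ExpressionEncoder[Map[String, String]]()
  def outputEncoder: Encoder[Map[String, String]] = ExpressionEncoder[Map[String, String]]()
}

// Used directly in the aggregation, this skips the intermediate collect_list step:
val mergeMaps = udaf(MergeMaps)
df.groupBy(col("col1"), col("col2")).agg(mergeMaps(col("col3")).as("col4"))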

2 Answers

2 votes

You can use aggregate and map_concat:

import org.apache.spark.sql.functions.{expr, collect_list}
import spark.implicits._ // for .toDF and the $ column syntax

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)

// +---+------------------------------+
// |id |merged_data                   |
// +---+------------------------------+
// |1  |[k1 -> v1, k2 -> v3, k3 -> v3]|
// |2  |[k4 -> v4, k6 -> v6, k5 -> v5]|
// +---+------------------------------+

With map_concat we concatenate all the Map items of the data column via the aggregate built-in function, which lets us fold the concatenation over the elements of the collected array.
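If you are on Spark 3.0 or later, the same fold can also be written with the typed DSL instead of an expr string; this is a sketch equivalent to the expression above:

import org.apache.spark.sql.functions.{aggregate, collect_list, col, map, map_concat}

df.groupBy("id")
  .agg(collect_list("data").as("data"))
  .select(col("id"),
    aggregate(col("data"), map(), (acc, m) => map_concat(acc, m)).as("merged_data"))
  .show(false)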

Attention: the current implementation of map_concat in Spark 2.4.5 allows identical keys to co-exist in the result. This is most likely a bug, since it is not the expected behaviour according to the official documentation. Please be aware of that.

If you want to avoid such a case you can also go for a UDF:

import org.apache.spark.sql.functions.{collect_list, udf}

// ++ keeps the right-hand map's value when keys collide, so duplicates are resolved
val mergeMapUDF = udf((data: Seq[Map[String, String]]) => data.reduce(_ ++ _))

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeMapUDF($"data").as("merged_data"))
  .show(false)
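Note that reduce throws on an empty sequence. If the collected list could ever be empty or null, a foldLeft with an empty map as the zero value is safer; a sketch (the name mergeMapSafeUDF is made up):

val mergeMapSafeUDF = udf((data: Seq[Map[String, String]]) =>
  Option(data).getOrElse(Seq.empty).foldLeft(Map.empty[String, String])(_ ++ _))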
0 votes

You can achieve it without a UDF. Let's create your dataframe:

import org.apache.spark.sql.functions.map_concat
import spark.implicits._ // for .toDF and the 'value column syntax

val df = Seq(Seq(Map("a" -> "a1", "b" -> "b1"), Map("c" -> "c1", "d" -> "d1"))).toDF()
df.show(false)
df.printSchema()

output:

+----------------------------------------+
|value                                   |
+----------------------------------------+
|[[a -> a1, b -> b1], [c -> c1, d -> d1]]|
+----------------------------------------+

root
 |-- value: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

If your array contains just 2 elements, use map_concat:

df.select(map_concat('value.getItem(0), 'value.getItem(1))).show(false)

or this (I don't know how to loop from 0 up to the size of the 'value array column dynamically; if that were possible, this might be the shortest solution; a two-pass workaround is sketched after this snippet):

df.select(map_concat((for {i <- 0 to 1} yield 'value.getItem(i)): _*)).show(false)
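One workaround, assuming you are willing to make an extra pass over the data just to find the largest array, is to compute that size up front and then build one getItem column per index; coalescing with an empty map covers shorter arrays. A sketch, not from the original answer:

import org.apache.spark.sql.functions.{coalesce, map, map_concat, max, size}

// first pass: largest array length in the column
val maxLen = df.agg(max(size('value))).head().getInt(0)
// one column per index; out-of-range positions become null and fall back to an empty map
val parts = (0 until maxLen).map(i => coalesce('value.getItem(i), map()))
df.select(map_concat(parts: _*)).show(false)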

Otherwise, if your array contains multiple maps and its size is not known up front, you could try the method below:

  val df2 = df.map(s => {
    // Row.getList returns the array column as a java.util.List of maps
    val list = s.getList[Map[String, String]](0)
    var map = Map[String, String]()
    // fold each element into a single Scala map; later maps win on duplicate keys
    for (i <- 0 until list.size()) {
      map = map ++ list.get(i)
    }
    map
  })

  df2.show(false)
  df2.printSchema()

output:

+------------------------------------+
|value                               |
+------------------------------------+
|[a -> a1, b -> b1, c -> c1, d -> d1]|
+------------------------------------+

root
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
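Note that this typed map produces a single column named value and drops any other columns. If you need to keep the grouping keys from the question, a typed variant over the grouped dataframe could look like this sketch, assuming the col1/col2/col3 names from the question:

import org.apache.spark.sql.functions.collect_list

val merged = df
  .groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))
  .as[(String, String, Seq[Map[String, String]])]
  .map { case (c1, c2, maps) => (c1, c2, maps.foldLeft(Map.empty[String, String])(_ ++ _)) }
  .toDF("col1", "col2", "col4")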