11 votes

I'm quite new to both Spark and Scala and could really use a hint to solve my problem. I have two DataFrames, A (columns id and name) and B (columns id and text), and would like to join them, group by id, and combine all rows of text into a single String:

A

+--------+--------+
|      id|    name|
+--------+--------+
|       0|       A|
|       1|       B|
+--------+--------+

B

+--------+--------+
|      id|    text|
+--------+--------+
|       0|     one|
|       0|     two|
|       1|   three|
|       1|    four|
+--------+--------+

desired result:

+--------+--------+----------+
|      id|    name|     texts|
+--------+--------+----------+
|       0|       A|   one two|
|       1|       B|three four|
+--------+--------+----------+

So far I have tried the following:

import org.apache.spark.sql.functions.collect_list

val C = A.join(B, "id")
val D = C.groupBy("id", "name").agg(collect_list("text") as "texts")

This works quite well, except that my texts column is an Array of Strings instead of a single String. I would very much appreciate some help.
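For reference, here is how the example data can be built (a minimal sketch, assuming a local SparkSession named spark):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Example data from the tables above
val A = Seq((0, "A"), (1, "B")).toDF("id", "name")
val B = Seq((0, "one"), (0, "two"), (1, "three"), (1, "four")).toDF("id", "text")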


2 Answers

13 votes

I am just adding a minor function to yours to give the right solution:

import org.apache.spark.sql.functions.{collect_list, concat_ws}

A.join(B, Seq("id"), "left")
  .orderBy("id")
  .groupBy("id", "name")
  .agg(concat_ws(" ", collect_list("text")) as "texts")
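Applied to the example data, this should give the desired result (a sketch; note that collect_list does not strictly guarantee element order after a shuffle, so the orderBy is a best-effort hint rather than a guarantee):

val D = A.join(B, Seq("id"), "left")
  .orderBy("id")
  .groupBy("id", "name")
  .agg(concat_ws(" ", collect_list("text")) as "texts")

D.show()
// +---+----+----------+
// | id|name|     texts|
// +---+----+----------+
// |  0|   A|   one two|
// |  1|   B|three four|
// +---+----+----------+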
0 votes

It's quite simple:

import org.apache.spark.sql.functions.collect_list

val bCollected = b.groupBy('id).agg(collect_list('text).as("texts"))
val ab = a.join(bCollected, a("id") === bCollected("id"), "left")

The first line builds an intermediate result: the b DataFrame with the texts collected for every id. Then you join it with a. bCollected should be smaller than b itself, so it will probably give better shuffle performance.
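Note this leaves texts as an Array of Strings. To get a single String as asked, a small variation applies concat_ws from the other answer inside the aggregation (a sketch, assuming the same a and b DataFrames and spark.implicits._ in scope for the 'id symbol syntax):

import org.apache.spark.sql.functions.{collect_list, concat_ws}

// Collect and space-join the texts per id before joining with a
val bCollected = b.groupBy('id)
  .agg(concat_ws(" ", collect_list('text)).as("texts"))
val ab = a.join(bCollected, a("id") === bCollected("id"), "left")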