
I have tried numerous approaches to turn the following:

 Gender, Age, Value
 1,      20,  21
 2,      23,  22
 1,      26,  23
 2,      29,  24

into

   Male_Age, Male_Value, Female_Age, Female_Value
         20,         21,         23,           22
         26,         23,         29,           24

What I need to do is group by gender and, instead of applying an aggregate like sum, count, or avg, build a List[age] and a List[value] per group. This should be possible because I am using a Dataset, which allows functional operations.
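
For example, I imagine something along these lines as a starting point (just a sketch of the intent):

 import org.apache.spark.sql.functions.collect_list

 df.groupBy("gender")
   .agg(collect_list("age") as "ages", collect_list("value") as "values")

but that leaves me with one row per gender holding lists, not the aligned columns shown above.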

If the number of rows for males and females is not the same, the missing entries should be filled with nulls.

One approach I tried was to build a new DataFrame from the columns of other DataFrames, like so:

df.select(male.select("sex").where('sex === 1).col("sex"),
          female.select("sex").where('sex === 2).col("sex"))

However, this bizarrely produces output like so:

sex,    sex,
 1,       1
 2,       2
 1,       1
 2,       2

I can't see how that is possible.

I also tried using pivot, but it forces me to aggregate after the group by:

df.withColumn("sex2", df.col("sex"))
  .groupBy("sex")
  .pivot("sex2")
  .agg(sum('value).as("mean"),
       stddev('value).as("std. dev"))
  .show()

|sex|    1.0_mean|   1.0_std. dev|   2.0_mean|    2.0_std. dev|
|1.0|0.4926065526|   1.8110632697|           |                |
|2.0|            |               |0.951250372|1.75060275400785|

The following code does what I need in Oracle SQL, so it should be possible in Spark SQL too, I reckon...

drop table mytable;

CREATE TABLE mytable (
  gender number(10) NOT NULL,
  age    number(10) NOT NULL,
  value  number(10)
);

insert into mytable values (1, 20, 21);
insert into mytable values (2, 23, 22);
insert into mytable values (1, 26, 23);
insert into mytable values (2, 29, 24);
insert into mytable values (1, 30, 25);

select * from mytable;


SELECT A.VALUE AS MALE, 
       B.VALUE AS FEMALE 
FROM 
(select value, rownum RN from mytable where gender = 1) A 
FULL OUTER JOIN 
(select value, rownum RN from mytable where gender = 2) B
ON A.RN = B.RN



2 Answers

1 vote

The following should give you the result.

val df = Seq(
  (1,      20,  21),
  (2,      23,   22),
  (1,      26,  23),
  (2,      29,  24)
).toDF("Gender", "Age", "Value")

scala> df.show
+------+---+-----+
|Gender|Age|Value|
+------+---+-----+
|     1| 20|   21|
|     2| 23|   22|
|     1| 26|   23|
|     2| 29|   24|
+------+---+-----+

// Gender 1 = Male
// Gender 2 = Female

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
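// row_number() requires an ordered window spec; ordering by the partition
// key itself leaves the order within each gender arbitrary, so a real sort
// key (e.g. "age") would make the numbering deterministic.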
val byGender = Window.partitionBy("gender").orderBy("gender")

val males = df
  .filter("gender = 1")
  .select($"age" as "male_age",
          $"value" as "male_value",
          row_number() over byGender as "RN")

scala> males.show
+--------+----------+---+
|male_age|male_value| RN|
+--------+----------+---+
|      20|        21|  1|
|      26|        23|  2|
+--------+----------+---+

val females = df
  .filter("gender = 2")
  .select($"age" as "female_age",
          $"value" as "female_value",
          row_number() over byGender as "RN")

scala> females.show
+----------+------------+---+
|female_age|female_value| RN|
+----------+------------+---+
|        23|          22|  1|
|        29|          24|  2|
+----------+------------+---+

scala> males.join(females, Seq("RN"), "outer").show
+---+--------+----------+----------+------------+
| RN|male_age|male_value|female_age|female_value|
+---+--------+----------+----------+------------+
|  1|      20|        21|        23|          22|
|  2|      26|        23|        29|          24|
+---+--------+----------+----------+------------+
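
Since this is an outer join, if the male and female counts differ, the unmatched side simply comes out as null, which covers that requirement. To match the requested column layout exactly, the RN helper column can be dropped at the end (a small addition to the above):

males.join(females, Seq("RN"), "outer")
  .orderBy("RN")  // keep the pairs aligned
  .drop("RN")
  .show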
0 votes

Given a DataFrame called df with columns gender, age, and value, you can do this:

import org.apache.spark.sql.functions.collect_list

df.groupBy($"gender")
  .agg(collect_list($"age"), collect_list($"value"))
  .rdd.map { row =>
    // each row here is one gender group: (gender, ages, values)
    val ages: Seq[Int] = row.getSeq(1)
    val values: Seq[Int] = row.getSeq(2)
    (row.getInt(0), ages.head, ages.last, values.head, values.last)
  }.toDF("gender", "first_age", "last_age", "first_value", "last_value")

This uses the collect_list aggregation function from Spark's very helpful functions library to gather the values you want. (There is also a collect_set, which drops duplicates.)

After that, I don't know of any higher-level DataFrame functions to expand those columnar arrays into individual columns of their own, so I fall back to the lower-level RDD API our ancestors used. I simply expand everything into a tuple and turn it back into a DataFrame. The commenters above mention corner cases I have not addressed; functions like headOption and lastOption might be useful there. But this should be enough to get you moving.
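
For instance, here is a minimal sketch of that Option-based guard (the column names first_age and so on are just illustrative):

import org.apache.spark.sql.functions.collect_list

df.groupBy($"gender")
  .agg(collect_list($"age"), collect_list($"value"))
  .rdd.map { row =>
    val ages: Seq[Int] = row.getSeq(1)
    val values: Seq[Int] = row.getSeq(2)
    // Option[Int] becomes a nullable integer column in toDF, so a group
    // with fewer than two rows yields nulls instead of throwing
    (row.getInt(0), ages.headOption, ages.lastOption,
     values.headOption, values.lastOption)
  }.toDF("gender", "first_age", "last_age", "first_value", "last_value")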