0
votes

I have a DataFrame with a structure similar to:

 |-- npaDetails: struct (nullable = true)
 |    |-- additionalInformation: struct (nullable = true)
 |    |-- couponDetails: array (nullable = true)
 |    |-- npaService: struct (nullable = true)
 |-- npaHeaderData: struct (nullable = true)
 |    |-- npaDownloadDate: string (nullable = true)
 |    |-- npaDownloadTime: string (nullable = true)
 |    |-- npaIssuanceDate: string (nullable = true)
 |    |-- npaNumber: string (nullable = true)

I was planning to group the rows depending on the field npaNumber to later operate over the groups which has more than one element (so groups of rows with same npaNumber). So I wrote the following:

df.groupBy($"npaHeaderData.npaNumber") 
  .count()
  .filter("count > 1")
  .foreach { x => println(x) }

With the foreach I was expecting to display groups of rows with the same npaNumber, but instead the elements I was displaying was just the npaNumber and the count value:

[3487208122633,2]
[5668207771332,3]
[3567207579910,4]
[5768207822303,2]
[9868207960414,7]

I also tried the following without success:

val groupedDF = df.groupBy($"npaHeaderData.npaNumber").agg($"npaDetails", $"npaHeaderData")

But I get an error message:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'npaHeaderData' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;; Aggregate [npaHeaderData#6.npaNumber], [npaHeaderData#6.npaNumber AS npaNumber#36, npaHeaderData#6]

How can I access/operate independently with each group of rows, which have been grouped by a column.attribute in the dataframe?

Just in case it is relevant, the next operation I am planning to perform over those groups of rows is a order by, based on npaHeaderData.npaIssuanceDate

Thanks for your time

1

1 Answers

2
votes

Aggregations would not preserve the original rows, instead it will aggregate all the rows in the grouped data and give you one aggregated row only. And you will have only those columns used in groupBy and agg functions.

If you are intending to preserve all the rows and all the columns, you should go with withColumn api and use Window function.

df.withColumn("count", count($"npaHeaderData").over(Window.partitionBy("npaHeaderData.npaNumber")))
    .filter($"count">1)

This should give you all the rows of grouped dataframe and with all the columns + count column which is going to filter all the rows whose count is > 1. If you don't want count column, you can use .drop("count")