
I have a Spark RDD (or DataFrame; converting between the two is not a problem) with the following columns (an example of each structure):

res248: org.apache.spark.rdd.RDD[(String, Array[String])] = MapPartitionsRDD[1004] at map at <console>:246
org.apache.spark.sql.DataFrame = [id: string, list: array<string>]

I want to expand this RDD/DF to have an additional column containing the size of the list array. So the output should be something like this (example):

org.apache.spark.sql.DataFrame = [id: string, list: array<string>, length_of_list: int]

I tried rdd.map(x => (x._1, x._2, count(x._2))), but got an error message:

<console>:246: error: overloaded method value count with alternatives:
  (columnName: String)org.apache.spark.sql.TypedColumn[Any,Long] <and>
  (e: org.apache.spark.sql.Column)org.apache.spark.sql.Column

I also tried adding a new column on the DataFrame with withColumn("new_column", count($"list")), and variations thereof, but that doesn't work either; I get an error message complaining about aggregation.

Do you know of a way to achieve this without having to collect the RDD?


2 Answers


There is a built-in function size which returns the length of an array or map.

import org.apache.spark.sql.functions._
df.withColumn("length_of_list", size($"list"))
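For example, on a small hypothetical DataFrame with the same schema as in the question (the sample rows here are made up for illustration):

val sample = Seq(
  ("a", Array("x1", "x2")),
  ("b", Array("y1", "y2", "y3"))
).toDF("id", "list")

sample.withColumn("length_of_list", size($"list")).show
+---+------------+--------------+
| id|        list|length_of_list|
+---+------------+--------------+
|  a|    [x1, x2]|             2|
|  b|[y1, y2, y3]|             3|
+---+------------+--------------+

Because size is a built-in function, it also stays visible to the Catalyst optimizer, unlike a UDF.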

You can create the new column by applying a simple UDF to the list column, as follows:

val df = Seq(
  ("a", Array("x1", "x2", "x3")),
  ("b", Array("y1", "y2", "y3", "y4"))
).toDF(
  "id", "list"
)
// df: org.apache.spark.sql.DataFrame = [id: string, list: array<string>]

val listSize = (l: Seq[String]) => l.size
// listSize: Seq[String] => Int = <function1>

def listSizeUDF = udf(listSize)
// listSizeUDF: org.apache.spark.sql.expressions.UserDefinedFunction

val df2 = df.withColumn("length_of_list", listSizeUDF($"list"))

df2.show
+---+----------------+--------------+
| id|            list|length_of_list|
+---+----------------+--------------+
|  a|    [x1, x2, x3]|             3|
|  b|[y1, y2, y3, y4]|             4|
+---+----------------+--------------+

[UPDATE]

As pointed out by @Ramesh Maharjan, there is a built-in size function in Spark which I had somehow forgotten about. I'll leave the answer above as a simple example of using a UDF.
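
If you'd rather stay on the RDD from the question (the (String, Array[String]) pairs), you don't need count at all; the length comes straight from the array. A minimal sketch, assuming the rdd value from the question:

val rddWithLength = rdd.map { case (id, list) => (id, list, list.length) }
// rddWithLength: org.apache.spark.rdd.RDD[(String, Array[String], Int)]
// rddWithLength.toDF("id", "list", "length_of_list") gives the same DataFrame shape as above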