
I want to merge multiple ArrayType[StringType] columns in Spark to create one ArrayType[StringType] column. For combining two columns I found the solution here:

Merge two spark sql columns of type Array[string] into a new Array[string] column

But how do I go about combining, if I don't know the number of columns at compile time. At run time, I will know the names of all the columns to be combined.

One option is to use the UDF defined in the above Stack Overflow question to add two columns at a time in a loop. But this involves multiple passes over the entire dataframe. Is there a way to do this in just one go?

+------+------+---------+
| col1 | col2 | combined|
+------+------+---------+
| [a,b]| [i,j]|[a,b,i,j]|
| [c,d]| [k,l]|[c,d,k,l]|
| [e,f]| [m,n]|[e,f,m,n]|
| [g,h]| [o,p]|[g,h,o,p]|
+------+------+---------+
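For reference, the desired "combined" column is just a per-row concatenation of the arrays, in column order and keeping duplicates. A minimal pure-Python model of that semantics (not Spark; the rows and column names are taken from the example above):

```python
from functools import reduce

# Rows from the example table above, as plain Python lists.
rows = [
    {"col1": ["a", "b"], "col2": ["i", "j"]},
    {"col1": ["c", "d"], "col2": ["k", "l"]},
]
cols = ["col1", "col2"]  # known only at run time

def combine(row, cols):
    # Concatenate the arrays in column order, keeping duplicates.
    return reduce(lambda acc, c: acc + row[c], cols, [])

for row in rows:
    print(combine(row, cols))
```

On Spark 2.4 and later, the built-in concat function also accepts array columns, so this effect is available in a single expression (e.g. df.select(concat(arrCol: _*).as("combined")) in Scala) without a UDF.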

2 Answers

import org.apache.spark.SparkException
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.functions.{col, struct, udf}

val arrStr: Array[String] = Array("col1", "col2")

val arrCol: Array[Column] = arrStr.map(c => df(c))

def assemble(rowEntity: Any*): collection.mutable.WrappedArray[String] = {

  var outputArray =
    rowEntity(0).asInstanceOf[collection.mutable.WrappedArray[String]]

  rowEntity.drop(1).foreach {
    case v: collection.mutable.WrappedArray[String @unchecked] =>
      outputArray ++= v
    case null =>
      throw new SparkException("Values to assemble cannot be null.")
    case o =>
      throw new SparkException(s"$o of type ${o.getClass.getName} is not supported.")
  }

  outputArray
}

val assembleFunc = udf { r: Row => assemble(r.toSeq: _*) }

val outputDf = df.select(col("*"), assembleFunc(struct(arrCol: _*)).as("combined"))

outputDf.show(false)    
  1. Process the dataframe schema and collect all the columns of type ArrayType(StringType()).

  2. Create a new dataframe with functions.array_union of the first two columns.

  3. Iterate through the rest of the columns, adding each of them to the combined column.

>>>from pyspark.sql import Row
>>>from pyspark.sql.functions import array_union, col
>>>from pyspark.sql.types import ArrayType, StringType
>>>df = spark.createDataFrame([Row(col1=['aa1', 'bb1'],
...                                col2=['aa2', 'bb2'],
...                                col3=['aa3', 'bb3'],
...                                col4=['a', 'ee'], foo="bar"
...                               )])
>>>df.show()
+----------+----------+----------+-------+---+
|      col1|      col2|      col3|   col4|foo|
+----------+----------+----------+-------+---+
|[aa1, bb1]|[aa2, bb2]|[aa3, bb3]|[a, ee]|bar|
+----------+----------+----------+-------+---+
>>>cols = [col_.name for col_ in df.schema 
...       if col_.dataType == ArrayType(StringType()) 
...        or col_.dataType == ArrayType(StringType(), False)
...       ]
>>>print(cols)
['col1', 'col2', 'col3', 'col4']
>>>
>>>final_df = df.withColumn("combined", array_union(cols[0], cols[1]))
>>>
>>>for col_ in cols[2:]:
...    final_df = final_df.withColumn("combined", array_union(col('combined'), col(col_)))
>>>
>>>final_df.select("combined").show(truncate=False)
+-------------------------------------+
|combined                             |
+-------------------------------------+
|[aa1, bb1, aa2, bb2, aa3, bb3, a, ee]|
+-------------------------------------+
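One caveat worth knowing: array_union returns the set union of its two arguments, so duplicate elements are removed, unlike the plain concatenation shown in the question's expected output. A pure-Python model of that behavior (an illustration of the semantics, not the Spark implementation):

```python
def array_union_model(a, b):
    # Mimics Spark's array_union: union of the two arrays,
    # with duplicates removed, keeping first-occurrence order.
    seen, out = set(), []
    for x in a + b:
        if x not in seen:
            seen.add(x)
            out.append(x)
    return out

print(array_union_model(["a", "b"], ["b", "c"]))
```

If exact duplicates must be preserved, concat (available for array columns on Spark 2.4+) or the UDF approach in the other answer is the safer choice.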