I'm trying to split a dataframe according to the values of one or more columns and rotate each resulting dataframe independently of the rest. Namely, given an input dataframe:
val inputDF = Seq(("tom","20","a","street a","germany"),("jimmy","30","b","street b","germany"),
  ("lola","40","c","street c","argentina"), ("maria","50","d","street d","argentina"), ("joe","60","e","street e","argentina"))
  .toDF("name","age","company","address","country")
//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+
I need to split the records by the different values of the "country" column. For the input dataframe the split should produce:
//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//+-----+---+-------+--------+---------+
//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+
I also have to rotate the "name" and "age" columns within each of these dataframes, so that each person ends up with a different company and address while the rest of the columns stay intact. The desired output dataframe would look like the following:
//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|jimmy| 30|      a|street a|  germany|
//|  tom| 20|      b|street b|  germany|
//|  joe| 60|      c|street c|argentina|
//| lola| 40|      d|street d|argentina|
//|maria| 50|      e|street e|argentina|
//+-----+---+-------+--------+---------+
The final row order does not matter.
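To make the intended rotation concrete, here is a minimal sketch on plain Scala collections (no Spark involved). The `Person` case class and the `rotateGroup` helper are hypothetical names introduced only for this illustration. It shifts the (name, age) pairs by one position within each country, which is one rotation that reproduces the desired output above; any other reassignment within a group would be equally valid.

```scala
// Hypothetical record type mirroring the columns of the example dataframe.
case class Person(name: String, age: String, company: String, address: String, country: String)

val people = Seq(
  Person("tom", "20", "a", "street a", "germany"),
  Person("jimmy", "30", "b", "street b", "germany"),
  Person("lola", "40", "c", "street c", "argentina"),
  Person("maria", "50", "d", "street d", "argentina"),
  Person("joe", "60", "e", "street e", "argentina"))

// Within one group, move the last (name, age) pair to the front and shift the
// others down by one, while company, address and country stay where they are.
def rotateGroup(group: Seq[Person]): Seq[Person] = {
  val pairs = group.map(p => (p.name, p.age))
  val shifted = pairs.takeRight(1) ++ pairs.dropRight(1)
  group.zip(shifted).map { case (p, (name, age)) => p.copy(name = name, age = age) }
}

// Split by country, rotate each group independently, and merge the groups again.
val rotatedPeople = people.groupBy(_.country).values.flatMap(rotateGroup).toSeq

rotatedPeople.foreach(println)
```

Note that a group with a single record is left unchanged by this kind of rotation, since there is nobody to swap with.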
My first attempt (running in the spark-shell)
I tried to assign a unique id to each row, shuffle the desired columns (name and age), and then join the reordered dataframe with the rest of the dataframe using the auxiliary id value. The main problems here are the use of collect(), which can be dangerous with big dataframes, and repartition(1), which goes against distributed computation in Spark (it is used to avoid exceptions when zipping RDDs with a different number of partitions).
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType
// column(s) names to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)
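// name of the auxiliary column used to join the dataframes in the final step
// (defined first, because rotateCols below refers to it)
val auxCol = "aux"
// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")
val rotateCols = colsToRotate.map(col) :+ col(auxCol)
// add the auxiliary id column to the input dataframe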
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())
val splitValuesSchema = dfWithID.select(splitCols: _*).schema
// create one dataframe for each value of the splitting column
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
  .map(row => spark.sparkContext.makeRDD(List(row)))
  .map(rdd => spark.createDataFrame(rdd, splitValuesSchema))
val rotateIDCols = Array(auxCol) ++ colsToRotate
// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
  .map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))
// randomly reorder the auxiliary id column (DFs with random ids)
val randIdDFs = splittedDFs
  .map(df => df.select(auxCol).orderBy(rand()).toDF())
// get RDDs with random ids
val randIdRdds = randIdDFs
  .map(df => df.select(auxCol).rdd.map(row => row(0)))
// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
  .map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
    .map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))
val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
    .createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
  .reduce(_ union _).withColumnRenamed("rotated_id", "column2join")
// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")
// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial columns order
Showing the output dataframe produces a result similar to the expected output shown above (the exact result depends on the order produced by rand()).
I would like to avoid the collect() and repartition() calls where possible and to get a more functional solution.
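One possible direction that avoids both calls, given here only as a hedged sketch: number the rows inside each group with a window function, shift that position by one modulo the group size, and join the shifted columns back on (group, position). This swaps the random shuffle for a deterministic shift. The names `groupCols`, `rotCols`, `pos`, `n` and `resultDF` are hypothetical, and the snippet assumes it runs in the same spark-shell session, after `inputDF` has been defined as above.

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, monotonically_increasing_id, row_number}

// Assumption: `inputDF` is the example dataframe defined earlier in this session.
val groupCols = Seq("country")     // columns that define the split
val rotCols = Seq("name", "age")   // columns rotated together

// One window per group; ordering by a generated id gives each row a position.
val byGroup = Window.partitionBy(groupCols.map(col): _*)
val byGroupOrdered = byGroup.orderBy(col("aux"))

val indexed = inputDF
  .withColumn("aux", monotonically_increasing_id())
  // 0-based position of the row inside its group
  .withColumn("pos", (row_number().over(byGroupOrdered) - lit(1)).cast("long"))
  // number of rows in the group
  .withColumn("n", count(lit(1)).over(byGroup))

// Rotated side: each (name, age) pair moves to the next position, wrapping around.
val shiftedPos = ((col("pos") + lit(1)) % col("n")).as("pos")
val rotatedSide = indexed.select((groupCols.map(col) ++ rotCols.map(col)) :+ shiftedPos: _*)

// Fixed side: every column except the rotated ones, still at its original position.
val fixedSide = indexed.drop(rotCols: _*)

val resultDF = fixedSide
  .join(rotatedSide, groupCols :+ "pos")
  .select(inputDF.columns.map(col): _*) // keep the initial column order

resultDF.show()
```

Everything stays in the DataFrame API, so nothing is collected to the driver. A group with a single row maps onto itself, and a very large group is still processed within one window partition, so heavily skewed data could remain a bottleneck.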
Any comment or idea is welcome!
orderBy(rand()) call to shuffle the rows (in fact, the monotonically increasing id column) and then appending them to the dataframe with the columns to rotate. Please excuse my English. - d_c
collect and repartition calls and find a better solution to be able to manage huge dataframes. I'm just looking for some other ways to reach the desired result. - d_c