
I'm trying to split a dataframe according to the values of one (or more) column and rotate each resulting dataframe independently of the others. Namely, given an input dataframe:

val inputDF = Seq(("tom","20","a","street a","germany"), ("jimmy","30","b","street b","germany"),
                  ("lola","40","c","street c","argentina"), ("maria","50","d","street d","argentina"),
                  ("joe","60","e","street e","argentina"))
  .toDF("name","age","company","address","country")

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+

I need to split the records by the distinct values of the "country" column. For the input dataframe above, the split should produce the two dataframes below (a minimal sketch of this step follows them):

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|  tom| 20|      a|street a|  germany|
//|jimmy| 30|      b|street b|  germany|
//+-----+---+-------+--------+---------+

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//| lola| 40|      c|street c|argentina|
//|maria| 50|      d|street d|argentina|
//|  joe| 60|      e|street e|argentina|
//+-----+---+-------+--------+---------+
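For illustration, a minimal sketch of this split step, assuming there are only a few distinct values (the collect() on the distinct values is exactly the kind of call I would like to avoid in the general case):

import org.apache.spark.sql.functions.col

// one dataframe per distinct "country" value
val countries = inputDF.select("country").distinct().collect().map(_.getString(0))
val perCountryDFs = countries.map(c => inputDF.filter(col("country") === c))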

Within each resulting dataframe I also have to rotate the "name" and "age" columns, so that each person ends up with a different company and address while the rest of the columns stay intact. The desired output dataframe would look like the following:

//+-----+---+-------+--------+---------+
//| name|age|company| address|  country|
//+-----+---+-------+--------+---------+
//|jimmy| 30|      a|street a|  germany|
//|  tom| 20|      b|street b|  germany|
//|  joe| 60|      c|street c|argentina|
//| lola| 40|      d|street d|argentina|
//|maria| 50|      e|street e|argentina|
//+-----+---+-------+--------+---------+

The final row order does not matter.
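To make the intended pairing concrete, here is a minimal sketch of the idea (not my attempt below, just an illustration, assuming the inputDF defined above): within each country, number the fixed columns in some deterministic order and the rotating columns in a random order, then join both halves on that position.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rand, row_number}

// number the fixed half deterministically and the rotating half randomly,
// then pair them up by (country, position)
val fixedHalf = inputDF.select("company", "address", "country")
  .withColumn("pos", row_number().over(Window.partitionBy("country").orderBy("company")))
val rotatingHalf = inputDF.select("name", "age", "country")
  .withColumn("pos", row_number().over(Window.partitionBy("country").orderBy(rand())))

val pairedSketch = fixedHalf.join(rotatingHalf, Seq("country", "pos")).drop("pos")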

My first attempt (running in the Spark shell):

I tried to assign a unique id to each row, shuffle the desired columns (name and age), and then join the reordered dataframe back to the rest of the columns through that auxiliary id. The main problems here are the use of collect(), which can be dangerous with big dataframes, and repartition(1), which pretty much defeats the purpose of distributed computation in Spark (I used it to avoid exceptions when zipping rdds with different numbers of partitions).

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.LongType

// name(s) of the column(s) used to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")

// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())

val splitValuesSchema = dfWithID.select(splitCols: _*).schema

// create one single-row dataframe for each distinct value of the splitting column(s)
val splitValuesDFs = dfWithID.select(splitCols: _*).distinct().collect()
  .map(row => spark.sparkContext.makeRDD(List(row)))
  .map(rdd => spark.createDataFrame(rdd, splitValuesSchema))

val rotateIDCols = Array(auxCol) ++ colsToRotate

// join the split values with their records (DFs with id + colsToRotate)
val splittedDFs = splitValuesDFs
  .map(df => df.join(dfWithID, colToSplit).selectExpr(rotateIDCols: _*))

// randomly reorder the auxiliary id column (DFs with random ids)
val randIdDFs = splittedDFs
  .map(df => df.select(auxCol).orderBy(rand()).toDF())

// get rdds with random ids
val randIdRdds = randIdDFs
  .map(df => df.select(auxCol).rdd.map(row => row(0)))

// finally, zip and append the rdds with the random ids to the dataframes created by
// splitting the main df to obtain the rotated dataframe with all the data
val tuples = (splittedDFs, randIdRdds).zipped
val newRdds = tuples
  .map((df: DataFrame, rdd) => df.rdd.repartition(1).zip(rdd.repartition(1))
  .map(row => Row.fromSeq(row._1.toSeq ++ Seq(row._2))))

val tuples2 = (splittedDFs, newRdds).zipped
val rotatedDF = tuples2.map((df: DataFrame, rdd) => spark
  .createDataFrame(rdd, df.schema.add("rotated_id", LongType)).drop(auxCol))
  .reduce(_ union _).withColumnRenamed("rotated_id", "column2join")

// get the rest of the columns
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join both dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial column order

Showing the output dataframe produces a result similar to the expected output mentioned above (the exact pairing basically depends on the rand() ordering).

I would like to avoid the collect and repartition calls where possible and arrive at a more functional solution.

Any comment or idea is welcome!

the 1st and 3rd pictures look the same - not getting it, well, not entirely, I see some of it, but not really getting it - thebluephantom
@d_c what is the offset by which you want to rotate? - tourist
@naveenmarri sorry if I'm not explaining myself well: I would like to randomly reassign the values of the columns to rotate with respect to the rest of the columns in each dataframe. My first idea was to use the orderBy(rand()) call to shuffle the rows (in fact, the monotonically increasing id column) and then append them to the dataframe with the columns to rotate. Please excuse my English. - d_c
@thebluephantom I would especially like to avoid the collect and repartition calls and find a better solution able to handle huge dataframes. I'm just looking for other ways to reach the desired result. - d_c
I have edited the sample dataframes because they were rather confusing, I am so sorry @thebluephantom - d_c

1 Answer


I kept trying to find a better, clearer and more functional solution by removing the poorly performing calls as much as possible (repartition and some of the collects). I added an auxiliary method to index dataframe rows, so that unrelated parts (columns or dataframes which cannot be joined through any common column) can still be joined. This is my current development, which also removes the multiple conversions between rdds and dataframes and looks more readable and comprehensible.

I hope this helps someone with the same concerns.

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, rand}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// auxiliary method to index the rows of a dataframe
def addRowIndex(df: DataFrame) = spark.createDataFrame(
  df.rdd.zipWithIndex.map { case (row, index) => Row.fromSeq(row.toSeq :+ index) },
  StructType(df.schema.fields :+ StructField("index", LongType, false))
)
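
// quick check (assuming the inputDF defined in the question): addRowIndex
// appends a consecutive, 0-based "index" column of LongType
addRowIndex(inputDF.select("name")).printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- index: long (nullable = false)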

// name(s) of the column(s) used to split the input dataframe
val colToSplit = Seq("country")
val splitCols = colToSplit.map(col)

// list of column names to be rotated (together)
val colsToRotate = Seq("name", "age")

// add an auxiliary column for joining the dataframes in the final step
val auxCol = "aux"
val dfWithID = inputDF.withColumn(auxCol, monotonically_increasing_id())

val rotateIDCols = (Array(auxCol) ++ colsToRotate).map(col)

// get the distinct values of the splitter column(s)
// --assuming there will not be too many distinct values in the splitter column(s)--
val filterValues = dfWithID.select(splitCols: _*).distinct().collect()

// retrieve the different dfs according to the splitter values
val splitDfs = filterValues.map(filterRow =>
  filterRow.getValuesMap[Any](colToSplit)
    .foldLeft(dfWithID) { (df, filterField) =>
      df.filter(col(filterField._1) === filterField._2)
    }
    .select(rotateIDCols: _*)) // select after filtering, so multiple splitter columns also work

// extract and randomly reorder the aux id column for each dataframe
val randIdDfs = splitDfs.map(_.select(auxCol).orderBy(rand()).toDF())

// remove aux column for each dataframe
val splitWithoutIdDfs = splitDfs.map(_.drop(auxCol))

val dfsTuples = splitWithoutIdDfs.zip(randIdDfs)

// index the rows of the dfs with the columns to rotate and of the dfs with random ids
val indexedDfsTuples = dfsTuples.map {
  case (colsDf, idsDf) => (addRowIndex(colsDf), addRowIndex(idsDf))
}

// join the reordered-id dfs with the columns-to-rotate dfs by the row index
val reorderedDfs = indexedDfsTuples.map {
  case (df1, df2) => df1.join(df2, Seq("index"))
    .drop("index").withColumnRenamed(auxCol, "column2join")
}

// union the dataframes to create the complete rotated df
val rotatedDF = reorderedDfs.tail.foldLeft(reorderedDfs.head) { (acc, df) => acc.union(df) }

// select the remaining columns, i.e. the part of the main df which does not change
val noRotateCols = dfWithID.columns.diff(colsToRotate).map(col)
val noRotatedDF = dfWithID.select(noRotateCols: _*)
  .withColumnRenamed(auxCol, "column2join")

// join the rotated and no rotated dataframes
val outputDF = noRotatedDF.join(rotatedDF, "column2join")
  .select(inputDF.columns.map(col): _*) // to keep the initial column order
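
// quick sanity check: "name" and "age" should be reshuffled within each country,
// while "company", "address" and "country" keep their original pairing
outputDF.show()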