
Given a dataframe with an index column ("Z"):

val tmp = Seq(("D", 0.1, 0.3, 0.4), ("E", 0.3, 0.1, 0.4), ("F", 0.2, 0.2, 0.5)).toDF("Z", "a", "b", "c")

+---+---+---+---+
|  Z|  a|  b|  c|
+---+---+---+---+
|  D|0.1|0.3|0.4|
|  E|0.3|0.1|0.4|
|  F|0.2|0.2|0.5|
+---+---+---+---+

Say I'm interested in the row where Z = "D":

tmp.filter(col("Z") === "D")
+---+---+---+---+
|  Z|  a|  b|  c|
+---+---+---+---+
|  D|0.1|0.3|0.4|
+---+---+---+---+

How do I get the min and max values of that DataFrame row and their corresponding column names, while keeping the index column?

Desired output if I want the top 2 max values:

+---+---+---+
|  Z|  b|  c|
+---+---+---+
|  D|0.3|0.4|
+---+---+---+

Desired output if I want the min:

+---+---+
|  Z|  a|
+---+---+
|  D|0.1|
+---+---+

What I tried:

// first convert that DF to an array
val tmp = df.collect.map(_.toSeq).flatten
// returns:
tmp: Array[Any] = Array(0.1, 0.3, 0.4) <-- don't know why Any is returned


//take top values of array
val n = 1
tmp.zipWithIndex.sortBy(-_._1).take(n).map(_._2)

But I got this error:

   No implicit Ordering defined for Any.
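
(For reference, the error happens because Row.toSeq returns Seq[Any], so there is no Ordering for the elements. A minimal sketch of the cast that makes the array version sortable, reusing the tmp array and n from above:)

// cast the boxed values back to Double so sortBy has an Ordering to work with
val doubles = tmp.map(_.asInstanceOf[Double])
doubles.zipWithIndex.sortBy { case (value, _) => -value }.take(n).map { case (_, index) => index }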

Is there a way to do it straight from the DataFrame instead of via an array?

Could you please provide more details regarding the desired output, and also let me know what the df DataFrame and the tmp DataFrame are? - Nikk
@Nikk updated to reflect the desired output - jxn

2 Answers


You can do something like this:

tmp
  .where($"Z" === "D")  // or .where($"a" === 0.1), which picks the same row
  .take(1)
  .map { row =>
    // Z sits at index 0 and is a String, so read the numeric columns at indices 1 to 3
    Seq(row.getDouble(1), row.getDouble(2), row.getDouble(3))
  }
  .head
  .sortBy(d => -d)
  .take(2)
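
This returns the two largest values but drops the column names that the desired output also needs. A small sketch of one way to keep them, using the question's tmp with the Z index column (not part of the original answer):

val row = tmp.where($"Z" === "D").head
// pair each numeric column name with its value in that row, then sort by value
val top2 = tmp.columns.filter(_ != "Z")
  .map(name => name -> row.getAs[Double](name))
  .sortBy { case (_, value) => -value }
  .take(2)
// top2 should be Array((c,0.4), (b,0.3)) for the sample data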

Or, if you have a large number of fields, you can take the schema and pattern-match the row fields against the schema data types, like this:

import org.apache.spark.sql.types._

// pair every field in the schema with its positional index
val schemaWithIndex = tmp.schema.zipWithIndex

tmp
  .where($"Z" === "D")
  .take(1)
  .map { row =>
    schemaWithIndex.collect {
      // keep only the Double columns; the String index column Z is skipped
      case (field, index) if field.dataType == DoubleType => row.getDouble(index)
    }
  }
  .head
  .sortBy(d => -d)
  .take(2)

Maybe there is an easier way to do this.
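
As for the desired output shape (a single-row DataFrame that still carries Z), once the top column names are known they can simply be selected from the filtered DataFrame. A sketch, assuming the names turned out to be "b" and "c":

tmp.where($"Z" === "D").select("Z", "b", "c").show()

which should print something like

+---+---+---+
|  Z|  b|  c|
+---+---+---+
|  D|0.3|0.4|
+---+---+---+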


Definitely not the fastest way, but it works straight from the DataFrame.

A more generic solution:

// somewhere in codebase
import org.apache.spark.sql.{DataFrame, Encoder}
import org.apache.spark.sql.functions._
import spark.implicits._

def transform[T, R: Encoder](ds: DataFrame, colsToSelect: Seq[String])(func: Map[String, T] => Map[String, R])
                            (implicit encoder: Encoder[Map[String, R]]): DataFrame = {
  ds.map(row => func(row.getValuesMap(colsToSelect)))  // apply func to each row, seen as a column name -> value map
    .toDF()                                            // single "value" column holding the returned map
    .select(explode(col("value")))                     // one (key, value) row per map entry
    .withColumn("idx", lit(1))
    .groupBy(col("idx")).pivot(col("key")).agg(first(col("value")))  // turn the keys back into columns
    .drop("idx")
}

Now it's just about working with a Map, where the map key is a column name and the map value is the field value.

def fuzzyStuff(values: Map[String, Any]): Map[String, String] = {
  val valueForA = values("a").asInstanceOf[Double]
  // Do whatever you want to do
  // ...
  // Use a map as the return type, where the key is a column name and the value is whatever you want
  Map("x" -> s"fuzzyA-$valueForA")
}


def maxN(n: Int)(values: Map[String, Double]): Map[String, Double] = {
  println(values)  // debug: shows the incoming row as a column name -> value map
  // sort by value (descending) and keep the top n entries together with their column names
  values.toSeq.sortBy { case (_, value) => -value }.take(n).toMap
}
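
The question also asks for the min, so a symmetric helper in the same curried style could look like this (minN is a name introduced here, not part of the original answer):

// hypothetical companion to maxN: keep the n smallest values with their column names
def minN(n: Int)(values: Map[String, Double]): Map[String, Double] = {
  values.toSeq.sortBy { case (_, value) => value }.take(n).toMap
}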

Usage:

val tmp = Seq((0.1, 0.3, 0.4), (0.3, 0.1, 0.4), (0.2, 0.2, 0.5)).toDF("a", "b", "c")
val filtered = tmp.filter(col("a") === 0.1)

transform(filtered, colsToSelect = Seq("a", "b", "c"))(maxN(2))
   .show()

+---+---+
|  b|  c|
+---+---+
|0.3|0.4|
+---+---+

transform(filtered, colsToSelect = Seq("a", "b", "c"))(fuzzyStuff)
   .show()

+----------+
|         x|
+----------+
|fuzzyA-0.1|
+----------+
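
Assuming the hypothetical minN helper sketched above, getting the min together with its column name is the same kind of call:

transform(filtered, colsToSelect = Seq("a", "b", "c"))(minN(1))
   .show()

which should print something like

+---+
|  a|
+---+
|0.1|
+---+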

Step by step (without the generic transform helper):

1. Define the max and min functions:

def maxN(values: Map[String, Double], n: Int): Map[String, Double] = {
  // top n values, keeping their column names
  values.toSeq.sortBy { case (_, value) => -value }.take(n).toMap
}

def min(values: Map[String, Double]): Map[String, Double] = {
  // single smallest value, keeping its column name
  Map(values.minBy { case (_, value) => value })
}

2. Create the dataset:

val tmp = Seq((0.1, 0.3, 0.4), (0.3, 0.1, 0.4), (0.2, 0.2, 0.5)).toDF("a", "b", "c")
val filtered = tmp.filter(col("a") === 0.1)
3. Explode and pivot the map type:

val df = filtered.map(row => maxN(row.getValuesMap(Seq("a", "b", "c")), 2)).toDF()

val exploded = df.select(explode($"value"))
+---+-----+
|key|value|
+---+-----+
|  c|  0.4|
|  b|  0.3|
+---+-----+

//Then pivot
exploded.withColumn("idx", lit(1))
      .groupBy($"idx").pivot($"key").agg(first($"value"))
      .drop("idx")
      .show()

+---+---+
|  b|  c|
+---+---+
|0.3|0.4|
+---+---+
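
Note that this answer builds tmp without the Z index column that the question wants to keep. One way to carry it through, as a sketch: assume the question's original DataFrame (the one that still has Z) is bound to a val named withZ and the pivoted result above is bound to a val named pivoted (both names introduced here):

// look up the Z value of the filtered row and attach it to the pivoted single-row result
val z = withZ.filter(col("Z") === "D").select("Z").head.getString(0)
pivoted.withColumn("Z", lit(z)).select("Z", "b", "c").show()

which should give something like

+---+---+---+
|  Z|  b|  c|
+---+---+---+
|  D|0.3|0.4|
+---+---+---+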