0 votes

PySpark lets you create a dictionary when a single row is returned from the DataFrame, using the approach below.

from pyspark.sql.functions import expr

t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).collect()[0].asDict()
print(t)
print(t["key"])
print(t["value"])
print(t["rw"])
print("Printing using for comprehension")
[print(t[i]) for i in t ]

Results:

{'key': 'spark.app.id', 'value': 'local-1594577194330', 'rw': 1}
spark.app.id
local-1594577194330
1
Printing using for comprehension
spark.app.id
local-1594577194330
1

I'm trying to do the same in Scala Spark. It is possible using the case class approach.

case class download(key:String, value:String,rw:Long)

val t=spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)")).as[download].first
println(t)
println(t.key)
println(t.value)
println(t.rw)

Results:

download(spark.app.id,local-1594580739413,1)
spark.app.id
local-1594580739413
1

In the actual problem I have 200+ columns, and I don't want to use the case class approach. I'm trying something like the following to avoid it.

val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))

(df.columns).zip(df.take(1)(0))

but I'm getting an error:

    <console>:28: error: type mismatch;
 found   : (String, String, Long)
 required: Iterator[?]
       (df.columns.toIterator).zip(df.take(1)(0))

Is there a way to solve this?

You can turn tuples (products) into Iterators with tuple.productIterator - user
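As a minimal sketch of that suggestion (the tuple and column names below are hypothetical stand-ins, not taken from the question):

    // any tuple is a Product, so productIterator turns it into an Iterator that zip accepts
    val row  = ("spark.app.id", "local-1594577194330", 1L)  // hypothetical single row as a tuple
    val cols = Array("key", "value", "rw")
    val pairs = cols.iterator.zip(row.productIterator).toArray
    // Array((key,spark.app.id), (value,local-1594577194330), (rw,1))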

2 Answers

1 vote

In Scala, there is a Row method, getValuesMap, that converts a row into a Map from column name to column value (Map[String, T]). Try using it as below:

  val df =spark.sql("SET").withColumn("rw",expr("row_number() over(order by key)"))
    df.show(false)
    df.printSchema()

    /**
      * +----------------------------+-------------------+---+
      * |key                         |value              |rw |
      * +----------------------------+-------------------+---+
      * |spark.app.id                |local-1594644271573|1  |
      * |spark.app.name              |TestSuite          |2  |
      * |spark.driver.host           |192.168.1.3        |3  |
      * |spark.driver.port           |58420              |4  |
      * |spark.executor.id           |driver             |5  |
      * |spark.master                |local[2]           |6  |
      * |spark.sql.shuffle.partitions|2                  |7  |
      * +----------------------------+-------------------+---+
      *
      * root
      * |-- key: string (nullable = false)
      * |-- value: string (nullable = false)
      * |-- rw: integer (nullable = true)
      */

    val map = df.head().getValuesMap(df.columns)
    println(map)
    println(map("key"))
    println(map("value"))
    println(map("rw"))
    println("Printing using for comprehension")
    map.foreach(println)

    /**
      * Map(key -> spark.app.id, value -> local-1594644271573, rw -> 1)
      * spark.app.id
      * local-1594644271573
      * 1
      * Printing using for comprehension
      * (key,spark.app.id)
      * (value,local-1594644271573)
      * (rw,1)
      */
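One caveat worth noting (my own addition, not part of the answer above): if you don't pass a type parameter, getValuesMap's value type is inferred for you (typically as Nothing), which works for printing but is awkward if you want to pass the map around. Pinning the type explicitly avoids that:

    val typedMap: Map[String, Any] = df.head().getValuesMap[Any](df.columns)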

1 vote

The problem is that zip is a collection method that can only take another collection implementing IterableOnce, and df.take(1)(0) is a Spark SQL Row, which doesn't implement it.

Try converting the row to a Seq using its toSeq method.

df.columns.zip(df.take(1)(0).toSeq)

Result:

Array((key,spark.app.id), (value,local-1594577194330), (rw,1))
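If the goal is the same dictionary-style lookup as the PySpark asDict() version, the zipped pairs can be turned into a Map (a small follow-up sketch, not part of the original answer):

    val m = df.columns.zip(df.take(1)(0).toSeq).toMap   // Map[String, Any]
    println(m("key"))     // spark.app.id
    println(m("value"))   // local-1594577194330
    println(m("rw"))      // 1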