
Below is the Spark Scala code, which prints a one-column Dataset[Row] (a DataFrame):

import org.apache.spark.sql.{Dataset, Row, SparkSession}
val spark: SparkSession = SparkSession.builder()
        .appName("Spark DataValidation")
        .config("SPARK_MAJOR_VERSION", "2").enableHiveSupport()
        .getOrCreate()

val kafkaPath:String="hdfs:///landing/APPLICATION/*"
val targetPath:String="hdfs://datacompare/3"
val pk:String = "APPLICATION_ID" 
val pkValues = spark
        .read
        .json(kafkaPath)
        .select("message.data.*")
        .select(pk)
        .distinct() 
pkValues.show()

Output of the above code:

+--------------+
|APPLICATION_ID|
+--------------+
|           388|
|           447|
|           346|
|           861|
|           361|
|           557|
|           482|
|           518|
|           432|
|           422|
|           533|
|           733|
|           472|
|           457|
|           387|
|           394|
|           786|
|           458|
+--------------+

Question:

How can I convert this DataFrame to a comma-separated String variable?

Expected output:

val data: String = "388,447,346,861,361,557,482,518,432,422,533,733,472,457,387,394,786,458"

Please suggest how to convert a DataFrame (Dataset[Row]) or a typed Dataset to a single String.


2 Answers


I don't think that's a good idea, since a DataFrame is a distributed object and can be immense. collect will bring all the data to the driver, so you should perform this kind of operation carefully.

Here is what you can do with a DataFrame (two options):

df.select("APPLICATION_ID").rdd.map(r => r(0)).collect.mkString(",")
df.select("APPLICATION_ID").collect.map(r => r(0)).mkString(",") // map to the value; Row.toString would wrap each value in [...]

Result with a test DataFrame of only 3 rows:

String = 388,447,346
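Because collect pulls every row to the driver, one way to act on the warning above is to cap how many values you are willing to collect. The following is a minimal sketch, not part of the original answer: the helper `collectAsCsv` and its `maxKeys` threshold are hypothetical, and a small in-memory DataFrame stands in for the question's Kafka JSON input.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .appName("collect-guard-sketch")
  .master("local[*]") // local mode, for trying the sketch outside a cluster
  .getOrCreate()
import spark.implicits._

// Hypothetical helper: refuse to collect more than maxKeys values to the driver.
def collectAsCsv(df: DataFrame, column: String, maxKeys: Int): String = {
  // limit(maxKeys + 1) keeps the check cheap: at most maxKeys + 1 rows come back
  val rows = df.select(column).limit(maxKeys + 1).collect()
  require(rows.length <= maxKeys, s"more than $maxKeys values in $column; refusing to collect")
  rows.map(_.get(0)).mkString(",")
}

// Stand-in for pkValues from the question
val sample = Seq(388L, 447L, 346L).toDF("APPLICATION_ID")
val csv = collectAsCsv(sample, "APPLICATION_ID", maxKeys = 1000)
```

With a threshold smaller than the number of rows, `require` throws an IllegalArgumentException instead of silently pulling a huge result onto the driver.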

Edit: With a typed Dataset (for example a Dataset[Long] or Dataset[String]) you can do it directly:

ds.collect.mkString(",")
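For the question's pkValues, which is an untyped DataFrame, one way to obtain such a typed Dataset is `.as[Long]` with `spark.implicits._` in scope. This sketch assumes APPLICATION_ID was inferred as a whole-number column (Spark's JSON reader infers LongType for integral values); if the column is a string, `.as[String]` would be the analogue. The sample values come from the question's output.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

val spark: SparkSession = SparkSession.builder()
  .appName("typed-dataset-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Stand-in for the question's pkValues (a one-column DataFrame)
val pkValues = Seq(388L, 447L, 346L).toDF("APPLICATION_ID")

// .as[Long] turns the DataFrame (Dataset[Row]) into a Dataset[Long]
val ds: Dataset[Long] = pkValues.as[Long]

// Each element is now a Long, so mkString no longer wraps values in [...]
val data: String = ds.collect().mkString(",")
```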

Use collect_list:

import org.apache.spark.sql.functions._
import spark.implicits._ // provides the encoder used by .as[Array[Long]]
val data = pkValues.select(collect_list(col(pk))) // collect to one row
    .as[Array[Long]] // set the encoder to get a strongly typed Dataset (assumes APPLICATION_ID is a Long)
    .take(1)(0) // get the first (only) row; the result is an Array[Long]
    .mkString(",") // and join all values
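If you would rather have Spark build the string itself, a variant of the same idea wraps collect_list in concat_ws, which joins an array of strings with a separator. This is a sketch with the same caveat about pulling data to the driver; the cast to string is there because concat_ws works on string inputs, and the sample DataFrame is a stand-in for pkValues. Because the aggregate has no grouping, it yields exactly one row, but the order of the values inside the string is not guaranteed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

val spark: SparkSession = SparkSession.builder()
  .appName("concat-ws-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val pk = "APPLICATION_ID"
// Stand-in for the question's pkValues
val pkValues = Seq(388L, 447L, 346L).toDF(pk)

val data: String = pkValues
  .select(concat_ws(",", collect_list(col(pk).cast("string")))) // one row, one string column
  .as[String]
  .head() // the single aggregated row
```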

However, it's generally a bad idea to collect or take all rows to the driver. Instead, you may want to save pkValues somewhere with .write, or pass it as an argument to another function, to keep the computation distributed.
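As an illustration of keeping the work distributed: if the comma-separated string was meant to filter another dataset (for example in an `IN (...)` clause), a left semi join against pkValues can do the same filtering without collecting anything. This is a hypothetical sketch; `otherDf` and its `payload` column are made up for the example.

```scala
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .appName("semi-join-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val pk = "APPLICATION_ID"
// Stand-in for the question's pkValues
val pkValues = Seq(388L, 447L, 346L).toDF(pk)
// Hypothetical second dataset you might otherwise filter with "IN (388,447,...)"
val otherDf = Seq((388L, "a"), (999L, "b"), (346L, "c")).toDF(pk, "payload")

// left_semi keeps rows of otherDf whose key appears in pkValues; nothing goes to the driver
val matched = otherDf.join(pkValues, Seq(pk), "left_semi")
```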

Edit: I just noticed that @SCouto posted another answer right after mine. Using collect is also correct, but collect_list has one advantage: you can easily add grouping if you want, e.g. group the keys into even and odd ones. It's up to you which solution you prefer: the simpler collect, or the one-line-longer but more powerful collect_list.
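The grouping advantage mentioned in the edit could be sketched like this: group the keys by parity and build one comma-separated string per group with concat_ws over collect_list. The `isEven` and `keys` column names are arbitrary choices for this sketch, and the order of values within each string is not guaranteed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

val spark: SparkSession = SparkSession.builder()
  .appName("grouped-collect-list-sketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val pk = "APPLICATION_ID"
// Stand-in for the question's pkValues
val pkValues = Seq(388L, 447L, 346L, 861L).toDF(pk)

// Group keys by parity, then join each group's keys into one comma-separated string
val byParity: Map[Boolean, String] = pkValues
  .groupBy(((col(pk) % 2) === 0).as("isEven"))
  .agg(concat_ws(",", collect_list(col(pk).cast("string"))).as("keys"))
  .as[(Boolean, String)] // tuple fields are mapped to columns by position
  .collect()
  .toMap
```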