0 votes

I have a dataset that I am extracting and applying a specific schema to before writing it out as JSON.

My test dataset looks like:

cityID|retailer|postcode
123|a1|1
123|s1|2
123|d1|3
124|a1|4
124|s1|5
124|d1|6
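
For reference, the DataFrame cridf used in the code below is assumed to come from reading this pipe-delimited data; a minimal sketch of how it might be created (the file path is hypothetical):

// Read the pipe-delimited test data; every column comes in as a string
val cridf = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("/XXXX/testdata.csv")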

I want to group by cityID, apply the schema below to put the result into a DataFrame, and then write the data out as JSON. My code is as follows:

Grouping by cityID

val rdd1 = cridf.rdd.map(x=>(x(0).toString, (x(1).toString, x(2).toString))).groupByKey() 

Mapping RDD to Row

val final1 = rdd1.map(x=>Row(x._1,x._2.toList))

Applying Schema

val schema2 = new StructType()
.add("cityID", StringType)
.add("reads", ArrayType(new StructType()
.add("retailer", StringType)
.add("postcode", IntegerType)))

Creating data frame

val parsedDF2 = spark.createDataFrame(final1, schema2)

Writing to json file

parsedDF2.write.mode("overwrite")
.format("json")
.option("header", "false")
.save("/XXXX/json/testdata")

The job aborts due to the following error:

java.lang.RuntimeException: Error while encoding:
java.lang.RuntimeException: scala.Tuple2 is not a valid external type for schema of struct

@JānisŠ. in final1 x._2 is a list of retailer and postcode - Elena.G
Yes, I overlooked that. - Jānis Š.

2 Answers

2 votes

You can do this transformation directly on your DataFrame. Here you go:

   // Read the pipe-delimited test data with a header row
   val rawData = spark.read.option("header", "true").option("delimiter", "|").csv("57407427.csv")

   import org.apache.spark.sql.functions._

   // Combine retailer and postcode into a single struct column called "reads"
   val readsDf = rawData.withColumn("reads", struct("retailer", "postcode")).drop("retailer", "postcode")

   // Collect the "reads" structs for each cityID into an array
   val finalJsonDf = readsDf.groupBy("cityID").agg(collect_list("reads").alias("reads"))

   finalJsonDf.printSchema() // for checking the schema

   // Coalesce to one partition so a single JSON file is written
   finalJsonDf.coalesce(1).write.mode("overwrite")
     .format("json")
     .option("header", "false")
     .save("57407427_Op.json")

Hopefully this is the same JSON output you are trying to write out:

 {"cityID":"124","reads":[{"retailer":"a1","postcode":"4"},{"retailer":"s1","postcode":"5"},{"retailer":"d1","postcode":"6"}]}
 {"cityID":"123","reads":[{"retailer":"a1","postcode":"1"},{"retailer":"s1","postcode":"2"},{"retailer":"d1","postcode":"3"}]}
0 votes

If you cannot avoid using RDDs, you could use case classes:

case class Read(retailer: String, postcode: Int)
case class Record(cityId: String, reads: List[Read])

...

val rdd1 = cridf.rdd
    .map(x => (x(0).toString, Read(x(1).toString, x(2).toString.toInt)))
    .groupByKey

// .toDF on an RDD of case classes needs import spark.implicits._ in scope
val final1 = rdd1
    .map(x => Record(x._1, x._2.toList))
    .toDF

final1
   .write
   .mode("overwrite")
   .format("json")
   .option("header", "false")
   .save("/XXXX/json/testdata")

final1 has the following schema:

root
 |-- cityId: string (nullable = true)
 |-- reads: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- retailer: string (nullable = true)
 |    |    |-- postcode: integer (nullable = false)

However, I think that @partha_devArch's solution is much better.

Update

With minimal additions to your code and using the provided schema, the solution would be as follows:

import org.apache.spark.sql.catalyst.encoders.RowEncoder

...

val rdd1 = cridf.rdd
    .map(x => (x(0).toString, Row(x(1).toString, x(2).toString.toInt)))
    .groupByKey

// Each element of reads is now a Row, matching the struct element type in schema2;
// the ClassTag that map needs is taken from the RowEncoder built for schema2
val final1 = rdd1
    .map(x => Row(x._1, x._2.toList))(RowEncoder.apply(schema2).clsTag)

val parsedDF2 = spark.createDataFrame(final1, schema2)

parsedDF2
    .write
    .mode("overwrite")
    .format("json")
    .option("header", "false")
    .save("/XXXX/json/testdata")