I am trying to create a dataframe from RDD in order to be able to write to a json with following format A sample json is as shown below(expected output)
"1234":[ { loc:'abc', cost1:1.234, cost2:2.3445 }, { loc:'www', cost1:1.534, cost2:6.3445 } ]
I am able to generate the json with cost1 and cost2 in String format. But I want cost1 and cost2 to be double. I am getting error while creating data frame from rdd using schema defined. Somehow the data is being considered as String instead of double. Can someone help me to get this right? Below is my scala code of my sample implementation
object csv2json {
def f[T](v: T) = v match {
case _: Int => "Int"
case _: String => "String"
case _: Float => "Float"
case _: Double => "Double"
case _:BigDecimal => "BigDecimal"
case _ => "Unknown"
}
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val input_df = Seq(("12345", "111","1.34","2.34"),("123456", "112","1.343","2.344"),("1234", "113","1.353","2.354"),("1231", "114","5.343","6.344")).toDF("item_id","loc","cost1","cost2")
input_df.show()
val inputRDD = input_df.rdd.map(data => {
val nodeObj = scala.collection.immutable.Map("nodeId" -> data(1).toString()
,"soc" -> data(2).toString().toDouble
,"mdc" -> data(3).toString().toDouble)
(data(0).toString(),nodeObj)
})
val inputRDDAgg = inputRDD.aggregateByKey(scala.collection.mutable.ListBuffer.empty[Any])((nodeAAggreg,costValue) => nodeAAggreg += costValue , (nodeAAggreg,costValue) => nodeAAggreg ++ costValue)
val inputRDDAggRow = inputRDDAgg.map(data => {
println(data._1 + "and------ " + f(data._1))
println(data._2 + "and------ " + f(data._2))
val skuObj = Row(
data._1,
data._2)
skuObj
}
)
val innerSchema = ArrayType(MapType(StringType, DoubleType, true))
val schema:StructType = StructType(Seq(StructField(name="skuId", dataType=StringType),StructField(name="nodes", innerSchema)))
val finalJsonDF = spark.createDataFrame(inputRDDAggRow, schema)
finalJsonDF.show()
}
}
Below is the exception stacktrace:
java.lang.RuntimeException: Error while encoding: java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, skuId), StringType), true, false) AS skuId#32
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), if (isnull(validateexternaltype(lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull0, ObjectType(class java.lang.Object), true), MapType(StringType,DoubleType,true)))) null else newInstance(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData), validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, nodes), ArrayType(MapType(StringType,DoubleType,true),true)), None) AS nodes#33
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:589)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)