
I want to create a Spark DataFrame by reading a Seq in Scala. The element types of the Seq are String, DataFrame, Long, and Date.

I tried the approach below but I am getting an error; maybe it is not the right way to deal with this issue.

val Total_Record_Count = TotalRecordDF.count // getting the total record count from a dataframe
val Rejected_Record_Count = rejectDF.count // getting the rejected record count from a dataframe
val Batch_Run_ID = spark.range(1).select(unix_timestamp as "current_timestamp")
case class JobRunDetails(Job_Name: String, Batch_Run_ID: DataFrame, Source_Entity_Name: String, Total_Record_Count: Long, Rejected_Record_Count: Long, Reject_Record_File_Path: String, Load_Date: String)
val inputSeq = Seq(JobRunDetails("HIT", Batch_Run_ID, "HIT", Total_Record_Count, Rejected_Record_Count, "blob.core.windows.net/feedlayer", Load_Date))

I tried val df = sc.parallelize(inputSeq).toDF() but it throws the error "java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.DataFrame".

I just want to create a DataFrame from the sequence. Any help will be highly appreciated. Note: I am using Spark 2.3 on Databricks.


1 Answer


Normally we create case classes with Java/Scala primitive types. I haven't seen anyone create a case class with a DataFrame as one of its members, and Spark has no Encoder for that type, which is exactly what your error says.

If I understood your requirement correctly, here's what you're looking for:

import spark.implicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.unix_timestamp

case class JobRunDetails(Job_Name: String, Batch_Run_ID: Int, Source_Entity_Name: String, Total_Record_Count: Long, Rejected_Record_Count: Long, Reject_Record_File_Path: String, Load_Date: String)
//defined class JobRunDetails

val Total_Record_Count = 100 //TotalRecordDF.count // getting the total record count from a dataframe
val Rejected_Record_Count = 200 //rejectDF.count // getting the rejected record count from a dataframe
// collect the single-row timestamp dataframe and extract the value as a plain Int
val Batch_Run_ID = spark.range(1).select(unix_timestamp as "current_timestamp").take(1).head.get(0).toString().toInt
val Load_Date = "2019-27-07"
val inputRDD: RDD[JobRunDetails] = spark.sparkContext.parallelize(Seq(JobRunDetails("HIT", Batch_Run_ID, "HIT", Total_Record_Count, Rejected_Record_Count, "blob.core.windows.net/feedlayer", Load_Date)))
inputRDD.toDF().show

/**
import spark.implicits._
Total_Record_Count: Int = 100
Rejected_Record_Count: Int = 200
Batch_Run_ID: Int = 1564224156
Load_Date: String = 2019-27-07
inputRDD: org.apache.spark.rdd.RDD[JobRunDetails] = ParallelCollectionRDD[3] at parallelize at command-330223868839989:6
*/

+--------+------------+------------------+------------------+---------------------+-----------------------+----------+
|Job_Name|Batch_Run_ID|Source_Entity_Name|Total_Record_Count|Rejected_Record_Count|Reject_Record_File_Path| Load_Date|
+--------+------------+------------------+------------------+---------------------+-----------------------+----------+
|     HIT|  1564224156|               HIT|               100|                  200|   blob.core.windows...|2019-27-07|
+--------+------------+------------------+------------------+---------------------+-----------------------+----------+
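
As a side note, once the counts and timestamp are extracted into plain JVM types, you can skip the RDD entirely and call toDF on the Seq itself via spark.implicits. A minimal sketch, assuming the same JobRunDetails case class and a SparkSession named spark; System.currentTimeMillis is used here as a stand-in for the spark.range/unix_timestamp round-trip:

import spark.implicits._

// current unix time in seconds, without building a one-row dataframe first
val Batch_Run_ID = (System.currentTimeMillis / 1000).toInt
// a local Seq of case class instances converts directly via spark.implicits
val df = Seq(
  JobRunDetails("HIT", Batch_Run_ID, "HIT", 100L, 200L, "blob.core.windows.net/feedlayer", "2019-27-07")
).toDF()
df.show(false)

Both approaches produce the same dataframe; going through sparkContext.parallelize only matters if you need RDD-level control, e.g. over partitioning.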