
I have a huge dataframe with 20 million records. I need to iterate over the dataframe df1, reading each row one by one, and construct two other dataframes, df2 and df3, as output based on the column values of df1.

- Input: df1 has 20 columns and 20 million records.
- Output: df2 has 4 columns and will contain 20 million records derived from the column values of df1.
- Output: df3 has 20 columns and will contain 500-800 million records derived from the column values of df1.

Current approach:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.collection.mutable.ListBuffer

// EMI Transaction Schema (for df2)
val emiDFSchema = StructType(
  StructField("AS_OF_DATE", StringType, false) ::
  StructField("EST_END_DT", StringType, false) ::
  StructField("EFF_DT", StringType, false) ::
  StructField("NET_PRCPL_AMT", DecimalType(38, 2), false) :: // precision/scale simplified here
  ...
  ...
  Nil)

// Schema for df3
val emiDFSchema1 = StructType(
  StructField("AS_OF_DATE", StringType, false) ::
  StructField("EST_END_DT", StringType, false) ::
  StructField("EFF_DT", StringType, false) ::
  StructField("NET_PRCPL_AMT", DecimalType(38, 2), false) ::
  ...
  ...
  Nil)

val rwList = new ListBuffer[Row]()
val rwList1 = new ListBuffer[Row]()

df1.collect().foreach { row =>
  .....
  // calculation of attributes based on columns of df1
  ......
  // creating a Row object for df2 with all calculated attributes
  val df2Row = Row(attr1, attr2, ....attr20)
  rwList += df2Row

  for (i <- 1 to n) {
    ...
    // calculation of attributes based on columns of df1
    ...
    // creating a Row object for df3 with all calculated attributes
    val df3Row = Row(attr1, attr2, ....attr20)
    rwList1 += df3Row
  }
}

val emiDF1 = spark.createDataFrame(spark.sparkContext.parallelize(rwList), emiDFSchema)
val emiDF2 = spark.createDataFrame(spark.sparkContext.parallelize(rwList1), emiDFSchema1)

Since df1 is huge, doing collect() on it and looping over the rows on the driver takes a huge amount of time. Can you please suggest a more efficient way to iterate over df1 so the processing finishes in a reasonable amount of time?
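Would something along these lines be the right direction, i.e. doing the per-row work with map/flatMap on df1.rdd so it runs on the executors instead of collecting everything to the driver? This is only a rough, untested sketch; the cut-down schema, the column getters, and the fixed n are simplified stand-ins for my real calculations:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Cut-down output schema, just for illustration
val sketchSchema = StructType(
  StructField("AS_OF_DATE", StringType, false) ::
  StructField("EST_END_DT", StringType, false) ::
  StructField("NET_PRCPL_AMT", DecimalType(38, 2), false) :: Nil)

// df2: one output row per df1 row, computed on the executors
val df2 = spark.createDataFrame(
  df1.rdd.map { row =>
    // real attribute calculations would go here
    Row(row.getString(0), row.getString(1), row.getDecimal(3))
  },
  sketchSchema)

// df3: many output rows per df1 row, produced with flatMap
val df3 = spark.createDataFrame(
  df1.rdd.flatMap { row =>
    val n = 30 // placeholder; in reality n is derived from the row
    (1 to n).map { i =>
      // real attribute calculations would go here
      Row(row.getString(0), row.getString(1), row.getDecimal(3))
    }
  },
  sketchSchema)

(The real code would build the full 20-column rows; the point here is just to avoid collect().)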

Spark v2.4, Scala

20M is not huge afaik - thebluephantom

1 Answer