0
votes
val query1 = spark.sql(s"select * from tmp_table where id IN (select id from tmp_table where date between '${fromDate}' and '$toDate' group by id having count(id) > 1)")

var finalTable = Seq[Template]()

query1.foreach { row =>
    //Query on finalRtnView and get latest record for that row

    //Update finalTable for that row

    import spark.implicits._
    finalTable = finalTable :+ finalTableTemplate

    //Convert to DF so that data is updated .
    val df = finalTable.toDF().createOrReplaceTempView("finalRtnView") //Throws Exception here
}

java.lang.NullPointerException at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:213) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)

The same works if I use a for loop here instead of foreach. In case of for loop the processing a very slow.

How can I modify this code to run faster? I need the updated dataframe everytime in the for loop.

1

1 Answers

0
votes

i ran into the same issue, calling forEach with collect resolved the issue for me.

So

query1.collect().foreach