2 votes

I have the following data (alldata), which has a SQL query and a view name:

Select_Query|viewname
select v1,v2 from conditions|cond
select w1,w2 from locations|loca

I have split it and properly registered it as the temp table alldata:

val Select_Querydf = spark.sql("select Select_Query,ViewName from alldata")
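
For context, a minimal sketch of how alldata might have been loaded and registered (the input path and SparkSession setup here are assumptions, not the exact original code):

import org.apache.spark.sql.SparkSession

// assumed entry point used throughout this question
val spark = SparkSession.builder().appName("SQLQueryexecutewithheader").getOrCreate()

// parse the pipe-delimited sample (header: Select_Query|viewname) and
// register it as the temp view "alldata"; the file path is hypothetical
val raw = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("alldata.txt")
raw.createOrReplaceTempView("alldata")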

While I try to execute the query and register a temp view or table out of it, it throws a NullPointerException. But the println shows all the values in the table correctly when I comment out the spark.sql statement.

Select_Querydf.foreach { row =>
  val Selectstmt = row(0).toString()
  val viewname = row(1).toString()
  println(Selectstmt + "-->" + viewname)
  spark.sql(Selectstmt).registerTempTable(viewname) //.createOrReplaceTempView(viewname)
}
Output:
select v1,v2 from conditions-->cond
select w1,w2 from locations-->loca

But when I execute it with spark.sql, it throws the following error. Please help me find where I am going wrong.

19/12/09 02:43:12 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
    at sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:36)
    at sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:32)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
19/12/09 02:43:12 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 4, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:623)
    at sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:36)
    at sparkscalacode1.SQLQueryexecutewithheader$$anonfun$main$1.apply(SQLQueryexecutewithheader.scala:32)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

1 – You simply can't do that, since you can't execute driver code inside an executor. The executor doesn't know anything about the Spark session and/or context, therefore you get the exception. The only place you can use code like spark.sql(Selectstmt).registerTempTable(viewname) //.createOrReplaceTempView(viewname) is the driver. – abiratsis

Thanks Alexandros. But could you suggest an approach for this? I have a dataframe with 5 rows of 5 different SQL queries. If I have to execute them one after the other, how do I do it? I hope it has to be in the driver code. Let me paste the complete code. – Lokesh

1 Answer

3 votes

Here spark.sql, which belongs to the SparkSession, cannot be used inside the foreach of a DataFrame. The SparkSession is created on the driver, while foreach runs on the workers, and the session is not serialized to them, hence the NullPointerException.

Assuming Select_Querydf is small, you can collect it to the driver as a list of rows and loop over it as below.

// collect() brings all rows to the driver, where spark.sql can be used safely
Select_Querydf.collect().foreach { row =>
  val Selectstmt = row.getString(0)
  val viewname = row.getString(1)
  println(Selectstmt + "-->" + viewname)
  spark.sql(Selectstmt).createOrReplaceTempView(viewname)
}
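
Once the loop has run on the driver, each registered view can be queried as usual; for example (view names taken from the sample data above):

// the views created by the loop are now visible to later queries
spark.sql("select * from cond").show()
spark.sql("select * from loca").show()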

Hope this helps!