
We are building a streaming platform where it is essential to run SQL queries in batches.

val query = streamingDataSet.writeStream
  .option("checkpointLocation", checkPointLocation)
  .foreachBatch { (df, batchId) =>
    df.createOrReplaceTempView("events")

    val df1 = ExecutionContext.getSparkSession.sql("select * from events")
    df1.limit(5).show()
    // More complex processing on dataframes
  }
  .trigger(trigger)
  .outputMode(outputMode)
  .start()

query.awaitTermination()

The error thrown is:

org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: events
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'events' not found in database 'default';

The streaming source is Kafka with watermarking, and dataframe transformations execute fine as long as we do not use Spark SQL. The Spark version is 2.4.0 and Scala is 2.11.7. The trigger is ProcessingTime every 1 minute and the output mode is Append.

Is there any other approach that would let us use Spark SQL within foreachBatch? Would it work with an upgraded version of Spark, and if so, to which version should we upgrade? Kindly help. Thank you.


1 Answer


tl;dr Replace ExecutionContext.getSparkSession with df.sparkSession.


The reason for the StreamingQueryException is that the streaming query tries to access the events temporary view through a SparkSession that knows nothing about it, i.e. the one returned by ExecutionContext.getSparkSession.

The only SparkSession that has this events temporary view registered is exactly the SparkSession the df dataframe was created with, i.e. df.sparkSession.
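A minimal sketch of the corrected foreachBatch, assuming the same streamingDataSet, checkPointLocation, trigger and outputMode as in the question:

```scala
val query = streamingDataSet.writeStream
  .option("checkpointLocation", checkPointLocation)
  .foreachBatch { (df, batchId) =>
    // Registers the view in the session that owns this batch's dataframe
    df.createOrReplaceTempView("events")

    // Query through the same session, so the "events" view is visible
    val df1 = df.sparkSession.sql("select * from events")
    df1.limit(5).show()
    // More complex processing on dataframes
  }
  .trigger(trigger)
  .outputMode(outputMode)
  .start()

query.awaitTermination()
```

Dataset.sparkSession is available in Spark 2.4.0, so no upgrade is needed for this fix.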