
I'm using Spark Structured Streaming to process messages from Kafka. At one point my result table looks something like the one below, where each row in the dataset holds a Spark SQL query.

    +----+--------------------+
    |code|          triggerSql|
    +----+--------------------+
    |  US|SELECT * FROM def...|
    |  UK|SELECT * FROM def...|
    +----+--------------------+

I need to execute each of these queries and process the results. However, Structured Streaming won't let me collect these SQL strings to the driver, and a SparkSession can't be used (or created) inside a transformation or a ForeachWriter, because that code runs on the executors. Here is what I have so far:

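    import org.apache.spark.sql.{ForeachWriter, Row}
    import org.apache.spark.sql.streaming.Trigger

    val query = df3.writeStream.foreach(new ForeachWriter[Row] {
      // Called once per partition and epoch; returning true means "process this partition"
      override def open(partitionId: Long, epochId: Long): Boolean = true

      // Runs on the executors, so no SparkSession is available here to run triggerSql
      override def process(value: Row): Unit = {
        val triggerSqlString = value.getAs[String]("triggerSql")
        val code = value.getAs[String]("code")
        println("Code=" + code + "; TriggerSQL=" + triggerSqlString)
      }

      override def close(errorOrNull: Throwable): Unit = {
        // println("===> Closing..")
      }
    }).trigger(Trigger.ProcessingTime("5 seconds")).start()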

Is there a better way to execute these SQL queries dynamically in Spark?


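tl;dr Use the DataStreamWriter.foreachBatch operation (Spark 2.4+). Its function runs on the driver and receives each micro-batch as a regular (non-streaming) Dataset, so you can collect the queries and run them with the batch's SparkSession.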

The following sample shows how to execute SQL queries read from a batch dataset:

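    import org.apache.spark.sql.Dataset

    def sqlExecution(ds: Dataset[String], batchId: Long): Unit = {
      // ds is an ordinary batch Dataset here, so collecting the queries to the driver is allowed
      ds.collect().foreach { s => ds.sparkSession.sql(s).show() }
    }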
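Applied to the question's df3 stream, the same idea might look like the sketch below. It assumes, as in the result table, that each micro-batch has the columns code and triggerSql, and that the tables the trigger SQL refers to can be resolved from the batch's SparkSession (for example, catalog tables). The names TriggerSqlRunner, TriggerResult, executeTriggers and start are hypothetical, and counting the rows only stands in for whatever processing the results really need.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical helper object; the names are illustrative, not a Spark API.
object TriggerSqlRunner {

  // Hypothetical result of one trigger query: the code and how many rows the query returned.
  final case class TriggerResult(code: String, rowCount: Long)

  // Runs on the driver. `batch` is a plain (non-streaming) DataFrame here, so it can be
  // collected and its session can run further SQL. Assumes columns "code" and "triggerSql".
  def executeTriggers(batch: DataFrame): Seq[TriggerResult] = {
    val spark: SparkSession = batch.sparkSession
    // Only the (presumably small) list of queries is brought to the driver.
    val triggers = batch
      .select("code", "triggerSql")
      .collect()
      .map(row => (row.getString(0), row.getString(1)))
      .toSeq
    // Each query is executed as an ordinary batch job, one after another.
    triggers.map { case (code, sqlText) =>
      val result = spark.sql(sqlText)
      TriggerResult(code, result.count())
    }
  }

  // Wires the helper into a streaming DataFrame such as the question's df3.
  def start(df3: DataFrame): StreamingQuery = {
    // A value with an explicit Scala function type selects the Scala foreachBatch overload.
    val handleBatch: (DataFrame, Long) => Unit = (batch, batchId) =>
      executeTriggers(batch).foreach { r =>
        println(s"batch=$batchId code=${r.code} rows=${r.rowCount}")
      }

    df3.writeStream
      .foreachBatch(handleBatch)
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .start()
  }
}
```

Keeping executeTriggers separate from the streaming wiring means it can be exercised against a static DataFrame. Binding the handler to a value typed as (DataFrame, Long) => Unit, rather than passing an untyped lambda, helps avoid the ambiguity with the Java VoidFunction2 overload of foreachBatch that some Scala 2.12 builds report.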