
I'm writing a test application that consumes messages from Kafka topics and then pushes the data into S3 and into RDBMS tables (the flow is similar to the one presented here: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html). So I read data from Kafka and then:

  • save each message to S3
  • save some messages to table A in an external database (based on a filter condition)
  • save some other messages to table B in an external database (based on another filter condition)

So I have something like:

Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1,topic2,topic3")
    .option("startingOffsets", "earliest")
    .load()
    .select(from_json(col("value").cast("string"), schema, jsonOptions).alias("parsed_value"));

(Note that I'm reading from more than one Kafka topic.) Next I define the required datasets:

Dataset<Row> allMessages = df.select(.....);
Dataset<Row> messagesOfType1 = df.select(); // some unique conditions applied on JSON elements
Dataset<Row> messagesOfType2 = df.select(); // some other unique conditions

and now for each Dataset I create a query to start processing:

StreamingQuery s3Query = allMessages
    .writeStream()
    .format("parquet")
    .option("startingOffsets", "latest") // note: startingOffsets is a Kafka source option, so it likely has no effect on this sink
    .option("path", "s3_location")
    .start();

StreamingQuery firstQuery = messagesOfType1
    .writeStream()
    .foreach(new CustomForEachWiriterType1()) // class that extends ForeachWriter<T> and saves data into an external RDBMS table
    .start();

StreamingQuery secondQuery = messagesOfType2
    .writeStream()
    .foreach(new CustomForEachWiriterType2()) // class that extends ForeachWriter<T> and saves data into an external RDBMS table (possibly in a different database than the first one)
    .start();

Now I'm wondering:

Will those queries be executed in parallel, or one after another in FIFO order (in which case I should assign them to separate scheduler pools)?

1 Answer

Will those queries be executed in parallel?

Yes. These queries are going to be executed in parallel, each one on every trigger. Since you did not specify a trigger, the default applies, which runs the micro-batches as fast as possible.


Internally, when you call start on a DataStreamWriter, you create a StreamExecution, which in turn immediately creates a daemon thread called microBatchThread (quoted from the Spark source code below):

  val microBatchThread =
    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
      override def run(): Unit = {
        // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
        // thread to this micro batch thread
        sparkSession.sparkContext.setCallSite(callSite)
        runBatches()
      }
    }
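
To make the thread-per-query idea concrete, here is a toy model in plain Java. It is not Spark code: the class ToyStreamingQueries, its start and runAll methods, and the query names are all hypothetical, and the "batches" only append to a log. It is a sketch of the pattern only, assuming that each call to start spawns a dedicated daemon thread and returns immediately, as the quoted source suggests.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy model (NOT Spark code): every started "query" gets its own daemon thread. */
final class ToyStreamingQueries {

    /** Hypothetical stand-in for starting a query: spawns a dedicated daemon thread and returns at once. */
    static Thread start(String queryName, int batches, List<String> log, CountDownLatch done) {
        Thread t = new Thread(() -> {
            // Each query loops over its own batches, independently of the other queries.
            for (int batch = 0; batch < batches; batch++) {
                log.add(queryName + " batch " + batch + " on " + Thread.currentThread().getName());
            }
            done.countDown();
        }, "stream execution thread for " + queryName);
        t.setDaemon(true); // daemon threads do not keep the JVM alive on their own
        t.start();         // does not block: the caller can start the next query right away
        return t;
    }

    /** Starts all queries, waits until each has finished its batches, and returns the log. */
    static List<String> runAll(List<String> queryNames, int batches) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(queryNames.size());
        for (String name : queryNames) {
            start(name, batches, log, done);
        }
        try {
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("queries did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return log;
    }

    public static void main(String[] args) {
        runAll(List.of("s3Query", "firstQuery", "secondQuery"), 3).forEach(System.out::println);
    }
}
```

The order of the log lines across queries is not deterministic, which is the point: no query waits for another one to finish its batch.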

Each query runs in its own thread, named:

stream execution thread for [prettyIdString]

You can check the separate threads using jstack or jconsole.
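
As an alternative to an external tool, the threads can also be listed from inside the driver JVM. The following is a minimal sketch in plain Java: the class StreamThreadLister and its method are hypothetical helpers, and the name prefix is taken from the source code quoted above, so it may differ between Spark versions.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: lists live JVM threads whose name starts with a given prefix. */
final class StreamThreadLister {

    // Prefix assumed from the quoted Spark source; it may change between Spark versions.
    static final String PREFIX = "stream execution thread for";

    /** Returns the sorted names of all live threads in this JVM that start with the prefix. */
    static List<String> threadNamesWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .map(Thread::getName)
            .filter(name -> name.startsWith(prefix))
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Run inside the driver after the queries have been started;
        // in a JVM without streaming queries this prints nothing.
        threadNamesWithPrefix(PREFIX).forEach(System.out::println);
    }
}
```

Calling threadNamesWithPrefix(StreamThreadLister.PREFIX) in the driver after the three start() calls should, under the assumptions above, return one entry per running query.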