2 votes

I would like to build a Spark Structured Streaming pipeline that reads from multiple Kafka topics (which vary in number over time). Whenever the topic set needs updating, I intended to stop the streaming job, add/remove topics, and start the job again, using one of the two subscription options outlined in the Spark Structured Streaming + Kafka Integration Guide:

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

Upon further investigation, I noticed the following point in the Spark Structured Streaming Programming Guide and am trying to understand why changing the number of input sources is "not allowed":

Changes in the number or type (i.e. different source) of input sources: This is not allowed.

Definition of "Not Allowed" (also from Spark Structured Streaming Programming Guide):

The term not allowed means you should not do the specified change as the restarted query is likely to fail with unpredictable errors. sdf represents a streaming DataFrame/Dataset generated with sparkSession.readStream.

My understanding is that Spark Structured Streaming implements its own checkpointing mechanism:

In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) and the running aggregates (e.g. word counts in the quick example) to the checkpoint location. This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query.
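To see why the number of sources is baked into the checkpoint, it helps to look at the offset log itself: each batch's offsets file stores one line per input source, and the lines are matched to the query's sources by position. A minimal pure-Python sketch (assuming the v1 offset-log layout; the metadata values are placeholders):

```python
import json

# A simplified v1 offsets file: a version line, batch metadata,
# then one JSON line per input source -- the order is significant.
offset_log = """v1
{"batchWatermarkMs":0,"batchTimestampMs":1578463128395}
{"logOffset":0}
{"logOffset":0}"""

lines = offset_log.splitlines()
version = lines[0]
metadata = json.loads(lines[1])
sources = lines[2:]  # one entry per source, paired with sources by index

# On restart, Spark pairs these entries with the query's sources
# positionally, so adding or removing a source breaks the pairing.
print(version, len(sources))
```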

Can someone please explain why changing the number of sources is "not allowed"? I assume that would be one of the benefits of the checkpointing mechanism.


2 Answers

0 votes

Steps to add a new input source to an existing, running streaming job:

  1. Stop the currently running streaming job.
  2. Copy the offsets from the checkpoint location to a local directory:
     hdfs dfs -get output/checkpoints/<model_name>offsets <local_directory>/offsets

There will be 3 files in the directory (Spark retains the last 3 offset files). Sample format of a single file:

v1
{"batchWatermarkMs":0,"batchTimestampMs":1578463128395,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}
{"logOffset":0}
  • Each {"logOffset":batchId} line represents a single input source.
  • To add a new input source, append a "-" on its own line at the end of each file in the directory.

Sample updated file:

v1
{"batchWatermarkMs":0,"batchTimestampMs":1578463128395,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"logOffset":0}
{"logOffset":0}
-
  • If you want to add more than one new input source, append one "-" line per new source.
  • Copy the updated files back to the checkpoint location:
    hdfs dfs -put -f <local_directory>/offsets output/checkpoints/<model_name>offsets
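
Editing checkpoint internals is not supported by Spark, so treat this as a fragile workaround. As a sketch, the local edit in the steps above (appending one "-" line per new source to each offsets file) could be scripted like this; the directory layout and the "-" convention follow the sample files above:

```python
import pathlib

def add_placeholder_sources(offsets_dir, num_new_sources):
    """Append one '-' line per new source to every offsets file.

    In the offset log, '-' stands for a source with no committed
    offset yet, which is how a freshly added source should appear.
    """
    for path in sorted(pathlib.Path(offsets_dir).iterdir()):
        with open(path, "a") as fh:
            fh.write("-\n" * num_new_sources)
```

Run it against the local copy fetched with hdfs dfs -get, then push the files back with hdfs dfs -put -f.
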
0 votes

The best way to do what you want is to run your readStreams in multiple threads. I'm doing this, reading 40 tables at the same time. To do it, I followed this article: https://cm.engineering/multiple-spark-streaming-jobs-in-a-single-emr-cluster-ca86c28d1411.

I'll give a quick overview of what I did after reading it. I structured my code with a main function, an executor, and a trait holding my Spark session, which is shared with all jobs.

1. Two lists of the topics that I want to read.

In Scala I create two lists: the first holds the topics I always want to read, and the second is a dynamic list to which I can add new topics whenever I stop the job.

  2. Pattern matching to run the jobs.

I have two different kinds of jobs: one kind that I run for the tables I always process, and dynamic jobs that I run for specific topics. In other words, if I want to add a new topic and create a new job for it, I add that job to the pattern match. In the code below, I run specific jobs for the CARS and SHIPS tables, and every other table in the list runs the same replication-table job:

  var tables = specificTables ++ dynamicTables

  tables.map(table => {
    table._1 match {
      case "CARS" => new CarsJob
      case "SHIPS" => new ShipsReplicationJob
      case _ => new ReplicationJob
    }
  })
After this, I pass the result to a createJobs function that instantiates each of these jobs, and then to a startFutureTasks function that runs each job on a different thread:

startFutureTasks(createJobs(tables))
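
To make the dispatch-and-threads idea concrete in one self-contained snippet, here is a rough Python equivalent of the pattern match plus thread pool. The job classes below are stand-ins for the Scala ones above, not a real Spark implementation:

```python
from concurrent.futures import ThreadPoolExecutor

class ReplicationJob:
    def run(self, topic):
        return f"replicated {topic}"

class CarsJob(ReplicationJob):
    def run(self, topic):
        return f"cars job for {topic}"

class ShipsReplicationJob(ReplicationJob):
    def run(self, topic):
        return f"ships job for {topic}"

def create_job(topic):
    # Mirrors the Scala pattern match: dedicated jobs for known
    # topics, the generic replication job for everything else.
    return {"CARS": CarsJob, "SHIPS": ShipsReplicationJob}.get(topic, ReplicationJob)()

def start_future_tasks(topics):
    # Run each job on its own thread, as in the linked article.
    with ThreadPoolExecutor(max_workers=len(topics)) as pool:
        futures = {t: pool.submit(create_job(t).run, t) for t in topics}
        return {t: f.result() for t, f in futures.items()}

results = start_future_tasks(["CARS", "SHIPS", "ORDERS"])
```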

I hope I've helped. Thanks!