7
votes

I am trying to write a Spark Structured Streaming job that reads from multiple Kafka topics (potentially hundreds) and writes the results to different locations on S3 depending on the topic name. I've developed the snippet below, which reads from multiple topics in a loop and outputs the results to the console, and it works as expected. However, I would like to understand the performance implications. Would this be the recommended approach, or is it discouraged to have multiple readStream and writeStream operations? If so, what is the recommended approach?

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
Why do you want a different checkpointLocation for each topic? You can use one for all topics. - Srinivas
Kafka Connect is generally a better approach for Kafka -> S3. I can provide an answer based on that if it would be useful. - Robin Moffatt
@Srinivas In the event that I need to restart/reset a specific topic by clearing its checkpoint location, would it not be better to have separate checkpoint locations, to avoid coupling with or causing issues for the checkpoints of the other topics? - Brandon
@RobinMoffatt I have explored the option of using Kafka Connect; however, I would like to use Spark Structured Streaming so that I can expand the number of sinks down the line. - Brandon
(Kafka Connect can handle regex topic list, if that's your concern.) - Robin Moffatt

2 Answers

4
votes

It's certainly reasonable to run a number of concurrent streams per driver node.

Each .start() consumes a certain amount of driver resources in Spark. Your limiting factor will be the load on the driver node and its available resources. Hundreds of topics running continuously at a high rate would need to be spread across multiple driver nodes (in Databricks there is one driver per cluster). The advantage of Spark is, as you mention, multiple sinks, and also a unified batch and streaming API for transformations.
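
You can observe this per-query overhead on the driver through the StreamingQueryManager. A minimal sketch, assuming the queries have already been started (the name is whatever you assigned via queryName, or None):

for q in spark.streams.active:          # one entry per running .start()
    print(q.name, q.status["message"])  # current status of each query
spark.streams.awaitAnyTermination()     # block the driver until any query stops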

The other issue will be dealing with the small files you may end up writing to S3, and with file consistency. Take a look at delta.io for consistent and reliable writes to S3.
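
For example, a minimal sketch of a Delta sink, assuming the delta-spark package is on the classpath and using placeholder S3 paths (not from the question):

output_df = df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://<MY_BUCKET>/checkpoints/topic_1") \
    .start("s3://<MY_BUCKET>/delta/topic_1")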

3
votes

Advantages of the approach below:

  1. Generic.
  2. Multiple threads; each thread works independently.
  3. Easy to maintain the code and provide support for any issues.
  4. If one topic fails, there is no impact on the other topics in production; you only have to focus on the failed one.
  5. If you want to pull all the data for a specific topic again, you only have to stop the job for that topic, update or change the config, and restart the same job.

Note: the code below is not completely generic; you may need to change or tune it.

topic="" // Get value from input arguments
sink="" // Get value from input arguments

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", topic) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", sink) \
        .start()        
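
One way to wire those input arguments, assuming one job instance is submitted per topic (the invocation here is hypothetical, e.g. spark-submit job.py <topic> <sink>):

import sys

topic = sys.argv[1]  # Kafka topic (or pattern) for this job instance
sink = sys.argv[2]   # per-topic checkpoint location for this job instance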

Problems with the approach below:

  1. If one topic fails, it will terminate the complete program.
  2. Limited threads.
  3. Difficult to maintain the code, debug, and provide support for any issues.
  4. If you want to pull all the data for a specific topic again from Kafka, it's not possible, as any config change will apply to all topics; hence it's too costly an operation.

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
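
Regarding problem 1: once the driver waits on this pool of queries, the first query that fails ends the wait for all of them. A minimal sketch, assuming the loop above has started every query:

spark.streams.awaitAnyTermination()  # raises (or returns) as soon as ANY query
                                     # stops, taking the whole application down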