I am using Spark Structured Streaming to read data from Kafka:
```scala
val readStreamDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", config.getString("kafka.source.brokerList"))
  .option("startingOffsets", config.getString("kafka.source.startingOffsets"))
  .option("subscribe", config.getString("kafka.source.topic"))
  .load()
```
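By default, the Kafka source maps each Kafka topic partition to exactly one Spark partition, so every micro-batch runs as one task per topic partition. A minimal sketch of the same reader with the `minPartitions` source option (documented in the Spark Kafka integration guide, Spark 2.4+), which asks Spark to split topic partitions into more Spark tasks. It assumes `config` is a Typesafe `Config`; the value 90 is an illustrative assumption (one task per core for 30 executors × 3 cores), not a tuned number.

```scala
import com.typesafe.config.Config
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch: same Kafka source as above plus one optional setting.
// Assumes `config` is a Typesafe Config; 90 is an illustrative value.
object KafkaReader {
  def read(spark: SparkSession, config: Config): DataFrame =
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", config.getString("kafka.source.brokerList"))
      .option("startingOffsets", config.getString("kafka.source.startingOffsets"))
      .option("subscribe", config.getString("kafka.source.topic"))
      // Spark 2.4+: split the topic partitions so that each micro-batch
      // is divided into at least 90 Spark partitions (tasks).
      .option("minPartitions", "90")
      .load()
}
```

Rows inside each resulting Spark partition are still handled one after another by a single task; the option only changes how many such tasks exist.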
Based on a `uid` in each message read from Kafka, I have to call an external API, fetch data, and write the result back to another Kafka topic. For this I am using a custom `ForeachWriter` and processing every message:
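```scala
import org.apache.spark.sql.functions.{col, from_json}
import spark.implicits._

// `event` is the schema of the JSON payload
val eventData = readStreamDF
  .select(from_json(col("value").cast("string"), event).alias("message"), col("timestamp"))
  .withColumn("uid", col("message.eventPayload.uid"))
  .drop("message")

// foreach() sets the sink itself, so a separate .format("console") has no effect
val q = eventData
  .writeStream
  .foreach(new CustomForEachWriter())
  .start()
```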
The `CustomForEachWriter` calls a service's API and fetches the results for the given `uid`. The result is an array of ids, which are then written to another Kafka topic via a Kafka producer.
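The writer's code is not shown, so the following is a hypothetical reconstruction meant only to show where the time goes: Spark calls `process` for each row of a partition one after another inside a single task, so each blocking API call delays the next row. The constructor parameters, the `fetchIds` stand-in for the external API, the assumption that `uid` is a string column, and writing one Kafka record per id are all assumptions.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.sql.{ForeachWriter, Row}

// Hypothetical sketch of a writer like the one described above.
// `fetchIds` stands in for the external API call (assumption).
class CustomForEachWriter(
    bootstrapServers: String,
    outputTopic: String,
    fetchIds: String => Seq[String]
) extends ForeachWriter[Row] {

  // Not serializable, so it is created on the executor in open().
  @transient private var producer: KafkaProducer[String, String] = _

  // Called once per partition per micro-batch, inside the task.
  override def open(partitionId: Long, epochId: Long): Boolean = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    producer = new KafkaProducer[String, String](props)
    true
  }

  // Called for each row, sequentially: the next row waits for this API call.
  override def process(row: Row): Unit = {
    val uid = row.getAs[String]("uid") // assumes `uid` is a string column
    fetchIds(uid).foreach { id =>
      producer.send(new ProducerRecord[String, String](outputTopic, uid, id))
    }
  }

  override def close(errorOrNull: Throwable): Unit =
    if (producer != null) producer.close()
}
```

Creating the producer in `open` means one producer per partition per micro-batch; reusing a single producer per executor JVM is a common alternative.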
There are 30 Kafka partitions, and I have launched Spark with the following configuration:

- num-executors = 30
- executor-cores = 3
- executor-memory = 10GB
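For reference, a sketch of the same resources expressed as Spark properties, assuming a YARN deployment where `--num-executors` corresponds to `spark.executor.instances`. It gives 30 × 3 = 90 concurrent task slots in total; whether all of them are used depends on how many partitions each micro-batch has.

```scala
import org.apache.spark.SparkConf

// The spark-submit resources above as Spark properties (assumes YARN).
object SubmitConf {
  val conf: SparkConf = new SparkConf()
    .set("spark.executor.instances", "30") // --num-executors 30
    .set("spark.executor.cores", "3")      // --executor-cores 3
    .set("spark.executor.memory", "10g")   // --executor-memory 10g
}
```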
Even so, the Spark job starts lagging and cannot keep up with the incoming data rate. The incoming rate is around 10K messages per second, and the average time to process a single message is 100 ms.
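A back-of-the-envelope check of these numbers, assuming each task handles its messages strictly one after another and the 100 ms call dominates: one task manages 1000 / 100 = 10 messages per second, so 30 tasks top out near 300 msg/s (about 3% of the incoming rate), and by Little's law sustaining 10,000 msg/s at 100 ms each needs roughly 1,000 calls in flight at once.

```scala
// Back-of-the-envelope throughput model. Assumes strictly sequential
// per-task processing and that the ~100 ms API call dominates.
object ThroughputCeiling {
  // Messages per second that `tasks` sequential tasks can process.
  def sequentialCeiling(tasks: Int, msPerMessage: Double): Double =
    tasks * (1000.0 / msPerMessage)

  // Little's law: concurrent in-flight calls needed for a target rate.
  def requiredConcurrency(targetPerSec: Double, msPerMessage: Double): Double =
    targetPerSec * msPerMessage / 1000.0

  def main(args: Array[String]): Unit = {
    println(sequentialCeiling(30, 100.0))        // 300.0 (one task per Kafka partition)
    println(sequentialCeiling(90, 100.0))        // 900.0 (one task per executor core)
    println(requiredConcurrency(10000.0, 100.0)) // 1000.0 (in-flight calls needed)
  }
}
```

Even using every core with blocking calls stays an order of magnitude short, which is why the per-message latency, not the executor count, sets the ceiling here.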
I want to understand how Spark processes this in Structured Streaming. My understanding is that there is one dedicated executor responsible for reading data from all Kafka partitions. Does that executor distribute tasks based on the number of Kafka partitions? The data in a batch gets processed sequentially. How can it be processed in parallel to maximize throughput?
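One way to observe how a micro-batch is actually split is a small diagnostic passed to `foreachBatch` (Spark 2.4+) in place of the real sink. This sketch is an illustration, not part of the original job: the object and method names are hypothetical, it prints the partition count on the driver, and it prints, from inside each task, the partition id, executor host, and row count.

```scala
import java.net.InetAddress
import org.apache.spark.TaskContext
import org.apache.spark.sql.DataFrame

// Hypothetical diagnostic: how many tasks does each micro-batch get,
// and on which executor host does each task run?
object BatchParallelismProbe {
  // Number of Spark partitions (= tasks) in a batch.
  def partitionCount(batch: DataFrame): Int = batch.rdd.getNumPartitions

  // For writeStream.foreachBatch; this body runs on the driver per micro-batch.
  val probe: (DataFrame, Long) => Unit = (batch, batchId) => {
    println(s"batch $batchId: ${partitionCount(batch)} partitions")
    // This closure runs inside each task; its output goes to the executor logs.
    batch.rdd.foreachPartition { rows =>
      val host = InetAddress.getLocalHost.getHostName
      println(s"partition ${TaskContext.getPartitionId()} on $host: ${rows.size} rows")
    }
  }
}
```

Usage: `eventData.writeStream.foreachBatch(BatchParallelismProbe.probe).start()`. Passing a function value with an explicit type avoids the Scala 2.12 overload ambiguity between the Scala and Java `foreachBatch` variants.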