The relationship is 1:n (executor:partitions): a Kafka partition can only be consumed by one executor, while one executor can consume multiple Kafka partitions.
This is consistent with Spark Streaming.
For Structured Streaming, the default execution model is the "micro-batch processing" model, while the "continuous processing" model is still marked as experimental.
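To make the two models concrete, here is a minimal sketch of a Kafka query (the broker address, topic name and console sink are placeholders, not taken from the original question): leaving the trigger out runs it with the default micro-batch model, while Trigger.Continuous switches it to the experimental continuous mode.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-partition-demo").getOrCreate()

// Placeholder broker and topic; each Kafka partition is read by exactly one task.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("subscribe", "topic1")
  .load()

val query = df.writeStream
  .format("console")
  // Default (no trigger): micro-batch processing model.
  // Trigger.Continuous(...): experimental continuous processing model.
  .trigger(Trigger.Continuous("1 second"))
  .start()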
For the "micro-batch processing model", in "KafkaSource.scala", there is
* - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the
* data from Kafka topic + partition is consistently read by the same executors across
* batches, and cached KafkaConsumers in the executors can be reused efficiently. See the
* docs on [[KafkaSourceRDD]] for more details.
In "KafkaSourceRDD"
/**
* An RDD that reads data from Kafka based on offset ranges across multiple partitions.
* Additionally, it allows preferred locations to be set for each topic + partition, so that
* the [[KafkaSource]] can ensure the same executor always reads the same topic + partition
* and cached KafkaConsumers (see [[KafkaDataConsumer]]) can be used read data efficiently.
*
* ...
*/
private[kafka010] class KafkaSourceRDD(
And we know that in the Spark Streaming (DStream) Kafka integration the recommended location strategy is LocationStrategies.PreferConsistent, which gives the same consistent partition-to-executor assignment.
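The "consistent" part comes from the preferred locations that KafkaSource attaches to the KafkaSourceRDD partitions: each TopicPartition is hashed onto a sorted list of the currently live executors, so a partition keeps going to the same executor as long as the executor set does not change. A simplified sketch of that idea (the helper name and the executor strings below are illustrative, not the actual Spark code):

import org.apache.kafka.common.TopicPartition

// Illustrative only: map a TopicPartition to one executor, deterministically.
def preferredExecutor(tp: TopicPartition, sortedExecutors: Seq[String]): Option[String] = {
  if (sortedExecutors.isEmpty) None
  else {
    // floorMod keeps the index non-negative even for negative hash codes.
    val idx = java.lang.Math.floorMod(tp.hashCode, sortedExecutors.length)
    Some(sortedExecutors(idx))
  }
}

// Example: four partitions of "topic1" spread over two executors; the same
// partition always resolves to the same executor.
val executors = Seq("executor-a", "executor-b").sorted
(0 until 4).foreach { p =>
  println(s"topic1-$p -> ${preferredExecutor(new TopicPartition("topic1", p), executors)}")
}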
For the "Continuous Processing model", in "KafkaContinuousReader.scala"
override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
  ...
  startOffsets.toSeq.map {
    case (topicPartition, start) =>
      KafkaContinuousDataReaderFactory(
        topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
        .asInstanceOf[DataReaderFactory[UnsafeRow]]
  }.asJava
}
/**
* A data reader factory for continuous Kafka processing. This will be serialized and transformed
* into a full reader on executors.
*
* @param topicPartition The (topic, partition) pair this task is responsible for.
* ...
*/
case class KafkaContinuousDataReaderFactory(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
  override def createDataReader(): KafkaContinuousDataReader = {
    new KafkaContinuousDataReader(
      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
  }
}
So we can see that each (topic, partition) pair is handled by exactly one reader factory, which in turn runs on a single executor.
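For completeness, the same consistent behaviour is what the Spark Streaming (DStream) Kafka 0.10 integration exposes through LocationStrategies.PreferConsistent. A minimal sketch, assuming placeholder broker, group id and topic names:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(new SparkConf().setAppName("dstream-kafka-demo"), Seconds(5))

// Placeholder broker, group id and topic.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "host:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group"
)

// PreferConsistent spreads partitions evenly across executors and keeps each
// partition on the same executor, so its cached KafkaConsumer can be reused.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("topic1"), kafkaParams)
)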