4 votes

I have deployed a structured stream with 4 workers over a Kafka topic with 4 partitions.

I was assuming that the 4 workers would serve the 4 partitions, with a one-to-one mapping between worker and partition.

But that's not the case: all partitions are being served by the same executor. I confirmed this by checking the thread ID and the logs on that executor.

Is there any documentation that describes the correlation between Kafka partitions and Spark Structured Streaming? Also, are there any knobs we can tweak?
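
For context, the query looks roughly like this (a simplified sketch, not the exact job; topic name, brokers, checkpoint path and sink are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-structured-stream")
  .getOrCreate()

// Subscribe to a 4-partition topic; the application runs with 4 executors ("workers").
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "my-topic")
  .load()

val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()

query.awaitTermination()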


2 Answers

2 votes

The correlation is 1:n (executor : partitions): a Kafka partition can only be consumed by one executor, while one executor can consume multiple Kafka partitions.

This is consistent with Spark Streaming.


For Structured Streaming, the default model is the micro-batch processing model; the continuous processing model is still in an experimental state.
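
As a side note, which model runs is chosen through the trigger on the write side. A minimal sketch, assuming a streaming DataFrame df; the console sink and checkpoint paths are placeholders:

import org.apache.spark.sql.streaming.Trigger

// Micro-batch processing (the default): one batch per trigger interval.
val microBatchQuery = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/micro-batch-ckpt")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous processing (experimental): long-running tasks instead of batches.
val continuousQuery = df.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/continuous-ckpt")
  .trigger(Trigger.Continuous("1 second"))
  .start()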

For the "micro-batch processing model", in "KafkaSource.scala", there is

 *   - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the
 *     data from Kafka topic + partition is consistently read by the same executors across
 *     batches, and cached KafkaConsumers in the executors can be reused efficiently. See the
 *     docs on [[KafkaSourceRDD]] for more details.

In "KafkaSourceRDD"

/**
 * An RDD that reads data from Kafka based on offset ranges across multiple partitions.
 * Additionally, it allows preferred locations to be set for each topic + partition, so that
 * the [[KafkaSource]] can ensure the same executor always reads the same topic + partition
 * and cached KafkaConsumers (see [[KafkaDataConsumer]]) can be used to read data efficiently.
 *
 * ...
 */
private[kafka010] class KafkaSourceRDD(

And we know the default location policy is LocationStrategies.PreferConsistent.
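
To make "consistent" concrete, the idea is roughly the following (an illustrative sketch, not Spark's actual code): hash each TopicPartition onto a stable, sorted list of executors, so the same partition is always preferred on the same executor across batches.

import org.apache.kafka.common.TopicPartition

// Illustrative only: derive a stable preferred executor for a Kafka partition.
def preferredExecutor(tp: TopicPartition, executors: Seq[String]): Option[String] = {
  val sorted = executors.sorted                      // stable order across batches
  if (sorted.isEmpty) None
  else Some(sorted(Math.floorMod(tp.hashCode, sorted.size)))
}

// The same partition maps to the same executor no matter how often this is called.
val tp = new TopicPartition("my-topic", 0)
println(preferredExecutor(tp, Seq("exec-2@host-b", "exec-1@host-a", "exec-3@host-c")))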


For the "Continuous Processing model", in "KafkaContinuousReader.scala"

  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
    ...
    startOffsets.toSeq.map {
      case (topicPartition, start) =>
        KafkaContinuousDataReaderFactory(
          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
          .asInstanceOf[DataReaderFactory[UnsafeRow]]
    }.asJava
  }

/**
 * A data reader factory for continuous Kafka processing. This will be serialized and transformed
 * into a full reader on executors.
 *
 * @param topicPartition The (topic, partition) pair this task is responsible for.
 * ...
 */
case class KafkaContinuousDataReaderFactory(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {
  override def createDataReader(): KafkaContinuousDataReader = {
    new KafkaContinuousDataReader(
      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
  }
}

So we can see that each (topic, partition) pair is contained in exactly one factory, and is therefore read by one executor.
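
If you want to verify the assignment yourself (as the question did via thread IDs and logs), something along these lines tags every record with where it was read. This is only a sketch for the micro-batch mode, and it assumes an active SparkSession spark and the Kafka streaming DataFrame df:

import org.apache.spark.{SparkEnv, TaskContext}
import spark.implicits._

// Sketch only: prefix each record value with the executor, Spark partition and
// thread that read it, so the executor <-> partition mapping shows up in the sink.
val tagged = df.selectExpr("CAST(value AS STRING) AS value")
  .as[String]
  .mapPartitions { iter =>
    val ctx = TaskContext.get()
    val tag = s"executor=${SparkEnv.get.executorId} " +
              s"sparkPartition=${ctx.partitionId()} " +
              s"thread=${Thread.currentThread().getName}"
    iter.map(value => s"$tag value=$value")
  }

Writing tagged out with the console sink makes the mapping easy to eyeball over a few batches.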

0 votes

If you are using the Direct Stream API, then the correlation is 1:1 (Spark partition : Kafka partition). From the Spark Streaming guide:

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata.
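
For reference, a minimal sketch of the 0.10 direct stream (brokers, topic and group id are placeholders, and ssc is assumed to be an existing StreamingContext):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "auto.offset.reset" -> "latest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("my-topic"), kafkaParams)
)

// One Spark partition per Kafka partition: for a 4-partition topic this prints 4.
stream.foreachRDD(rdd => println(rdd.getNumPartitions))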