19
votes


I am trying to use the Spark Kafka direct stream approach. It simplifies parallelism by creating as many RDD partitions as there are Kafka topic partitions, as stated in this doc. And based on my understanding, Spark will create one executor for each RDD partition to do the computation.
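
For reference, that one-to-one mapping can be checked with a small sketch like this (assuming Spark 1.6+, where foreachRDD accepts a VoidFunction, and the messages stream created with KafkaUtils.createDirectStream as in the application code below):

    // Sketch only: print how many partitions each batch's RDD has. For the direct
    // stream this should equal the topic's partition count (2 here).
    // Assumed imports: org.apache.spark.api.java.JavaPairRDD,
    // org.apache.spark.api.java.function.VoidFunction.
    messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>()
    {
        public void call(JavaPairRDD<String, String> rdd) throws Exception
        {
            System.out.println("RDD partitions in this batch: " + rdd.partitions().size());
        }
    });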

So when I submit the application in yarn-cluster mode and set the num-executors option to a value different from the number of partitions, how many executors will there be?

For example, there is a Kafka topic with 2 partitions, and I set num-executors to 4:

export YARN_CONF_DIR=$HADOOP_HOME/client_conf

./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1

I gave it a try and found that the number of executors is 4, and each executor reads and processes data from Kafka. Why? There are only 2 partitions in the Kafka topic. How can 4 executors read from a Kafka topic that has only 2 partitions?

Below are the details of the Spark application and logs.

My Spark application, which prints the messages received from Kafka (in the flatMap method) in every executor:

    ...
    String brokers = args[0];
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
    kafkaParams.put("metadata.broker.list", brokers);

    JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);

    JavaPairDStream<String, Integer> wordCounts =
        messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
        {
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
            {
                System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(),
                    tuple._2())); // print the Kafka message received in the executor
                return Arrays.asList(SPACE.split(tuple._2()));
            }

        }).mapToPair(new PairFunction<String, String, Integer>()
        {
            public Tuple2<String, Integer> call(String word) throws Exception
            {
                System.out.println(String.format("[word]: %s", word));
                return new Tuple2<String, Integer>(word, 1);
            }

        }).reduceByKey(new Function2<Integer, Integer, Integer>()
        {
            public Integer call(Integer v1, Integer v2) throws Exception
            {
                return v1 + v2;
            }

        });

    wordCounts.print();

    Runtime.getRuntime().addShutdownHook(new Thread(){
        @Override
        public void run(){
            System.out.println("gracefully shutdown Spark!");
            jssc.stop(true, true);
        }
    });
    jssc.start();
    jssc.awaitTermination();

My Kafka topic has 2 partitions. Strings "hello hello world 1", "hello hello world 2", "hello hello world 3", ... are sent to the topic.

Topic: topic_2  PartitionCount:2    ReplicationFactor:2 Configs:
Topic: topic_2  Partition: 0    Leader: 3   Replicas: 3,1   Isr: 3,1
Topic: topic_2  Partition: 1    Leader: 1   Replicas: 1,2   Isr: 1,2

Web console: [screenshot omitted]

console output of executor 1:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...

console output of executor 2:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...

console output of executor 3:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...
I printed the number of partitions in each RDD. It has the same value as the number of partitions of the Kafka topic, which is 2 in my case. How can 3 executors process a series of RDDs in parallel when each RDD has only two partitions in total? Based on the console output of each executor, all executors do process data from the RDDs. – yzandrew
As a DStream is a series of RDDs, maybe in some time windows the RDDs are processed by 2 of the 3 executors, and in other time windows they are processed by a different 2 of the 3 executors? Am I right? – yzandrew

1 Answer

5
votes

Each partition is operated on by one executor at a time (assuming you don't have speculative execution turned on).

If you have more executors than you do partitions, not all of them will be doing work on any given RDD. But as you noted, since a DStream is a sequence of RDDs, over time each executor will do some work.
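
To see that in action, a rough sketch along these lines (assuming Spark 1.6 or later, where foreachRDD accepts a VoidFunction) logs which host processes each partition in each batch; with a 2-partition topic you should see exactly two such lines per batch, and the executors that get them can change from batch to batch:

    // Sketch only: report, from inside each task, which partition ran on which host.
    // Assumed imports: java.net.InetAddress, java.util.Iterator,
    // org.apache.spark.TaskContext, org.apache.spark.api.java.JavaPairRDD,
    // org.apache.spark.api.java.function.VoidFunction, scala.Tuple2.
    messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>()
    {
        public void call(JavaPairRDD<String, String> rdd) throws Exception
        {
            rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, String>>>()
            {
                public void call(Iterator<Tuple2<String, String>> records) throws Exception
                {
                    // Runs on the executor that was assigned this partition.
                    System.out.println(String.format("partition %d processed on host %s",
                        TaskContext.get().partitionId(),
                        InetAddress.getLocalHost().getHostName()));
                }
            });
        }
    });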