I am trying to use the Spark Kafka direct stream approach. It simplifies parallelism by creating as many RDD partitions as there are Kafka topic partitions, as stated in this doc. And based on my understanding, Spark will create one executor for each RDD partition to do the computation.
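For reference, that 1:1 mapping can be checked from inside the job with the HasOffsetRanges pattern from the Kafka integration guide. A minimal sketch (not part of my app), run against the messages stream created in my code below, assuming Spark 1.5+ where foreachRDD takes a VoidFunction; per the guide, the cast only works directly on the direct stream, before any transformation:

// sketch only: print the Kafka offset range backing each RDD partition
// imports needed: org.apache.spark.api.java.JavaPairRDD,
// org.apache.spark.api.java.function.VoidFunction,
// org.apache.spark.streaming.kafka.HasOffsetRanges,
// org.apache.spark.streaming.kafka.OffsetRange
messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>()
{
    public void call(JavaPairRDD<String, String> rdd) throws Exception
    {
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        System.out.println("RDD partitions in this batch: " + ranges.length);
        for (OffsetRange r : ranges)
        {
            System.out.println(String.format("%s partition %d, offsets [%d, %d)",
                r.topic(), r.partition(), r.fromOffset(), r.untilOffset()));
        }
    }
});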
So when I submit the application in yarn-cluster mode and set the num-executors option to a value different from the number of partitions, how many executors will there be?
For example, for a Kafka topic with 2 partitions, I set num-executors to 4:
export YARN_CONF_DIR=$HADOOP_HOME/client_conf
./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1
I gave it a try and found that the number of executors is 4, and each executor reads and processes data from Kafka. Why? There are only 2 partitions in the Kafka topic; how do 4 executors read from a topic that only has 2 partitions?
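Besides the web console, one way to sample that count from the driver is the getExecutorMemoryStatus trick; a sketch (the returned map includes the driver itself, and executors register asynchronously, so this is only meaningful once they are up):

// sketch only: count the block managers registered with the driver
// (a debugging aid, not something my app does)
int registered = jssc.sparkContext().sc().getExecutorMemoryStatus().size();
System.out.println("executors registered (excluding driver): " + (registered - 1));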
Below are the details of the Spark application and its logs.
My Spark application, which prints the messages received from Kafka (in the flatMap method) in every executor:
...  // SparkConf, JavaStreamingContext (jssc), the kafkaParams map and the SPACE pattern are set up above
String brokers = args[0];
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

JavaPairDStream<String, Integer> wordCounts =
    messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
    {
        public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
        {
            // print the Kafka message received in the executor
            System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s",
                tuple._1(), tuple._2()));
            return Arrays.asList(SPACE.split(tuple._2()));
        }
    }).mapToPair(new PairFunction<String, String, Integer>()
    {
        public Tuple2<String, Integer> call(String word) throws Exception
        {
            System.out.println(String.format("[word]: %s", word));
            return new Tuple2<String, Integer>(word, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>()
    {
        public Integer call(Integer v1, Integer v2) throws Exception
        {
            return v1 + v2;
        }
    });

wordCounts.print();

Runtime.getRuntime().addShutdownHook(new Thread()
{
    @Override
    public void run()
    {
        System.out.println("gracefully shutdown Spark!");
        jssc.stop(true, true);
    }
});

jssc.start();
jssc.awaitTermination();
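If it helps, the flatMap log line could also be tagged with the RDD partition id via TaskContext (available since Spark 1.2), which for the direct stream is exactly the Kafka partition; a sketch of the changed call method:

// sketch only: same call as above, tagging the log line with the partition id
// import needed: org.apache.spark.TaskContext
public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
{
    int partition = TaskContext.get().partitionId();
    System.out.println(String.format("[partition %d] tuple_2 is %s", partition, tuple._2()));
    return Arrays.asList(SPACE.split(tuple._2()));
}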
My Kafka topic has 2 partitions. The strings "hello hello world 1", "hello hello world 2", "hello hello world 3", ... are sent to the topic; a producer sketch follows the topic description below.
Topic: topic_2 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: topic_2 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: topic_2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
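For completeness, a producer matching those messages could look like this sketch, using the old Kafka 0.8 Java producer (broker list, topic name, and loop bound are illustrative, not necessarily what I ran):

// sketch only: send "hello hello world <n>" messages with no key,
// which matches tuple_1 being null in the executor logs below
// imports needed: java.util.Properties, kafka.javaapi.producer.Producer,
// kafka.producer.KeyedMessage, kafka.producer.ProducerConfig
Properties props = new Properties();
props.put("metadata.broker.list", "127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095");
props.put("serializer.class", "kafka.serializer.StringEncoder");
Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
for (int i = 1; i <= 20; i++)
{
    producer.send(new KeyedMessage<String, String>("topic_1", "hello hello world " + i));
}
producer.close();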
Web console:
console output of executor 1:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...
console output of executor 2:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...
console output of executor 3:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...