
I have created a sample direct Kafka stream in Spark. The topic has 30 partitions, but all of its consumers appear to be running on the same executor machine.

[Image: Kafka Manager screenshot]

As I understand it, in a direct Kafka stream the driver works out the offset ranges for each batch and hands them to the executors, which then poll Kafka for those ranges.
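To make that understanding concrete, here is a minimal plain-Java sketch of what I believe the driver does for each batch. It is a toy model and not Spark's actual code: `OffsetPlanSketch`, `Range`, and `planBatch` are hypothetical names, and it ignores backpressure and minimum-rate settings. It only shows that each Kafka partition gets one offset range, capped at `maxRatePerPartition * batchSeconds` records.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of per-batch offset planning in a direct stream; not Spark's actual code. */
final class OffsetPlanSketch {

    /** Half-open range [fromOffset, untilOffset) for a single Kafka partition. */
    static final class Range {
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(int partition, long fromOffset, long untilOffset) {
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        /** Number of records covered by this range. */
        long count() {
            return untilOffset - fromOffset;
        }
    }

    /**
     * Driver side (assumed behaviour): for every partition, read from the current
     * offset up to the latest offset, but never more than
     * maxRatePerPartition * batchSeconds records in one batch.
     */
    static Map<Integer, Range> planBatch(Map<Integer, Long> current,
                                         Map<Integer, Long> latest,
                                         long maxRatePerPartition,
                                         long batchSeconds) {
        long cap = maxRatePerPartition * batchSeconds;
        Map<Integer, Range> plan = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : current.entrySet()) {
            int partition = e.getKey();
            long from = e.getValue();
            long until = Math.min(latest.getOrDefault(partition, from), from + cap);
            // Never plan a negative range if the latest offset is behind the current one.
            plan.put(partition, new Range(partition, from, Math.max(from, until)));
        }
        return plan;
    }
}
```

With the settings in my code (100 records per second per partition, 5-second batches), this model plans at most 500 records per partition per batch. Each planned range becomes one task, so 30 Kafka partitions should give 30 tasks per batch that can be scheduled on any executor.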

Spark Version: 2.4

Sample code below:



import com.google.common.collect.ImmutableList;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;

import java.util.Arrays;
import java.util.HashMap;


public class Main {


    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("StreamingTest");

        conf.set("spark.shuffle.service.enabled", "true");
        conf.set("spark.streaming.kafka.maxRatePerPartition", "100");
        conf.set("spark.streaming.backpressure.enabled", "true");
        conf.set("spark.streaming.concurrentJobs", "1");
        conf.set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC");
        conf.set("spark.streaming.backpressure.pid.minRate", "1500");


        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));



        JavaInputDStream<ConsumerRecord<Object, Object>> kafkaStream1 = createKafkaStream(ssc, "test-topic-1");

        kafkaStream1.foreachRDD(rdd -> rdd.foreachPartition(p -> p.forEachRemaining(e -> {
            System.out.println("Processing test-topic-1");
            try {
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        })));

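        kafkaStream1.foreachRDD(rdd -> {
            // Offset ranges of this batch, one per Kafka partition (available on the driver).
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // Ranges [0, fromOffset): commitAsync commits each range's untilOffset,
            // which here is the offset at which this batch started.
            final OffsetRange[] beginOffsets = Arrays.stream(offsetRanges).map(o -> OffsetRange.create(o.topicPartition(), 0, o.fromOffset())).toArray(OffsetRange[]::new);
            rdd.foreachPartition(partition -> {
                // Runs on an executor; RDD partition i corresponds to offsetRanges[i].
                OffsetRange o = beginOffsets[TaskContext.get().partitionId()];

            });
            // Runs on the driver, so the commit is sent by the driver's consumer.
            ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(beginOffsets);
        });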



        ssc.start();
        ssc.awaitTermination();
    }

    public static JavaInputDStream<ConsumerRecord<Object, Object>> createKafkaStream(JavaStreamingContext ssc, String topic) {
        HashMap<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-ids>");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, topic+"hrishi-testing-nfr-7");
        kafkaParams.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 80000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        kafkaParams.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000000);
        kafkaParams.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5000);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        return KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(ImmutableList.of(topic), kafkaParams));
    }
}


The DStream API is deprecated. Have you tried Structured Streaming? - OneCricketeer
Yes, I know DStream is deprecated, but my use case is limited to mapping partitions, so that is not a problem, and I can't afford a migration at present. I doubt it's because of DStream. - Hrishikesh Mishra
You can map a DataFrame as well. Anyway, please show the full code; this property looks suspicious: spark.streaming.concurrentJobs - OneCricketeer
Added the complete sample code, @cricket_007 - Hrishikesh Mishra
It looks like your code doesn't really do anything except commit offsets. Have you tried using just a very basic set of properties? What are you trying to optimize for? - OneCricketeer

1 Answer


I found the issue: it happens because I was committing offsets from the driver. This is the line responsible, followed by the full block it sits in:

((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);

kafkaStream1.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreachPartition(partition -> {
        partition.forEachRemaining(e -> {
            try {
                System.out.println("hrishi mess" + e);
                Thread.sleep(2);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
        });

    });
    ((CanCommitOffsets) kafkaStream1.inputDStream()).commitAsync(offsetRanges);
});

Next, I enabled debug logging on the executors and confirmed that KafkaRDD was polling Kafka from there; this is clearly visible in the executor logs.
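As far as I can tell, this also explains why Kafka Manager lists only one host for my group: the spark-streaming-kafka-0-10 integration appears to give the executor-side consumers their own group id, prefixed with `spark-executor-`, and to turn off auto commit and offset reset for them, so only the driver's consumer shows up under the group id I configured. Below is a small plain-Java sketch of that rewriting. It is a model of the behaviour as I understand it and not Spark's code; `ExecutorParamsSketch` and `forExecutor` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of how executor-side consumer settings differ from the driver's; not Spark's code. */
final class ExecutorParamsSketch {
    static final String GROUP_ID = "group.id";
    static final String ENABLE_AUTO_COMMIT = "enable.auto.commit";
    static final String AUTO_OFFSET_RESET = "auto.offset.reset";

    /**
     * Returns a copy of the driver's consumer settings adjusted for executors
     * (assumed behaviour): a separate consumer group, no auto commit, and no
     * automatic offset reset, because executors only read the ranges they are given.
     */
    static Map<String, Object> forExecutor(Map<String, Object> driverParams) {
        Map<String, Object> params = new HashMap<>(driverParams);
        params.put(GROUP_ID, "spark-executor-" + driverParams.get(GROUP_ID));
        params.put(ENABLE_AUTO_COMMIT, false);
        params.put(AUTO_OFFSET_RESET, "none");
        return params;
    }
}
```

For the group id in my question, `test-topic-1hrishi-testing-nfr-7`, the executor consumers in this model would use `spark-executor-test-topic-1hrishi-testing-nfr-7`. The commits made through `commitAsync` still go to the original group from the driver.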