I am trying to experiment with consumer groups.
Here is my code snippet:
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import scala.Tuple2;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

public final class App {

    private static final int INTERVAL = 5000; // batch interval in ms

    public static void main(String[] args) throws Exception {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "xxx:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("auto.commit.interval.ms", "1000");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "kafka");
        kafkaParams.put("retries", "3");
        kafkaParams.put(GROUP_ID_CONFIG, "mygroup");
        kafkaParams.put("request.timeout.ms", "210000");
        kafkaParams.put("session.timeout.ms", "180000");
        kafkaParams.put("heartbeat.interval.ms", "3000");

        Collection<String> topics = Arrays.asList("venkat4");

        SparkConf conf = new SparkConf();
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(INTERVAL));

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        ssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // Print each record as a (key, value) pair per batch.
        stream.mapToPair(
                new PairFunction<ConsumerRecord<String, String>, String, String>() {
                    @Override
                    public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
                        return new Tuple2<>(record.key(), record.value());
                    }
                }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}
When I run two instances of this Spark Streaming job concurrently, it fails with the following error:
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition venkat4-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.needOffsetReset(SubscriptionState.java:315)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seekToEnd(KafkaConsumer.java:1170)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:197)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:214)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
    at scala.Option.orElse(Option.scala:289)
Per this tutorial, https://www.wisdomjobs.com/e-university/apache-kafka-tutorial-1342/apache-kafka-consumer-group-example-19004.html, creating a separate Kafka consumer instance with the same group id triggers a rebalance of the partitions across the group. I believe this rebalance is not being tolerated by the consumer. How should I fix this?
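To confirm that a rebalance is indeed happening, I can watch the group from outside Spark with a plain Kafka consumer. Below is a minimal sketch (a hypothetical RebalanceWatcher class I wrote for this test, reusing the broker address, group id, and topic from the snippet above; the SASL/Kerberos settings are omitted for brevity, but against my cluster the same security.protocol and JAAS config as the job would be needed). Starting a second copy makes each instance log a revoke/assign cycle:

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public final class RebalanceWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xxx:9092");   // same broker as the job above
        props.put("group.id", "mygroup");             // same group id as the job above
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Log every revoke/assign cycle, so the rebalance caused by a second
        // instance joining or leaving the group becomes visible.
        consumer.subscribe(Arrays.asList("venkat4"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked:  " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("Assigned: " + partitions);
            }
        });

        while (true) {
            consumer.poll(1000); // heartbeats and rebalance callbacks happen inside poll()
        }
    }
}

With two copies running, each instance ends up assigned a disjoint subset of venkat4's partitions, which is the behavior I expected from the Spark jobs as well.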
Below is the spark-submit command used:
SPARK_KAFKA_VERSION=0.10 spark2-submit --num-executors 2 --master yarn --deploy-mode client --files jaas.conf#jaas.conf,hive.keytab#hive.keytab --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --class Streaming.App --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --conf spark.streaming.kafka.consumer.cache.enabled=false 1-1.0-SNAPSHOT.jar

