I am trying to integrate Kafka to Storm Toplogy using below code but unfortunately KafkaSpout is not consuming messages from Kafka-topic. At Storm UI-Core, Emitted count remains 0 forever.
String bootStrapServer = "10.20.10.238:9092";
String topic = "test.topic";
KafkaSpoutConfig.Builder spoutConfigBuilder = KafkaSpoutConfig.builder(bootStrapServer,topic);
spoutConfigBuilder.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG,100*1024*1024);
spoutConfigBuilder.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,100*1024*1024);
spoutConfigBuilder.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
Boolean readFromStart = true;
if(readFromStart) {
spoutConfigBuilder.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST);
}
else {
spoutConfigBuilder.setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST);
}
KafkaSpout spout = new KafkaSpout(spoutConfigBuilder.build());
builder.setSpout("kafkaSpout", spout, 1);
// And a Bolt to see messages
builder.setBolt("fcBolt", new FcBolt(), 1).setNumTasks(1).shuffleGrouping("kafkaSpout");
But when I tried to see the produced messages from CLI , I am able to see all messages on topic with below command:
bin/kafka-console-consumer.sh --topic test.topic --from-beginning --bootstrap-server 10.20.10.238:9092
Picked up _JAVA_OPTIONS: -Xmx128000m
test
test
test1
....
Versions:
Storm : 2.2.0
Kafka : 2.13_2.6.0
At old versions,it works fine! Something I had missed to read at newer version.
Any help appreciated. Thanks in Advance!