0
votes

I am using storm 1.0.1 and Kafka 0.10.0.0 with storm-kafka-client 1.0.3.

please find the code config I have below.

kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.ByteArrayDeserializer");


            KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreamsNamedTopics.Builder(new Fields(fieldNames), topics)
                    .build();

            KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
                                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));


            KafkaSpoutTuplesBuilder tuplesBuilder = new KafkaSpoutTuplesBuilderNamedTopics.Builder(new TestTupleBuilder(topics))
                        .build();

            KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
                                                                .setOffsetCommitPeriodMs(10_000)
                                                                .setFirstPollOffsetStrategy(LATEST)
                                                                .setMaxRetries(5)
                                                                .setMaxUncommittedOffsets(250)
                                                                .build();

When I fail the tuple its not getting replayed. Spout throws below error. Please let me know why it's throwing nullpointer exception.

53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Async loop died!
java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]
53501 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.d.executor - 
java.lang.NullPointerException
    at org.apache.storm.kafka.spout.KafkaSpout.doSeekRetriableTopicPartitions(KafkaSpout.java:260) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:248) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:203) ~[storm-kafka-client-1.0.3.jar:1.0.3]
    at org.apache.storm.daemon.executor$fn__7885$fn__7900$fn__7931.invoke(executor.clj:645) ~[storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:484) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]
53527 [Thread-359-test-spout-executor[295 295]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
    at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.8.0.jar:?]
    at org.apache.storm.daemon.worker$fn__8554$fn__8555.invoke(worker.clj:761) [storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.daemon.executor$mk_executor_data$fn__7773$fn__7774.invoke(executor.clj:271) [storm-core-1.0.1.jar:1.0.1]
    at org.apache.storm.util$async_loop$fn__625.invoke(util.clj:494) [storm-core-1.0.1.jar:1.0.1]
    at clojure.lang.AFn.run(AFn.java:22) [clojure-1.8.0.jar:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.8.0_102]

Please find the complete spout configs below {key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer, group.id=test-group, ssl.keystore.location=C:/test.jks, bootstrap.servers=localhost:1000, auto.commit.interval.ms=1000, security.protocol=SSL, enable.auto.commit=true, ssl.truststore.location=C:/test1.jks, ssl.keystore.password=pass123, ssl.key.password=pass123, ssl.truststore.password=pass123, session.timeout.ms=30000, auto.offset.reset=latest}

2

2 Answers

0
votes

Storm 1.0.1 consists of storm-kafka-client in beta quality. We have fixed few issues and more stable version is available in Storm 1.1 release and can be used against Kafka 0.10 onwards. In your topology you can make dependency against storm-kafka-client version 1.1 and kafka-clients dependency with appropriate version. You don't need to upgrade storm cluster itself.

0
votes

I had enable.auto.commit=true making the value as false resolved the issue for me.