I am trying to use SSL for the Kafka-Spark integration. I have tested Kafka with SSL enabled and it works completely fine with sample consumer and producer.
Also, I have tried the integration of Spark - Kafka which also works without an issue when done without SSL in the spark-job.
Now, when I enable SSL in the spark-job I am getting an exception and the integration does not work.
The ONLY change I did to enable SSL in the spark-job is include the following lines of code in my job:
sparkConf.set("security.protocol", "SSL");
sparkConf.set("ssl.truststore.location", "PATH/truststore.jks");
sparkConf.set("ssl.truststore.password", "passwrd");
sparkConf.set("ssl.keystore.location", "PATH/keystore.jks");
sparkConf.set("ssl.keystore.password", "kstore");
sparkConf.set("ssl.key.password", "keypass");
And this sparkConf is passed when creating the streaming context.
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
And when I run the job the error I get is as follows:
17/05/24 18:16:39 WARN ConsumerFetcherManager$LeaderFinderThread: [test-consumer-group_bmj-cluster-1495664195784-5f49cbd0-leader-finder-thread], Failed to find leader for Set([bell,0])
java.lang.NullPointerException
at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
at kafka.cluster.Broker.connectionString(Broker.scala:62)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Kafka Version - 2.11-0.10.2.0
Spark Version - 2.1.0
Scala Version - 2.11.8
Streaming Libraries
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.0</version>
</dependency>
Any help with regards to overcoming this issue?