
I am trying to use SSL for the Kafka-Spark integration. I have tested Kafka with SSL enabled, and it works fine with the sample consumer and producer.

I have also tried the Spark-Kafka integration, which works without issue when SSL is not enabled in the spark-job.

Now, when I enable SSL in the spark-job, I get an exception and the integration does not work.

The ONLY change I made to enable SSL in the spark-job was to include the following lines of code:

    sparkConf.set("security.protocol", "SSL");
    sparkConf.set("ssl.truststore.location", "PATH/truststore.jks");
    sparkConf.set("ssl.truststore.password", "passwrd");
    sparkConf.set("ssl.keystore.location", "PATH/keystore.jks");
    sparkConf.set("ssl.keystore.password", "kstore");
    sparkConf.set("ssl.key.password", "keypass");

This sparkConf is then passed when creating the streaming context:

    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

When I run the job, the error I get is as follows:

17/05/24 18:16:39 WARN ConsumerFetcherManager$LeaderFinderThread: [test-consumer-group_bmj-cluster-1495664195784-5f49cbd0-leader-finder-thread], Failed to find leader for Set([bell,0])
java.lang.NullPointerException
    at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
    at kafka.cluster.Broker.connectionString(Broker.scala:62)
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Kafka Version - 2.11-0.10.2.0
Spark Version - 2.1.0
Scala Version - 2.11.8

Streaming Libraries

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

Any help in overcoming this issue would be appreciated.


1 Answer


With some digging, I was able to figure out the issue.

First of all, in order to enable SSL, the SSL-related Kafka parameters need to be passed to the KafkaUtils.createDirectStream() method, NOT to the sparkConf of the JavaStreamingContext.

Then, the given SSL parameters:

"security.protocol", "SSL"
"ssl.truststore.location", "PATH/truststore.jks"
"ssl.truststore.password", "passwrd"
"ssl.keystore.location", "PATH/keystore.jks"
"ssl.keystore.password", "kstore"
"ssl.key.password", "keypass"

are not supported by the spark-streaming-kafka version "0-8_2.11" that I was using, so I had to change it to version "0-10_2.11".
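The corresponding change in the pom, keeping the same Spark version as above, would look something like this (a sketch of the replacement dependency, not my exact pom):

```xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
</dependency>
```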

This, in turn, brings a complete API change to the KafkaUtils.createDirectStream() method used to connect to Kafka.

An explanation of how to use the new API is given in the Spark Streaming + Kafka Integration Guide in the documentation.

So my final code snippet to connect to Kafka looks like this:

final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    javaStreamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsCollection, kafkaParams)
            );

with kafkaParams being a map that holds all the Kafka configuration, including the SSL parameters.
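For reference, a minimal sketch of how that map can be built. The broker address, group id, and deserializer choices here are placeholders for illustration; the SSL entries are the same ones from the question, now passed as Kafka consumer properties instead of via SparkConf:

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaSslParams {

    // Builds the consumer parameter map passed to
    // ConsumerStrategies.Subscribe(topicsCollection, kafkaParams).
    public static Map<String, Object> build() {
        Map<String, Object> kafkaParams = new HashMap<>();

        // Basic consumer settings (placeholder values; 9093 assumed to be the SSL listener)
        kafkaParams.put("bootstrap.servers", "localhost:9093");
        kafkaParams.put("group.id", "test-consumer-group");
        kafkaParams.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // SSL settings from the question, now as Kafka params rather than SparkConf entries
        kafkaParams.put("security.protocol", "SSL");
        kafkaParams.put("ssl.truststore.location", "PATH/truststore.jks");
        kafkaParams.put("ssl.truststore.password", "passwrd");
        kafkaParams.put("ssl.keystore.location", "PATH/keystore.jks");
        kafkaParams.put("ssl.keystore.password", "kstore");
        kafkaParams.put("ssl.key.password", "keypass");

        return kafkaParams;
    }

    public static void main(String[] args) {
        Map<String, Object> params = build();
        System.out.println(params.get("security.protocol")); // prints SSL
    }
}
```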

Thanks
Shabir