
I have created a Kafka topic named my-topic and I am trying to read its data in Spark, but I am having difficulty displaying the topic details because I get a long list of errors. I am using Java to fetch the data.

Below is my code:

public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "Different id is allotted for different stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("my-topic");

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
      KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
      );

    JavaPairDStream<String, String> jPairDStream =  stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    return new Tuple2<>(record.key(), record.value());
                }
            });

    jPairDStream.foreachRDD(jPairRDD -> {
           jPairRDD.foreach(rdd -> {
                System.out.println("key="+rdd._1()+" value="+rdd._2());
            });
        });

    jssc.start();
    jssc.awaitTermination();
}

Below is the error I am getting:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/09/04 11:41:15 INFO SparkContext: Running Spark version 2.1.0
17/09/04 11:41:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/09/04 11:41:15 INFO SecurityManager: Changing view acls to: 11014525
17/09/04 11:41:15 INFO SecurityManager: Changing modify acls to: 11014525
17/09/04 11:41:15 INFO SecurityManager: Changing view acls groups to:
17/09/04 11:41:15 INFO SecurityManager: Changing modify acls groups to:
17/09/04 11:41:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(11014525); groups with view permissions: Set(); users with modify permissions: Set(11014525); groups with modify permissions: Set()
17/09/04 11:41:15 INFO Utils: Successfully started service 'sparkDriver' on port 56668.
17/09/04 11:41:15 INFO SparkEnv: Registering MapOutputTracker
17/09/04 11:41:15 INFO SparkEnv: Registering BlockManagerMaster
17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/09/04 11:41:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/09/04 11:41:15 INFO DiskBlockManager: Created local directory at C:\Users\11014525\AppData\Local\Temp\blockmgr-cba489b9-2458-455a-8c03-4c4395a01d44
17/09/04 11:41:15 INFO MemoryStore: MemoryStore started with capacity 896.4 MB
17/09/04 11:41:16 INFO SparkEnv: Registering OutputCommitCoordinator
17/09/04 11:41:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/09/04 11:41:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.16.202.21:4040
17/09/04 11:41:16 INFO Executor: Starting executor ID driver on host localhost
17/09/04 11:41:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 56689.
17/09/04 11:41:16 INFO NettyBlockTransferService: Server created on 172.16.202.21:56689
17/09/04 11:41:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/09/04 11:41:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManagerMasterEndpoint: Registering block manager 172.16.202.21:56689 with 896.4 MB RAM, BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 172.16.202.21, 56689, None)
17/09/04 11:41:16 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/09/04 11:41:16 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/09/04 11:41:16 WARN KafkaUtils: overriding executor group.id to spark-executor-Different id is allotted for different stream
17/09/04 11:41:16 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@23a3407b
17/09/04 11:41:16 INFO MappedDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO MappedDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO MappedDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO MappedDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@140030a9
17/09/04 11:41:16 INFO ForEachDStream: Slide time = 10000 ms
17/09/04 11:41:16 INFO ForEachDStream: Storage level = Serialized 1x Replicated
17/09/04 11:41:16 INFO ForEachDStream: Checkpoint interval = null
17/09/04 11:41:16 INFO ForEachDStream: Remember interval = 10000 ms
17/09/04 11:41:16 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@65041548
17/09/04 11:41:16 ERROR StreamingContext: Error starting the context, marking it as stopped
org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16 INFO ReceiverTracker: ReceiverTracker stopped
17/09/04 11:41:16 INFO JobGenerator: Stopping JobGenerator immediately
17/09/04 11:41:16 INFO RecurringTimer: Stopped timer for JobGenerator after time -1
17/09/04 11:41:16 INFO JobGenerator: Stopped JobGenerator
17/09/04 11:41:16 INFO JobScheduler: Stopped JobScheduler
Exception in thread "main" org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value.
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:48)
    at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:194)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:380)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:363)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:350)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:83)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:578)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at Json.ExcelToJson.SparkConsumingKafka.main(SparkConsumingKafka.java:56)
17/09/04 11:41:16 INFO SparkContext: Invoking stop() from shutdown hook
17/09/04 11:41:16 INFO SparkUI: Stopped Spark web UI at http://172.16.202.21:4040
17/09/04 11:41:16 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/09/04 11:41:16 INFO MemoryStore: MemoryStore cleared
17/09/04 11:41:16 INFO BlockManager: BlockManager stopped
17/09/04 11:41:16 INFO BlockManagerMaster: BlockManagerMaster stopped
17/09/04 11:41:16 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/09/04 11:41:16 INFO SparkContext: Successfully stopped SparkContext
17/09/04 11:41:16 INFO ShutdownHookManager: Shutdown hook called
17/09/04 11:41:16 INFO ShutdownHookManager: Deleting directory C:\Users\11014525\AppData\Local\Temp\spark-37334cdc-9680-4801-8e50-ef3024ed1d8a

pom.xml

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
        <version>2.1.1</version>
</dependency>

org.apache.kafka.common.config.ConfigException: Missing required configuration "partition.assignment.strategy" which has no default value. That might have something to do with it. – Stultuske
@bleedcode I have edited my answer below based on the pom.xml details you have provided. Please try it and let me know if it resolves your issue. – abaghel

1 Answer


From the log, your Spark version is 2.1.0. You have not shared the build file with the rest of your dependencies, but it looks like you have both spark-streaming-kafka-0-8_2.11-2.1.0.jar and spark-streaming-kafka-0-10_2.11-2.1.0.jar on the classpath, and the wrong class is being loaded (you can confirm which jar wins with mvn dependency:tree, or with the quick check shown after the dependency list below). If you are using Maven, you need dependencies like the ones below. Please check and update your project.

<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
</dependency>
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
</dependency>  
<dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
</dependency> 
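
One way to confirm that the wrong class is being picked up is to print where the JVM actually loads the Kafka consumer classes from. This is only an illustrative check, not part of the fix itself; it uses plain Java reflection on the ConsumerConfig class that already appears in your stack trace. Drop it near the top of main:

// Illustrative check: print the jar that ConsumerConfig is loaded from.
// With conflicting Kafka artifacts this typically points at an old
// 0.8.2.0 kafka-clients jar instead of the 0.10.x client you expect.
System.out.println(org.apache.kafka.clients.consumer.ConsumerConfig.class
        .getProtectionDomain().getCodeSource().getLocation());

Once the printed location is the 0.10.x client jar, the "partition.assignment.strategy" error should go away, because that version of the consumer has a default for the setting.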

EDIT

As you have edited the question and posted the dependencies, I am editing my answer. You are using Kafka version 0.8.* while your spark-streaming-kafka version is 0.10.*, and your artifacts mix the _2.10 and _2.11 Scala builds. Please use the same Kafka version as the streaming integration, and keep the Scala suffix (_2.11) consistent across all artifacts. Use the dependency below for org.apache.kafka:

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
</dependency>