This question might be a duplicate.
I want to listen to a Kafka topic from Spark and push the records into an Ignite cache.
I would like to achieve the same thing described in Performance Tuning of an Apache Kafka/Spark Streaming System.
I used KafkaUtils.createDirectStream() to read the Kafka topic in Spark and IgniteRDD to push the data into the Ignite cache.
But the system threw the error below:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
The code is below:
public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf()
            .setAppName("kafka-sandbox")
            .setMaster("local[*]");
    conf.set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Streaming context for Kafka.
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    // Creates the Ignite context with a specific configuration and runs Ignite in embedded mode.
    IgniteContext igniteContext = new IgniteContext(
            sc.sc(), "/home/ec2-user/apache-ignite-fabric-2.6.0-bin/config/default-config.xml", false);

    // Adjust the logger to exclude the logs of no interest.
    Logger.getRootLogger().setLevel(Level.ERROR);
    Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);

    // Define the data to be stored in the Ignite RDD (cache).
    List<Integer> data = new ArrayList<>(20);
    for (int i = 0; i < 20; i++) {
        data.add(i);
    }

    Set<String> topics = Collections.singleton("Hello-Kafka");
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", "10.0.102.251:9092");

    JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
            String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);

    directKafkaStream.foreachRDD(rdd -> {
        // Create an Ignite RDD of (Integer, Integer) pairs backed by the cache.
        IgniteRDD<Integer, Integer> sharedRDD = igniteContext.fromCache("hello-spark");
        // Prepare a Java RDD from the data list above.
        JavaRDD<Integer> javaRDD = sc.parallelize(data);
        System.out.println("--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");
        rdd.foreach(record -> {
            // Display the Kafka record.
            System.out.println("Got the record : " + record._2);
            // Push the values to Ignite.
            sharedRDD.savePairs(javaRDD.mapToPair(val -> new Tuple2<>(val, val)).rdd());
        });
    });

    ssc.start();
    ssc.awaitTermination();
}
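From my reading of SPARK-5063, the problem may be that I reference sharedRDD and javaRDD inside the rdd.foreach closure, which runs on the executors. A sketch of the pattern I think I should be using instead, where the save happens on the driver inside foreachRDD (reusing directKafkaStream and igniteContext from the code above; I have not confirmed this is the right fix, and note the cache would then hold (String, String) pairs rather than (Integer, Integer)):

```java
directKafkaStream.foreachRDD(rdd -> {
    // foreachRDD itself runs on the driver, so other RDDs and the
    // Ignite context can be used here without hitting SPARK-5063.
    IgniteRDD<String, String> sharedRDD = igniteContext.fromCache("hello-spark");
    // Save the whole batch of (key, value) Kafka records at once;
    // no RDD is captured inside an executor-side closure.
    sharedRDD.savePairs(rdd.rdd());
});
```

This way savePairs is invoked once per batch on the driver instead of once per record on the executors.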
I am not able to find out what is missing in the code. Is this approach right, or should I use another approach? Please guide me.