This question might be a duplicate.

I want to listen to a Kafka topic from Spark and pass the content to an Ignite cache. I would like to achieve the same thing as described in "Performance Tuning of an Apache Kafka/Spark Streaming System".
I used KafkaUtils.createDirectStream() to read the Kafka topic in Spark and IgniteRDD to push the data into the Ignite cache, but the system threw the following error:
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. (2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
The code is below:

public static void main(String[] args) throws Exception{

        SparkConf conf = new SparkConf()
                .setAppName("kafka-sandbox")
                .setMaster("local[*]");
        conf.set("spark.driver.allowMultipleContexts", "true");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //Context for Kafka
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
        // Creates Ignite context with specific configuration and runs Ignite in the embedded mode.
        IgniteContext igniteContext = new IgniteContext(
            sc.sc(),"/home/ec2-user/apache-ignite-fabric-2.6.0-bin/config/default-config.xml", false);


        // Adjust the logger to exclude the logs of no interest.
        Logger.getRootLogger().setLevel(Level.ERROR);
        Logger.getLogger("org.apache.ignite").setLevel(Level.INFO);



        // Define data to be stored in the Ignite RDD (cache).
        List<Integer> data = new ArrayList<>(20);

        for (int i = 0; i<20; i++) {
            data.add(i);
        }



        Set<String> topics = Collections.singleton("Hello-Kafka");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "10.0.102.251:9092");

        JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);



        directKafkaStream.foreachRDD(rdd -> {
            // Create a Java Ignite RDD of Type (Int,Int) Integer Pair.
            IgniteRDD sharedRDD = igniteContext.fromCache("hello-spark");
            // Preparing a Java RDD
            JavaRDD<String> javaRDD = sc.parallelize(Collections.singletonList("Hello-world"));
            System.out.println("--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");
            rdd.foreach(record -> {
                // Displaying Kafka topic
                System.out.println("Got the record : " + record._2);
                // Pushing values to Ignite
                sharedRDD.savePairs(javaRDD.<Integer, Integer>mapToPair(new PairFunction<Integer, Integer, Integer>() {
                    @Override public Tuple2<Integer, Integer> call(Integer val) throws Exception {
                        return new Tuple2<Integer, Integer>(val, val);
                    }
                }));
            });
        });

        ssc.start();
        ssc.awaitTermination();
}

I am not able to find out what is missing in the code. Is this approach right, or should I use a different one? Please guide me.

1 Answer

Your example may be reduced to the following code:

JavaRDD<Integer> rdd = sparkCtx.parallelize(Arrays.asList(1, 2, 3));
JavaRDD<Integer> javaRDD = sparkCtx.parallelize(Arrays.asList(4, 5, 6));
JavaIgniteRDD<Integer, Integer> sharedRDD = igniteCtx.fromCache("hello-spark");

rdd.foreach(record ->
    sharedRDD.savePairs(
        javaRDD.mapToPair((PairFunction<Integer, Integer, Integer>)val ->
            new Tuple2<>(val, val))
    ));

I removed Kafka out of the equation to simplify the example.

First of all, it is strange that you iterate over the elements of rdd and put the values of javaRDD into sharedRDD while ignoring the rdd records themselves. rdd and javaRDD are different things, and I don't see why you do this.

You get the exception because you run the mapToPair operation (and savePairs) inside the foreach. All of these are RDD operations, and RDD operations cannot be nested: the body of foreach runs on the executors, where no SparkContext is available. You should either move the savePairs part out of the foreach, or combine rdd and javaRDD in some way that doesn't require running nested RDD operations. It depends on what you are really trying to achieve.
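
One possible reading of the second option, if the goal is simply to store each Kafka message in the cache, is to build the key/value pairs from the batch's own records in a single flat transformation and then save them once per batch. The sketch below models that data flow in plain Java so it runs without a cluster: a List stands in for the batch RDD and a Map stands in for the Ignite cache. The class name, the helper names, and keying by the message's hash code are all hypothetical choices for illustration, and the Spark call shown in the comment assumes a cache typed as <Integer, String>.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FlatBatchSave {

    // Stand-in for the per-record function that would be passed to mapToPair.
    // Keying by hashCode is a hypothetical choice, not something from the question.
    static Map.Entry<Integer, String> toPair(String message) {
        return new AbstractMap.SimpleEntry<>(message.hashCode(), message);
    }

    // Stand-in for the foreachRDD body: one flat transformation, then one bulk save.
    // Assumed Spark analogue, called at driver level and NOT inside rdd.foreach:
    //   sharedRDD.savePairs(rdd.mapToPair(r -> new Tuple2<>(r._2().hashCode(), r._2())));
    static void saveBatch(List<String> batch, Map<Integer, String> cache) {
        Map<Integer, String> pairs = batch.stream()
                .map(FlatBatchSave::toPair)
                // On a key collision the later message wins.
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (a, b) -> b));
        cache.putAll(pairs);
    }

    public static void main(String[] args) {
        Map<Integer, String> cache = new HashMap<>();
        saveBatch(Arrays.asList("Hello-world", "Hello-Kafka"), cache);
        System.out.println(cache.size() + " entries saved");
    }
}
```

The point of the shape is that nothing inside the per-record function touches another collection or the cache; the bulk save happens exactly once, outside the per-record step, which is what keeps the Spark version free of nested RDD operations.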