0 votes

Goal: read from Kafka with Spark Streaming and store the data in Cassandra.
By: Java, Spark Cassandra Connector 1.6.
Data input: a simple one-line JSON object, e.g. `{"id":"1","field1":"value1"}`.

I have a Java class that reads from Kafka via Spark Streaming, processes the data, and then stores it in Cassandra.

Here is the main code:

    JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(ssc,
                    targetKafkaServerPort, targetTopic, topicMap);

    JavaDStream<List<Object>> list = messages.map(new Function<Tuple2<String, String>, List<Object>>() {
        public List<Object> call(Tuple2<String, String> tuple2) {
            List<Object> list = new ArrayList<Object>();

            // Deserialize the Kafka message payload, enrich it, and re-serialize it
            Gson gson = new Gson();
            MyClass myclass = gson.fromJson(tuple2._2(), MyClass.class);
            myclass.setNewData("new_data");
            String jsonInString = gson.toJson(myclass);
            list.add(jsonInString);
            return list;
        }
    });
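
For context, `MyClass` is just a plain Gson-mappable POJO; a minimal sketch (the field names here are assumptions based on the sample JSON and the `setNewData` call above):

    // Sketch of the POJO Gson maps the incoming JSON onto;
    // "id" and "field1" follow the sample message, "newData" mirrors setNewData.
    public class MyClass implements java.io.Serializable {
        private String id;
        private String field1;
        private String newData;

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getField1() { return field1; }
        public void setField1(String field1) { this.field1 = field1; }
        public String getNewData() { return newData; }
        public void setNewData(String newData) { this.newData = newData; }
    }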

The next code is incorrect:

    javaFunctions(list)
            .writerBuilder("schema", "table", mapToRow(JavaDStream.class))
            .saveToCassandra();

Because "javaFunctions" method expect a JavaRDD object and "list" is a JavaDStream...

I'd need to convert the `JavaDStream` to a `JavaRDD`, but I can't find the right way to do it...

Any help?


2 Answers

1 vote

Use `import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*` instead of `com.datastax.spark.connector.japi.CassandraJavaUtil.*`. The streaming variant of `javaFunctions` accepts a `JavaDStream` directly, so there is no need to extract the underlying RDDs by hand.
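
With that import in place, something along these lines should work (a minimal sketch, assuming `myClassDStream` is a `JavaDStream<MyClass>` of the objects to persist; the keyspace and table names are the placeholders from the question):

    import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

    // The streaming javaFunctions overload works on a JavaDStream,
    // writing each micro-batch to Cassandra as it arrives.
    javaFunctions(myClassDStream)
            .writerBuilder("schema", "table", mapToRow(MyClass.class))
            .saveToCassandra();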

0 votes

Hmm, not really... What I've done is use a `foreachRDD` after creating the DStream:

    dStream.foreachRDD(new Function<JavaRDD<MyObject>, Void>() {
        @Override
        public Void call(JavaRDD<MyObject> rdd) throws Exception {
            if (rdd != null) {
                // Write each micro-batch RDD to Cassandra
                javaFunctions(rdd)
                        .writerBuilder("schema", "table", mapToRow(MyObject.class))
                        .saveToCassandra();
                logging(" -->  Saved data to cassandra", 1, null);
            }

            return null;
        }
    });
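
Note that for `mapToRow(MyObject.class)` to work, the DStream has to carry `MyObject` instances rather than re-serialized JSON strings, so the map step looks roughly like this (a sketch, assuming `MyObject` plays the role of the question's Gson-mapped POJO):

    // Map each Kafka (key, value) pair straight to the POJO and keep the
    // objects, instead of converting them back to JSON strings.
    JavaDStream<MyObject> dStream = messages.map(new Function<Tuple2<String, String>, MyObject>() {
        @Override
        public MyObject call(Tuple2<String, String> tuple2) {
            MyObject myObject = new Gson().fromJson(tuple2._2(), MyObject.class);
            myObject.setNewData("new_data");
            return myObject;
        }
    });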

Hope this is useful...