I'd like to query some data from Cassandra based on values I have in an RDD. My approach is the following:
val userIds = sc.textFile("/tmp/user_ids").keyBy(e => e)
val t = sc.cassandraTable("keyspace", "users").select("userid", "user_name")
val userNames = userIds.flatMap { userId =>
  t.where("userid = ?", userId).take(1)
}
userNames.take(1)
While the Cassandra query works on its own in the Spark shell, it throws an exception when I use it inside flatMap:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException:
org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:49)
com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:83)
com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:94)
My understanding is that I cannot produce an RDD (the Cassandra results) inside a transformation of another RDD: the CassandraRDD t gets captured in the flatMap closure and shipped to the executors, where its SparkContext reference is presumably null, hence the NullPointerException inside CassandraRDD.where.
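If that's right, the same failure should be reproducible without Cassandra at all. A toy sketch (hypothetical RDDs, not from my actual job) that I'd expect to fail the same way:

val a = sc.parallelize(1 to 10)
val b = sc.parallelize(1 to 10)
// b is captured in the task closure; on the executors its SparkContext
// is null, so building b.filter(...) there should hit the same NPE:
a.map(x => b.filter(_ == x).count()).collect()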
The examples I found on the web read the whole Cassandra table into an RDD and join the two RDDs (like this one: https://cassandrastuff.wordpress.com/2014/07/07/cassandra-and-spark-table-joins/). But that won't scale if the Cassandra table is huge.
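For reference, this is a minimal sketch of that whole-table-join approach, assuming the same users table and input file as above:

// Read the entire users table, key it by userid, and join against the ids.
// This works, but scans the whole table no matter how few ids I have.
val users = sc.cassandraTable("keyspace", "users")
  .select("userid", "user_name")
  .map(row => (row.getString("userid"), row))        // RDD[(String, CassandraRow)]

val ids = sc.textFile("/tmp/user_ids").keyBy(e => e) // RDD[(String, String)]

val names = ids
  .join(users)
  .map { case (_, (_, row)) => row.getString("user_name") }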
How should I approach this problem instead?