I have a large table that keeps growing as new rows are appended. I want to read the rows in small batches, so that I can process each batch and save the results.
Table definition
CREATE TABLE foo.bar (
uid timeuuid,
events blob,
PRIMARY KEY ((uid))
)
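Since uid is the sole partition key and there are no clustering columns, paging has to follow token(uid) order. As a sketch, the two statements I want to alternate between look like this (the UUID literal is just the example value from below):
// The paging pattern in CQL terms (sketch; example UUID only)
val firstBatchCql = "SELECT uid, events FROM foo.bar LIMIT 10"
val nextBatchCql = "SELECT uid, events FROM foo.bar " +
  "WHERE token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) LIMIT 10"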
Code attempt 1 - using CassandraSQLContext
// Step 1. Get uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar") // requires: import com.datastax.spark.connector._
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84
// Step 2. Use last row as a pointer to the start of the next batch
val cc = new CassandraSQLContext(sc) // requires: import org.apache.spark.sql.cassandra.CassandraSQLContext
val cql = s"SELECT events from foo.bar where token(uid) > token($lastUUID) limit $max"
// which is at runtime
// SELECT events from foo.bar WHERE
// token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10
cc.sql(cql).collect()
Last line throws
Exception in thread "main" java.lang.RuntimeException: [1.79] failure: ``)'' expected but identifier ea620 found
SELECT events from foo.bar where token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10 ^
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
But the same CQL returns the correct 10 records when I run it in cqlsh.
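From what I can tell, cc.sql(...) is parsed by Spark SQL's own parser, not by Cassandra, and that parser understands neither the token() function nor an unquoted timeuuid literal; cqlsh hands the identical string straight to Cassandra, which is why it works there. A minimal workaround sketch, assuming the standard spark-cassandra-connector is available: bypass Spark SQL and execute the raw CQL through the driver session that the connector manages.
import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// Run the paging query on Cassandra itself, bypassing the Spark SQL parser
val events = CassandraConnector(sc.getConf).withSessionDo { session =>
  session
    .execute(s"SELECT events FROM foo.bar WHERE token(uid) > token($lastUUID) LIMIT $max")
    .all()          // java.util.List[Row] from the DataStax Java driver
    .asScala
    .map(_.getBytes("events"))
}
The unquoted UUID splice is valid here because Cassandra itself accepts bare timeuuid literals, exactly as cqlsh does.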
Code attempt 2 - using DataStax Cassandra connector
// Step 1. Get uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar")
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84
// Step 2. Execute query
rdd.where(s"token(uid) > token($lastUUID)").take(max)
This throws
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.io.IOException: Exception during preparation of SELECT "uid", "events" FROM "foo"."bar" WHERE token("uid") > ? AND token("uid") <= ? AND uid > $lastUUID ALLOW FILTERING: line 1:118 no viable alternative at character '$'
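Two things seem to go wrong here. The trace shows the literal $lastUUID, so the string that actually ran was apparently never interpolated (a missing s prefix would do that). And even with interpolation, splicing values into CQL is fragile; the connector's where() accepts ? bind markers, so the timeuuid can be passed as a value instead. A sketch, assuming a connector version whose where() supports bind parameters:
// Let the driver bind the timeuuid instead of splicing it into the CQL string
val nextBatch = sc.cassandraTable("foo", "bar")
  .where("token(uid) > token(?)", lastUUID)
  .take(max)
Depending on the connector version, Cassandra may still reject this because the connector already restricts token("uid") to its own per-partition ranges, and a second start bound on the same token is not allowed; in that case the session-based approach above is the fallback.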
How do I use WHERE token(...) queries with Spark and Cassandra?
Comments
- With CassandraSQLContext, possibly you need to wrap the uuid in quotes - aaronman
- token must not be an SQL function; consider switching to the standard DataStax connector rather than the SQL context - aaronman
- ... the WHERE part and do the ranges in Spark; the types of queries you are allowed to do depend on how your clustering columns are arranged in Cassandra - aaronman
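Following the last comment, here is a sketch of doing the range in Spark instead of in CQL. Since uid is a timeuuid, its embedded timestamp can act as the batch boundary; note this pages in time order rather than token order, so the batches differ from the token-based ones above. UUIDs is from the DataStax Java driver.
import com.datastax.driver.core.utils.UUIDs

// Client-side filtering in Spark: keep rows whose timeuuid timestamp is
// newer than the last row of the previous batch
val cutoff = UUIDs.unixTimestamp(lastUUID)
val nextBatch = sc.cassandraTable("foo", "bar")
  .filter(row => UUIDs.unixTimestamp(row.getUUID("uid")) > cutoff)
  .take(max)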