
I have a large table that grows vertically. I want to read its rows in small batches, so that I can process each batch and save the results.

Table definition

CREATE TABLE foo ( 
uid timeuuid, 
events blob, 
PRIMARY KEY ((uid)) 
)

Code attempt 1 - using CassandraSQLContext

// Step 1. Get the uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar")
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84


// Step 2. Use last row as a pointer to the start of the next batch
val cc = new CassandraSQLContext(sc)
val cql = s"SELECT events from foo.bar where token(uid) > token($lastUUID) limit $max"

// which is at runtime
// SELECT events from foo.bar WHERE 
// token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10

cc.sql(cql).collect()

Last line throws

Exception in thread "main" java.lang.RuntimeException: [1.79] failure: ``)'' expected but identifier ea620 found

SELECT events from foo.bar where token(uid) > token(131ea620-2e4e-11e4-a2fc-8d5aad979e84) limit 10
                                                       ^
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
  at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
  at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)

But the same CQL returns the correct 10 records when I run it in cqlsh.

Code attempt 2 - using DataStax Cassandra connector

// Step 1. Get the uuid of the last row in a batch
val max = 10
val rdd = sc.cassandraTable("foo", "bar")
val cassandraRows = rdd.take(max)
val lastUUID = cassandraRows.last.getUUID("uid")
// lastUUID = 131ea620-2e4e-11e4-a2fc-8d5aad979e84

// Step 2. Execute query
rdd.where(s"token(uid) > token($lastUUID)").take(max)

This throws

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.io.IOException: Exception during preparation of SELECT "uid", "events" FROM "foo"."bar" WHERE token("uid") > ? AND token("uid") <= ? AND uid > $lastUUID ALLOW FILTERING: line 1:118 no viable alternative at character '$'

How do I use where token(...) queries with Spark and Cassandra?

Not sure what exactly is causing the error, but the Spark query should use SQL syntax rather than CQL, because you are using CassandraSQLContext; possibly you need to wrap the uuid in quotes. - aaronman
token must not be an SQL function; consider switching to the standard DataStax connector rather than the SQL context. - aaronman
CQL queries are very limited; I would use them solely for loading the necessary data in, and then just work with a normal RDD. - aaronman
Do the same query but get rid of the WHERE part and do the ranges in Spark; the types of queries you are allowed to do depend on how your clustering columns are arranged in Cassandra. - aaronman
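Following that last comment, a minimal sketch of dropping the WHERE clause and doing the cut-off in Spark instead. Assumptions: uid is a version-1 timeuuid (so its embedded timestamp is comparable), and ordering by time rather than by token is acceptable for your batching:

// Sketch: load the table as a plain RDD and filter in Spark, comparing
// the timeuuid's embedded timestamp instead of pushing token() into CQL.
// Assumes uid is a version-1 (time-based) UUID.
val nextBatch = sc.cassandraTable("foo", "bar")
  .filter(_.getUUID("uid").timestamp() > lastUUID.timestamp())
  .take(max)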

1 Answer


I would use the DataStax Cassandra Java Driver. Similar to your CassandraSQLContext, you would select chunks like this:

import com.datastax.driver.core.querybuilder.QueryBuilder
import com.datastax.driver.core.querybuilder.QueryBuilder.{fcall, gt, token}

val query = QueryBuilder.select("events")
  .from("foo", "bar")                                // keyspace, table
  .where(gt(token("uid"), fcall("token", lastUUID))) // token() on the value side needs fcall
  .limit(10)
val rows = session.execute(query).all()
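To page through the whole table this way, feed the uid of the last row of each chunk back into the next query. A minimal sketch, assuming the same session and imports as above; processBatch is a hypothetical placeholder for your per-batch work:

import scala.collection.JavaConverters._

// Sketch: walk foo.bar in chunks of 10, restarting each query from the
// token of the last uid seen. processBatch is a hypothetical placeholder.
var batch = session.execute(
  QueryBuilder.select("uid", "events").from("foo", "bar").limit(10)
).all().asScala.toList

while (batch.nonEmpty) {
  processBatch(batch)
  val lastUUID = batch.last.getUUID("uid")
  batch = session.execute(
    QueryBuilder.select("uid", "events")
      .from("foo", "bar")
      .where(gt(token("uid"), fcall("token", lastUUID)))
      .limit(10)
  ).all().asScala.toList
}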

If you want to query asynchronously, session also has executeAsync, which returns a ResultSetFuture (a Guava ListenableFuture) that can be wrapped in a Scala Future by adding a callback.
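A minimal sketch of that wrapping, assuming Java driver 2.x and Guava's Futures.addCallback; the asScalaFuture helper name is mine:

import scala.concurrent.{Future, Promise}
import com.datastax.driver.core.{ResultSet, ResultSetFuture}
import com.google.common.util.concurrent.{FutureCallback, Futures}

// Hypothetical helper: adapt the driver's Guava future to a Scala Future.
def asScalaFuture(rsf: ResultSetFuture): Future[ResultSet] = {
  val p = Promise[ResultSet]()
  Futures.addCallback(rsf, new FutureCallback[ResultSet] {
    def onSuccess(rs: ResultSet): Unit = p.success(rs)
    def onFailure(t: Throwable): Unit = p.failure(t)
  })
  p.future
}

// Usage: run the same chunk query without blocking the calling thread.
// val rowsF = asScalaFuture(session.executeAsync(query))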