6 votes

I'm evaluating the spark-cassandra-connector and I'm struggling to get a range query on a partition key to work.

According to the connector's documentation, server-side filtering on the partition key is only possible with the equality or IN operator. Unfortunately, my partition key is a timestamp, so I cannot use either.

So I tried using Spark SQL with the following query ('timestamp' is the partition key):

select * from datastore.data where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z'
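For completeness, this is roughly how the query is submitted (a sketch, assuming the b1.1 connector's CassandraSQLContext and an existing SparkContext 'sc' with spark.cassandra.connection.host already set):

import org.apache.spark.sql.cassandra.CassandraSQLContext

// Build a Cassandra-aware SQL context on top of the existing SparkContext.
val cc = new CassandraSQLContext(sc)
val results = cc.sql(
  "select * from datastore.data " +
  "where timestamp >= '2013-01-01T00:00:00.000Z' and timestamp < '2013-12-31T00:00:00.000Z'")
println(results.count())  // count the matching rows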

Although the job spawns 200 tasks, the query is not returning any data.

I can also confirm that there is data to be returned, since running the query in cqlsh (after the appropriate conversion using the 'token' function) DOES return data.

I'm using Spark 1.1.0 in standalone mode. Cassandra is 2.1.2 and the connector is the 'b1.1' branch. The Cassandra driver is the DataStax 'master' branch. The Cassandra cluster is overlaid on the Spark cluster, with 3 servers and a replication factor of 1.

Here is the job's full log

Any clues, anyone?

Update: When trying to do server-side filtering based on the partition key (using the CassandraRDD.where method), I get the following exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Range predicates on partition key columns (here: timestamp) are not supported in where. Use filter instead.

But unfortunately I don't know what "filter" is...
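For reference, the where call that triggers the exception looks roughly like this (a sketch against the same table):

// A range predicate on the partition key is rejected by the connector at runtime.
val rdd = sc.cassandraTable("datastore", "data")
  .where("timestamp >= ? and timestamp < ?",
    "2013-01-01T00:00:00.000Z", "2013-12-31T00:00:00.000Z")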


2 Answers

8 votes

You have several options for getting the result you are looking for.

The most powerful one would be to use the Lucene indexes that Stratio has integrated into Cassandra, which allow you to search by any indexed field on the server side. Your write time will increase but, on the other hand, you will be able to query any time range. You can find further information about Lucene indexes in Cassandra here. This extended version of Cassandra is fully integrated into the deep-spark project, so you can take advantage of the Lucene indexes in Cassandra through it. I would recommend Lucene indexes when you are executing a restricted query that retrieves a small-to-medium result set; if you are going to retrieve a big piece of your data set, you should use the third option below.

Another approach, depending on how your application works, might be to truncate your timestamp field so you can look it up with an IN operator. The problem is that, as far as I know, you can't use the spark-cassandra-connector for that; you would have to use the plain Cassandra driver, which is not integrated with Spark, or have a look at the deep-spark project, where a new feature allowing this is about to be released. Your query would look something like this:

select * from datastore.data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', ... , '2013-12-31')

But, as I said before, I don't know whether this fits your needs, since you might not be able to truncate your data and group it by date/time.
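A minimal sketch of that approach with the plain DataStax Java driver from Scala (the contact point and the day-level truncation are assumptions; generate the IN list programmatically to cover a full range):

import com.datastax.driver.core.Cluster

// Assumes Cassandra is reachable at 127.0.0.1 and the partition key is truncated to whole days.
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect("datastore")

// One literal per truncated day; extend the list for the full year.
val rows = session.execute(
  "select * from datastore.data where timestamp IN ('2013-01-01', '2013-01-02', '2013-01-03')")
println(rows.all().size() + " rows fetched")

cluster.close()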

The last option you have, though the least efficient, is to bring the full data set into your Spark cluster and apply a filter on the RDD.

Disclaimer: I work for Stratio :-) Don't hesitate to contact us if you need any help.

I hope it helps!

8 votes

I think the CassandraRDD error is telling you that the query you are trying to do is not allowed in Cassandra: you have to load the whole table into a CassandraRDD and then apply a Spark filter operation over that RDD.

So your code (in Scala) should look something like this:

import java.text.SimpleDateFormat

// Note: the quoted 'Z' is treated as a literal, so parsing uses the JVM's default
// time zone; adjust the pattern/zone as needed.
val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
val start = format.parse("2013-01-01T00:00:00.000Z")
val end = format.parse("2013-12-31T00:00:00.000Z")

// Full table scan, then a client-side filter on the timestamp column.
val cassRDD = sc.cassandraTable("keyspace name", "table name")
  .filter(row => !row.getDate("timestamp").before(start) &&
    row.getDate("timestamp").before(end))

If you are interested in making this type of query, you might want to take a look at other Cassandra connectors, like the one developed by Stratio.