
We are using DataStax DSE 6.0 (Cassandra) with Spark enabled. About 10 GB of data arrives every day, and all queries are based on date. We have one huge table with 40 columns, and we plan to generate reports using Spark. What is the best way to set up this data, given that data keeps arriving daily and we keep roughly one year of it in a single table?

We tried different partitioning schemes, but most of our keys are based on date.

No code needed, just suggestions.

Our queries should be fast enough. We have 9 nodes, each with 256 GB of RAM and 44 CPU cores.

Are you processing that data only with Spark, or also accessing it through other APIs? - Alex Ott
Only Spark processing is being done - Hyder Tom

1 Answer


Having the data organized into daily partitions isn't a very good design - in that case, only RF nodes will be active during the day while writing the data, and then again at report-generation time.

Because you'll be accessing that data only from Spark, you can use the following approach: use a bucket field as the partition key (filled, for example, with a uniformly distributed random number), a timestamp as a clustering column, and maybe an additional uuid column to guarantee the uniqueness of records, something like this:

create table test.sdtest (
  b int,
  ts timestamp,
  uid uuid,
  v1 int,
  primary key(b, ts, uid));

The maximum value used when generating b should be selected so that partitions are neither too big nor too small, so we can read them effectively.
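
For illustration, a minimal sketch of the write side, assuming the table above; the bucket count of 32 and the inline sample rows are placeholders only:

import java.util.UUID
import org.apache.spark.sql.functions._
import org.apache.spark.sql.cassandra._
import spark.implicits._

// Placeholder: pick a bucket count that gives partitions of a reasonable size for your volume
val maxBuckets = 32

// Placeholder batch; in practice this would be the daily ~10GB load with all 40 columns
val incoming = Seq(
  (java.sql.Timestamp.valueOf("2019-03-10 12:00:00"), 1),
  (java.sql.Timestamp.valueOf("2019-03-10 12:05:00"), 2)
).toDF("ts", "v1")

// Random uuid per row; the connector should convert the string value to the uuid column type
val makeUuid = udf(() => UUID.randomUUID().toString)

incoming
  .withColumn("b", (rand() * maxBuckets).cast("int"))   // uniform random bucket in [0, maxBuckets)
  .withColumn("uid", makeUuid())
  .write
  .cassandraFormat("sdtest", "test")
  .mode("append")
  .save()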

And then, to read the data for a report, we can run Spark code like this:

import org.apache.spark.sql.cassandra._

// Read the table and filter on the clustering column; the date range is pushed down to Cassandra
val data = spark.read.cassandraFormat("sdtest", "test").load()
val filtered = data.filter("ts >= cast('2019-03-10T00:00:00+0000' as timestamp) AND ts < cast('2019-03-11T00:00:00+0000' as timestamp)")
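
To give an idea of how a report could be built on top of the filtered DataFrame above, here is a minimal sketch; the per-day record count and the average of v1 are just hypothetical metrics:

import org.apache.spark.sql.functions._

// Hypothetical daily report: number of records and average of v1 per day
val report = filtered
  .groupBy(to_date(col("ts")).as("day"))
  .agg(count(lit(1)).as("events"), avg("v1").as("avg_v1"))

report.orderBy("day").show()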

The trick here is that we distribute the data across the nodes by using a random partition key, so all nodes handle the load both when writing the data and when generating the report.

If we look into the physical plan for the read/filter code above (formatted for readability):

== Physical Plan ==
*Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [b#23,ts#24,v1#25] 
PushedFilters: [*GreaterThanOrEqual(ts,2019-03-10 00:00:00.0),
  *LessThan(ts,2019-03-11 00:00:00.0)], ReadSchema: struct<b:int,ts:timestamp,v1:int>

We can see that both conditions are pushed down to DSE at the CQL level - this means that Spark won't load all the data into memory and filter it there; instead, all the filtering happens in Cassandra, and only the necessary data is returned. And because we spread the requests across multiple nodes, reading could be faster (this needs to be tested) than reading one giant partition.

Another benefit of this design is that it's easy to delete the old data using Spark, with something like this:

import com.datastax.spark.connector._

// Select the rows older than the cutoff and delete them by their primary key columns
val toDel = sc.cassandraTable("test", "sdtest").where("ts < '2019-08-10T00:00:00+0000'")
toDel.deleteFromCassandra("test", "sdtest", keyColumns = SomeColumns("b", "ts"))

In this case, Spark will perform very efficient range/row deletions that generate fewer tombstones.

P.S. It's recommended to use DSE's version of the Spark Cassandra Connector, as it may have more optimizations.

P.P.S. Theoretically, we could merge ts and uid into one timeuuid column, but I'm not sure that it will work with DataFrames.
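
For anyone experimenting with that idea, the Java driver's UUIDs helper (driver 3.x class names assumed) can generate timeuuid values and range bounds; whether a DataFrame range predicate on such a column gets pushed down is exactly the open question, so treat this as a sketch only:

// Sketch only: timeuuid generation and range bounds via the DataStax Java driver helper
import com.datastax.driver.core.utils.UUIDs

val tuid = UUIDs.timeBased()              // timeuuid for a new record
val from = UUIDs.startOf(1552176000000L)  // lower bound for 2019-03-10T00:00:00Z
val to   = UUIDs.endOf(1552262399999L)    // upper bound for the end of that day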