
I am working on a Spring Java project and integrating Apache Spark and Cassandra using the DataStax connector.

I have autowired SparkSession, and the lines of code below seem to work.

Map<String, String> configMap = new HashMap<>();
configMap.put("keyspace", "key1");
configMap.put("table", tableName.toLowerCase());

Dataset<Row> ds = sparkSession.sqlContext().read()
        .format("org.apache.spark.sql.cassandra")
        .options(configMap)
        .load();
ds.show();

In the step above I am loading the Dataset, and in the step below I am filtering on a datetime field.

String s1 = "2020-06-23 18:51:41";
String s2 = "2020-06-23 18:52:21";

Timestamp from = Timestamp.valueOf(s1);
Timestamp to = Timestamp.valueOf(s2);
ds = ds.filter(ds.col("datetime").between(from, to));

Is it possible to apply this filter condition during the load itself? If so, can someone suggest how to do this?

Thanks in advance.


2 Answers


You don't have to do anything explicitly here: the spark-cassandra-connector has predicate pushdown, so your filtering condition will be applied during the data selection itself.

Source: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md

The connector will automatically pushdown all valid predicates to Cassandra. The Datasource will also automatically only select columns from Cassandra which are required to complete the query. This can be monitored with the explain command.
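Because Spark evaluates transformations lazily, nothing is read until an action runs, so the filter from the question can simply be chained onto the read and the connector can still push it down. Here is a minimal Java sketch (keyspace, timestamps, and the autowired sparkSession follow the question; the table name is hypothetical) that also calls explain() to verify the pushdown:

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

Map<String, String> configMap = new HashMap<>();
configMap.put("keyspace", "key1");
configMap.put("table", "table1"); // hypothetical table name

// Chain the filter directly onto the read; the connector pushes
// valid predicates down to Cassandra when the query executes.
Dataset<Row> ds = sparkSession.read()
        .format("org.apache.spark.sql.cassandra")
        .options(configMap)
        .load()
        .filter(functions.col("datetime").between(
                Timestamp.valueOf("2020-06-23 18:51:41"),
                Timestamp.valueOf("2020-06-23 18:52:21")));

// Print the physical plan; a pushed predicate shows up under PushedFilters.
ds.explain();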


This filter will be effectively pushed down only if the column you are filtering on is the first clustering column. As Rayan pointed out, we can use the explain command on the dataset to check that predicate pushdown happened; the pushed predicates are marked with a * character, like this:

val dcf3 = dc.filter("event_time >= cast('2019-03-10T14:41:34.373+0000' as timestamp) " +
  "AND event_time <= cast('2019-03-10T19:01:56.316+0000' as timestamp)")

// dcf3.explain
// == Physical Plan ==
// *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [uuid#21,event_time#22,id#23L,value#24] 
// PushedFilters: [ *GreaterThanOrEqual(event_time,2019-03-10 14:41:34.373), *LessThanOrE..., 
// ReadSchema: struct<uuid:string,event_time:timestamp,id:bigint,value...

If a predicate is not pushed down, we will see an additional Filter step after the scan, meaning the filtering happens at the Spark level.
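To illustrate the difference in Java (a hedged sketch reusing a hypothetical Dataset<Row> ds with an event_time clustering column, as in the Scala example above): a plain comparison on the column can be pushed down, while wrapping the column in a function produces an expression the connector cannot translate, so Spark filters after the scan.

import java.sql.Timestamp;
import org.apache.spark.sql.functions;

// Plain range comparison on the first clustering column: pushable.
ds.filter(functions.col("event_time")
        .geq(Timestamp.valueOf("2019-03-10 14:41:34")))
  .explain(); // expect *GreaterThanOrEqual(event_time,...) under PushedFilters

// Wrapping the column in a function prevents pushdown, so the plan
// gains a separate Filter node above the Cassandra scan.
ds.filter(functions.year(functions.col("event_time")).equalTo(2019))
  .explain(); // expect Filter(...) applied on the Spark side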