3
votes

I have a table with the following schema:

appname text,
randomnum int,
addedtime timestamp,
shortuuid text,
assetname text,
brandname text,

PRIMARY KEY ((appname, randomnum), addedtime, shortuuid)

addedtime is a clustering key.

When I filter on the clustering key addedtime, I do not see the filter being pushed down:

val rdd = tabledf.filter("addedtime > '" + _to + "'").explain
== Physical Plan ==
Filter (cast(addedtime#2 as string) > 2016-12-20 11:00:00)

According to the docs, it should be pushed down: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#pushdown-filter-examples

Also, this worked with Spark Cassandra Connector 1.4 but does not work with the latest one, 1.6.0-M1. Please let me know what the issue is.

2
That looks like a bug in the connector; you should file a Jira with the project. - RussS
Sure. A few more observations: I tried Spark 1.5 and 1.6 and it does not work. I also tried the old connector (1.4) with Spark 1.6 and it does not work either. So the same connector that works with Spark 1.4 does not work with Spark 1.6. - Nipun
nvm, figured it out, see answer below. - RussS
I cannot see any answer below? - Nipun
I wasn't quite done typing, sorry :) - RussS

2 Answers

7
votes

Problem analysis

The issue seems to be the way Catalyst is processing the comparison.

When doing

val rdd = tabledf.filter("addedtime > '" + _to + "'").explain

It is casting the addedtime column to a String and then doing the comparison. Catalyst is not presenting this predicate to the Spark Cassandra Connector, so there is no way to push it down.

INFO  2016-03-08 17:10:49,011 org.apache.spark.sql.cassandra.CassandraSourceRelation: Input Predicates: []
Filter (cast(addedtime#2 as string) > 2015-08-03)

This is also wrong because it is doing a string comparison (which happens to work lexically here but isn't really what you want). So this looks like a bug in Catalyst, since it should probably present the predicate to the source even if there is a cast. There is a workaround, though, which involves giving the Catalyst optimizer what it wants to see.
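As an aside on the string comparison point, here is a minimal sketch in plain Scala (no Spark needed) of one case where comparing strings and comparing timestamps disagree. It assumes the cast renders the column value as "yyyy-MM-dd HH:mm:ss"; the exact format Spark uses may differ, so treat this as an illustration rather than a description of Catalyst's behavior.

```scala
import java.sql.Timestamp

// Assumption: the string form of the column value looks like "yyyy-MM-dd HH:mm:ss".
val rowValue = Timestamp.valueOf("2015-08-03 00:00:00")
val bound    = Timestamp.valueOf("2015-08-03 00:00:00")

// Timestamp comparison: the row equals the bound, so "row > bound" is false.
val asTimestamps = rowValue.after(bound)

// String comparison: a longer string sorts after its own prefix, so this is true.
val asStrings = "2015-08-03 00:00:00" > "2015-08-03"

println(s"timestamp comparison: $asTimestamps, string comparison: $asStrings")
```

So a row stamped exactly at midnight would pass a string-based "greater than" filter even though it is not strictly later than the bound.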

Workaround

If we instead give a type hint:

df.filter("addedtime > cast('2015-08-03' as timestamp)").explain

Then Spark will generate the correct comparison without the string cast, and the predicate is pushed down to Cassandra:

DEBUG 2016-03-08 17:11:09,792 org.apache.spark.sql.cassandra.CassandraSourceRelation: Basic Rules Applied:
C* Filters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
Spark Filters []

== Physical Plan ==
Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@332464fe[appname#0,randomnum#1,addedtime#2,shortuuid#3] PushedFilters: [GreaterThan(addedtime,2015-08-03 00:00:00.0)]
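
Applied to the filter from the question, the same workaround could be sketched as below. The helper name timestampFilter is hypothetical (it is not part of Spark or the connector); it only builds the predicate string with an explicit cast, and the commented-out line assumes the question's tabledf is in scope.

```scala
// Hypothetical helper: wraps the literal in an explicit cast so that Catalyst
// compares timestamps instead of casting the column to a string.
def timestampFilter(column: String, op: String, value: String): String =
  s"$column $op cast('$value' as timestamp)"

// Same bound as in the question's physical plan.
val _to = "2016-12-20 11:00:00"
val predicate = timestampFilter("addedtime", ">", _to)

println(predicate)
// tabledf.filter(predicate).explain  // needs the question's DataFrame
```

Checking the explain output for a PushedFilters entry, as in the plan above, is a reasonable way to confirm the predicate actually reaches Cassandra.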
0
votes

You can also use a java.sql.Timestamp, so that the comparison value is already typed:

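import java.sql.Timestamp
import java.time.{LocalDate, ZoneId}
import java.time.format.DateTimeFormatter

val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
// LocalDate, not LocalDateTime: a date-only pattern cannot be parsed into a LocalDateTime
val date = LocalDate.parse("2015-08-03", dateFormatter)
val timestamp = Timestamp.from(date.atStartOfDay(ZoneId.systemDefault()).toInstant)

df.filter($"addedtime" > timestamp).explain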