0
votes

Following is the cassandra table schema :

CREATE TABLE my_table ( year text, month text, day text, hour int, min int, sec int, PRIMARY KEY ((year, month, day), hour, min, sec) )

If i run following query using cassandra cql it works:

SELECT * FROM my_table WHERE year ='2017' and month ='01' and day ='16' and (hour,min,sec) > (1,15,0) LIMIT 200

However, when i run same query using spark-cassandra connector it does not work:

sparkSession.read().format("org.apache.spark.sql.cassandra").options(map).load()
                .where(year ='2017' and month ='01' and day ='16' and (hour,min,sec) >= (1,15,0)");

I am getting following exception in logs:

> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> cannot resolve '(struct(`hour`, `min`, `sec`) >= struct(1, 15, 0))'
> due to data type mismatch: differing types in '(struct(`hour`, `min`,
> `sec`) >= struct(1, 15, 0))'  and (struct<hour:int,min:int,sec:int>
> struct<col1:int,col2:int,col3:int>).; line 1 pos 96

Spark-cassandra-connector version:2.0.0-M3

Spark-version:2.0.0

Any help is much appreciated

1
can you try independently like this and hour >= 1 and min >= 15 and sec >=0 because this worked for my scenerio.Akash Sethi
This will not work even in cassandra cql because hour, min and sec are all part of clustering key. To run a range on any column clustering key, the preceding column should be used with equality condition. i.e. to run ">=" on min column , hour can only contain equal to (=) operation condition else following exception occurs: com.datastax.driver.core.exceptions.InvalidQueryException: PRIMARY KEY column "min" cannot be restricted (preceding column "hour" is restricted by a non-EQ relation)Sourav Gulati
@AkashSethi: Strange that it is running in Spark, however it is not giving the desired results. Because the query posted in the question will return all the rows for the day after 01:15:00 . But here in the output the rows for every hour will start after 15th minuteSourav Gulati

1 Answers

1
votes

Quite simply CQL is not Spark Sql or Catalyst compatible. What you are seeing is a conflict in syntax.

This where clause :

.where(year ='2017' and month ='01' and day ='16' and (hour,min,sec) >= (1,15,0)

Is not directly pushed down to Cassandra. Instead it is being transformed into catalyst predicates. This is where you have a problem

Cataylst sees this

(hour,min,sec) >= (1,15,0)

And tries to make types for them

The left hand side becomes

struct<hour:int,min:int,sec:int>

The right hand side becomes

struct<col1:int,col2:int,col3:int>

These are not tuples, but explicitly typed structs. They cannot be directly compared hence your error. In the DataFrame api you would just define a new Struct with the correct types and make a literal of that but I'm not sure how to express that in SparkSQL.

Regardless this tuple predicate will not be pushed down to Cassandra. The Struct you are defining of hour, min, sec is going to be hidden from Cassandra because the underlying table doesn't provide a Struct<hour, min, sec> which means that Spark thinks it needs to generate that after pulling the data from Cassandra.

You are better off just using the separate clauses with AND as mentioned by @AkashSethi