1 vote

I have a table in Cassandra where A (string) and B (int) together form the partition key. I am writing a SQL query in Spark SQL:

select ("SELECT * from table where A IN ("221",...) and B IN(32,323...));

The explain plan shows it doing a batch scan instead of a direct join on the partition keys:

== Physical Plan ==
Project [A,B ... other columns]
+- BatchScan[A,B ... other columns] Cassandra Scan: dev.table
  • Cassandra Filters: [["A" IN (?, ?, ?, ?), D],["B" IN (?, ?, ?, ?, ?, ?, ?, ?, ?, ?), D]]
  • Requested Columns: [A,B ...]

Also, in the documentation at https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md, spark.cassandra.sql.inClauseToJoinConversionThreshold is set to 25..

I was curious whether there is any scenario in which an IN clause on the primary key would work better than a direct join.


2 Answers

2 votes

This works for me:

cqlsh> CREATE TABLE IF NOT EXISTS test.tab4 (k1 varchar, k2 int, PRIMARY KEY (k1, k2));

bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.12:3.0.0-beta --conf spark.cassandra.sql.inClauseToJoinConversionThreshold=10

scala> spark.conf.set("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
scala> spark.sql("""SELECT * FROM mycatalog.test.tab4 where k1 in ("1","2","3","4") and k2 in (3,4,5,6,7)""").explain
== Physical Plan ==
*(1) Project [k1#43, k2#44]
+- BatchScan[k1#43, k2#44] class com.datastax.spark.connector.datasource.CassandraInJoin

The conversion may not work if the types used in the predicates don't match the C* schema. Also, note that inClauseToJoinConversionThreshold is based on the cross product of the IN value lists. The cross product in my query is 4 × 5 = 20.
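
In the question's plan above, the filters show A IN (4 values) and B IN (10 values), i.e. a cross product of 4 × 10 = 40; if the threshold is at its 2500 default, that would not trigger the conversion, which matches the BatchScan you see. A minimal sketch of that comparison (illustrative only; the real check lives inside the connector):

// Sketch of the threshold check, not the connector's actual code
val inListSizes   = Seq(4, 10)          // IN list sizes taken from the question's plan
val crossProduct  = inListSizes.product // 4 * 10 = 40
val threshold     = 2500                // default inClauseToJoinConversionThreshold
val convertToJoin = crossProduct > threshold // false: 40 <= 2500, so the plan stays a BatchScan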

An IN-clause query may be more performant than a direct join only for small cross products of IN values. The default (2500) is a bit too high and may be lowered in the future.
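
If you want to force the conversion for small IN lists, the same setting can also be supplied when building the session instead of on the spark-shell command line (a sketch; the property key is from the reference doc above, the app name and threshold value are assumptions):

import org.apache.spark.sql.SparkSession

// Sketch: lower the threshold so even small IN cross products are converted to a direct join
val spark = SparkSession.builder()
  .appName("scc-in-to-join") // assumed app name
  .config("spark.sql.catalog.mycatalog", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.cassandra.sql.inClauseToJoinConversionThreshold", "10")
  .getOrCreate()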

If this doesn't help, then I would need to see your schema, the exact query you issued, and the SCC/Spark versions.

0 votes

Haven't done Cassandra and Spark for a while, but this isn't that surprising given how Cassandra lays data out. Cassandra distributes rows across nodes by the hashed partition key, so if you query a lot of different partitions, a batch scan might even be faster. You would likely get better performance with a different schema for your tables: move the high-cardinality key out of the partition key and into a clustering column, so you can run plain filter or range operations on the database itself, then merge the partial results together.
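
For illustration, a sketch of that suggestion (hypothetical: table2 holds the same data redefined with PRIMARY KEY ((A), B), so B is a clustering column and the range predicate is served inside a single partition; the catalog name mirrors the setup in the other answer):

// Hypothetical restructured table: PRIMARY KEY ((A), B)
spark.sql("SELECT * FROM mycatalog.dev.table2 WHERE A = '221' AND B >= 32 AND B <= 323").explain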