I'm trying to run a Spark job on a fraction of my data in Cassandra. I have an RDD of keys (partition and clustering columns) in hand, and I would like to run my job only on those keys.
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd.CassandraJoinRDD
import com.datastax.spark.connector.rdd.partitioner.CassandraPartitionedRDD

type CassandraKey = (String, String, String, String)

val columns = SomeColumns(ColumnName("pkey1"), ColumnName("pkey2"), ColumnName("pkey3"), ColumnName("ckey1"))

val repartitionedKeys: CassandraPartitionedRDD[CassandraKey] =
  keys.repartitionByCassandraReplica("keyspace", "table", partitionKeyMapper = columns)

val selectedRows: CassandraJoinRDD[CassandraKey, CassandraRow] =
  repartitionedKeys.joinWithCassandraTable[CassandraRow]("keyspace", "table").on(joinColumns = columns)

selectedRows.collect()
I then receive the following error at BoundStatementBuilder:19:
java.lang.IllegalArgumentException: ckey1 is not a column defined in this metadata
My table schema is as follows:
CREATE TABLE "keyspace".table (
    pkey1 text,
    pkey2 text,
    pkey3 text,
    ckey1 text,
    ckey2 text,
    ckey3 timestamp,
    data text,
    PRIMARY KEY ((pkey1, pkey2, pkey3), ckey1, ckey2, ckey3)
)
Looking at the code, I can see that in BoundStatementBuilder the columnTypes are resolved from a dummy query initiated by ReplicaLocator.keyByReplicas. That query is used to retrieve the partition tokens from the table, and its WHERE clause is constructed on the partition keys only.
In addition, I can see that in RDDFunction.repartitionByCassandraReplica:183 the given partitionKeyMapper is ignored, but that doesn't seem to cause any issue.
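If that diagnosis is right and the bound statement's metadata really comes from the partition-key-only dummy query, one workaround I am considering is to skip the repartition step entirely and join on the full key, trading data locality for correctness. A minimal, untested sketch reusing the keys and columns values from the snippet above:

// Workaround sketch: join directly, without repartitionByCassandraReplica.
// joinWithCassandraTable builds its statement from the table metadata itself,
// so all four key columns (pkey1..pkey3, ckey1) should be bindable here.
val selectedDirect: CassandraJoinRDD[CassandraKey, CassandraRow] =
  keys.joinWithCassandraTable[CassandraRow]("keyspace", "table")
      .on(joinColumns = columns)

selectedDirect.collect()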
I am using connector version 1.5.1.