0
votes

Im trying to run a spark job on a fraction of my data in Cassandra. I have in hand a RDD of keys (partition and clustering columns) and I would like to run my job only on those keys.

type CassandraKey = (String, String, String, String)
val columns = SomeColumns(ColumnName("pkey1"),ColumnName("pkey2"),ColumnName("pkey3"),ColumnName("ckey1"))
val repartitionedKeys: CassandraPartitionedRDD[CassandraKey] = keys.repartitionByCassandraReplica("keyspace", "table", partitionKeyMapper = columns)
val selectedRows: CassandraJoinRDD[CassandraKey, CassandraRow] =
    repartitionedKeys.joinWithCassandraTable[CassandraRow](keyspace, table).on(joinColumns = columns)
selectedRows.collect()

I than receive the following error at BoundStatementBuilder:19

java.lang.IllegalArgumentException: ckey1 is not a column defined in this metadata

my table schema is as follow:

CREATE TABLE "keyspace".table (
pkey1 text,
pkey2 text,
pkey3 text,
ckey1 text,
ckey2 text,
ckey3 timestamp,
data text,
PRIMARY KEY (( pkey1, pkey2, pkey3 ), ckey1, ckey2, ckey3)
) 

Looking at code I can see that at BoundStatementBuilder columnTypes are being resolved from a dummy query initiated at ReplicaLocator.keyByReplicas. This query is used to retrieve partition tokens from table, and its constructed where clause on Partition keys only.

In addition I can see that in RDDFunction.repartitionByCassandraReplica:183 the given partitionKeyMapper is ignored but thats doesnt seem to cause any issue.

I am using connector version 1.5.1

1

1 Answers

1
votes

The "Repartition" part can only be on the Partition key so don't specify columns there or if you do only choose the Partition Key columns. Only specify all of the join columns with the joinWithCassandraTable call.