When loading data from a Cassandra table, a Spark partition represents all rows with the same partition key. However, when I create data in Spark with the same partition keys and repartition the new RDD using the .repartitionByCassandraReplica(..) method, the rows end up in different Spark partitions. How do I achieve consistent partitioning in Spark using the partition scheme defined by the Spark-Cassandra connector?
Links to download the CQL and Spark job code that I tested:
- CQL file with the keyspace and table schema.
- Spark job and other classes.
Version and other information
- Spark : 1.3
- Cassandra : 2.1
- Connector : 1.3.1
- Spark nodes (5) and Cassandra cluster nodes (4) run in different data centers
Code extracts follow; download the full code via the links above for more details.
Step 1: Load the data into 8 Spark partitions

Map<String, String> map = new HashMap<String, String>();
CassandraTableScanJavaRDD<TestTable> tableRdd = javaFunctions(conf)
        .cassandraTable("testkeyspace", "testtable", mapRowTo(TestTable.class, map));
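To make my expectation from Step 1 concrete — rows sharing a partition key stay together in one Spark partition — here is a plain-Java toy sketch. The keys and ids below are made up for illustration, and the connector's real grouping is by token range, not a HashMap:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupByKeyDemo {
    // rows are (partitionKey, id) pairs; all ids with the same key end up
    // in the same bucket, mirroring how the table scan keeps one Cassandra
    // partition inside one Spark partition.
    public static Map<Integer, List<Integer>> groupByPartitionKey(List<int[]> rows) {
        Map<Integer, List<Integer>> buckets = new TreeMap<>();
        for (int[] row : rows) {
            buckets.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row[1]);
        }
        return buckets;
    }

    public static void main(String[] args) {
        // hypothetical (partitionKey, id) rows
        List<int[]> rows = Arrays.asList(
                new int[]{1, 15}, new int[]{1, 22},
                new int[]{2, 33}, new int[]{2, 16});
        System.out.println(groupByPartitionKey(rows));
    }
}
```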
Step 2: Repartition the data into 8 partitions

.repartitionByCassandraReplica(
        "testkeyspace",
        "testtable",
        partitionNumPerHost,
        someColumns("id"),
        mapToRow(TestTable.class, map));
Step 3: Print the partition id and values for both RDDs

rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<TestTable>, Iterator<String>>() {
    @Override
    public Iterator<String> call(Integer index, Iterator<TestTable> itr) throws Exception {
        List<String> list = new ArrayList<String>();
        list.add("PartitionId-" + index);
        while (itr.hasNext()) {
            TestTable value = itr.next();
            list.add(Integer.toString(value.getId()));
        }
        return list.iterator();
    }
}, true).collect();
Step 4: Snapshot of the results printed for partition 1. The values differ between the two RDDs, but I expected them to be the same.
Load Rdd values
----------------------------
Table load - PartitionId -1
----------------------------
15
22
--------------------------------------
Repartitioned values - PartitionId -1
--------------------------------------
33
16
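To illustrate why I suspect the two RDDs are partitioned by different schemes, here is a plain-Java toy model. The two hash functions below are made-up stand-ins, not the connector's real token or replica logic; the point is only that two different schemes will generally assign the same id to different Spark partition indexes, which matches the mismatch above:

```java
public class PartitionDemo {
    // "Table scan" stand-in: bucket by a hash of the partition key
    // (the real connector buckets by Cassandra token range).
    static int scanPartition(int id, int numPartitions) {
        return Math.floorMod(Integer.hashCode(id), numPartitions);
    }

    // "repartitionByCassandraReplica" stand-in: a differently seeded hash,
    // mimicking grouping by the replica node that owns the key.
    static int replicaPartition(int id, int numPartitions) {
        return Math.floorMod(Integer.hashCode(id * 31 + 7), numPartitions);
    }

    public static void main(String[] args) {
        // ids taken from the snapshot above, spread over 8 partitions
        for (int id : new int[]{15, 16, 22, 33}) {
            System.out.println("id=" + id
                    + " scanPartition=" + scanPartition(id, 8)
                    + " replicaPartition=" + replicaPartition(id, 8));
        }
    }
}
```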