I've got two tables:
- my_keyspace.name with columns:
  - name (string) - partition key
  - timestamp (date) - second part of partition key
  - id (int) - third part of partition key
- my_keyspace.data with columns:
  - timestamp (date) - partition key
  - id (int) - second part of partition key
  - data (string)
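For reference, here's roughly what the two tables look like in CQL (a sketch of the layout above; I've written timestamp and id in the name table as clustering columns, since looking up rows by name alone depends on that):

    CREATE TABLE my_keyspace.name (
        name text,
        timestamp date,
        id int,
        PRIMARY KEY (name, timestamp, id)
    );

    CREATE TABLE my_keyspace.data (
        timestamp date,
        id int,
        data text,
        PRIMARY KEY ((timestamp, id))
    );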
I'm trying to join the two tables on timestamp and id: I get all (timestamp, id) pairs associated with a given name from the name table, then retrieve the rows for those entries from the data table.
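In CQL this amounts to two queries, roughly like the following (the literal values are placeholders):

    -- all (timestamp, id) pairs for one name: a single-partition read
    SELECT timestamp, id FROM my_keyspace.name WHERE name = 'John';

    -- then, for each pair returned, a point lookup in data
    SELECT data FROM my_keyspace.data WHERE timestamp = '2015-06-03' AND id = 1;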
Done this way in CQL it's really fast. I expected the Spark Cassandra Connector to be equally fast at it, but instead it seems to be doing a full table scan. It might be because the connector doesn't know which fields are partition/primary key columns, and I can't find a way to tell it those mappings.
How can I make this join as efficient as it should be? Here's my code sample:
// Imports used by this snippet (Spark Cassandra Connector Java API):
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;

import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import scala.Tuple2;

private static void notSoEfficientJoin() {
    SparkConf conf = new SparkConf().setAppName("Simple Application")
            .setMaster("local[*]")
            .set("spark.cassandra.connection.host", "localhost")
            .set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // (timestamp, id) pairs for the given name, keyed for the join
    JavaPairRDD<DataKey, NameRow> nameIndexRDD = javaFunctions(sc)
            .cassandraTable("my_keyspace", "name", mapRowTo(NameRow.class))
            .where("name = 'John'")
            .keyBy(new Function<NameRow, DataKey>() {
                @Override
                public DataKey call(NameRow v1) throws Exception {
                    return new DataKey(v1.timestamp, v1.id);
                }
            });

    // The entire data table keyed the same way: this reads every row
    JavaPairRDD<DataKey, DataRow> dataRDD = javaFunctions(sc)
            .cassandraTable("my_keyspace", "data", mapRowTo(DataRow.class))
            .keyBy(new Function<DataRow, DataKey>() {
                @Override
                public DataKey call(DataRow v1) throws Exception {
                    return new DataKey(v1.timestamp, v1.id);
                }
            });

    // Shuffle-based join on DataKey, then format each matched pair
    JavaRDD<String> cassandraRowsRDD = nameIndexRDD.join(dataRDD)
            .map(new Function<Tuple2<DataKey, Tuple2<NameRow, DataRow>>, String>() {
                @Override
                public String call(Tuple2<DataKey, Tuple2<NameRow, DataRow>> v1) throws Exception {
                    NameRow nameRow = v1._2()._1();
                    DataRow dataRow = v1._2()._2();
                    return nameRow + " " + dataRow;
                }
            });

    List<String> results = cassandraRowsRDD.collect();
}