
I've got two tables:

  1. my_keyspace.name with columns:
    • name (string) - partition key
    • timestamp (date) - second part of partition key
    • id (int) - third part of partition key
  2. my_keyspace.data with columns:
    • timestamp (date) - partition key
    • id (int) - second part of partition key
    • data (string)

I'm trying to join the two tables on timestamp and id. I do it by getting all timestamps and ids associated with a given name from the name table, then retrieving the rows from the data table for those entries.

Doing this in CQL is really fast. I expected the Spark Cassandra Connector to be equally fast, but instead it seems to be doing a full table scan. That might be because it doesn't know which fields are the partition/primary key, but I can't find a way to tell it the mappings.

How can I make this join as efficient as it should be? Here's my code sample:

private static void notSoEfficientJoin() {
    SparkConf conf = new SparkConf().setAppName("Simple Application")
                                    .setMaster("local[*]")
                                    .set("spark.cassandra.connection.host", "localhost")
                                    .set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

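    // Rows from the name table for the partition name = 'John', keyed by (timestamp, id)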
    JavaPairRDD<DataKey, NameRow> nameIndexRDD = javaFunctions(sc).cassandraTable("my_keyspace", "name", mapRowTo(NameRow.class)).where("name = 'John'")
                                                                       .keyBy(new Function<NameRow, DataKey>() {
                                                                           @Override
                                                                           public DataKey call(NameRow v1) throws Exception {
                                                                               return new DataKey(v1.timestamp, v1.id);
                                                                           }
                                                                       });

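    // The whole data table is read here (the suspected full table scan) and keyed for the join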
    JavaPairRDD<DataKey, DataRow> dataRDD = javaFunctions(sc).cassandraTable("my_keyspace", "data", mapRowTo(DataRow.class))
                                                          .keyBy(new Function<DataRow, DataKey>() {
                                                              @Override
                                                              public DataKey call(DataRow v1) throws Exception {
                                                                  return new DataKey(v1.timestamp, v1.id);
                                                              }
                                                          });

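    // A plain RDD join: both sides are shuffled across the cluster by DataKey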
    JavaRDD<String> cassandraRowsRDD = nameIndexRDD.join(dataRDD)
                                                       .map(new Function<Tuple2<DataKey, Tuple2<NameRow, DataRow>>, String>() {
                                                           @Override
                                                           public String call(Tuple2<DataKey, Tuple2<NameRow, DataRow>> v1) throws Exception {
                                                               NameRow nameRow = v1._2()._1();
                                                               DataRow dataRow = v1._2()._2();
                                                               return nameRow + " " + dataRow;
                                                           }
                                                       });

    List<String> collect = cassandraRowsRDD.collect();
}

1 Answer


The way to do this join more efficiently is to actually invoke joinWithCassandraTable. This can be done by wrapping the key RDD with another javaFunctions call:

private static void moreEfficientJoin() {
    SparkConf conf = new SparkConf().setAppName("Simple Application")
                                    .setMaster("local[*]")
                                    .set("spark.cassandra.connection.host", "localhost")
                                    .set("spark.driver.allowMultipleContexts", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

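    // Keys (timestamp, id) for the given name, collected to the driver and re-parallelized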
    JavaRDD<DataKey> nameIndexRDD = sc.parallelize(javaFunctions(sc).cassandraTable("my_keyspace", "name", mapRowTo(DataKey.class))
                                                                    .where("name = 'John'")
                                                                    .collect());

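    // joinWithCassandraTable fetches only the matching partitions from the data table instead of scanning it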
    JavaRDD<Data> dataRDD = javaFunctions(nameIndexRDD).joinWithCassandraTable("my_keyspace", "data", allColumns, someColumns("timestamp", "id"), mapRowTo(Data.class), mapToRow(DataKey.class))
                                                       .map(new Function<Tuple2<DataKey, Data>, Data>() {
                                                           @Override
                                                           public Data call(Tuple2<DataKey, Data> v1) throws Exception {
                                                               return v1._2();
                                                           }
                                                       });
    List<Data> data = dataRDD.collect();
}

The important thing is to wrap a JavaRDD with javaFunctions, which is what makes joinWithCassandraTable available. With that in place, it is also possible to skip the collect() and sc.parallelize() calls on nameIndexRDD and join directly on the RDD returned by cassandraTable.
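
For reference, a minimal sketch of that variant, assuming the same DataKey and Data classes and the static CassandraJavaUtil imports (javaFunctions, mapRowTo, mapToRow, allColumns, someColumns) used above; the method name moreEfficientJoinWithoutCollect is just illustrative:

private static void moreEfficientJoinWithoutCollect() {
    SparkConf conf = new SparkConf().setAppName("Simple Application")
                                    .setMaster("local[*]")
                                    .set("spark.cassandra.connection.host", "localhost");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // The keys stay in a distributed RDD; nothing is pulled back to the driver
    JavaRDD<DataKey> nameIndexRDD = javaFunctions(sc)
            .cassandraTable("my_keyspace", "name", mapRowTo(DataKey.class))
            .where("name = 'John'");

    // Wrapping the RDD with javaFunctions exposes joinWithCassandraTable,
    // which reads only the matching partitions of my_keyspace.data
    JavaRDD<Data> dataRDD = javaFunctions(nameIndexRDD)
            .joinWithCassandraTable("my_keyspace", "data", allColumns,
                                    someColumns("timestamp", "id"),
                                    mapRowTo(Data.class), mapToRow(DataKey.class))
            .map(new Function<Tuple2<DataKey, Data>, Data>() {
                @Override
                public Data call(Tuple2<DataKey, Data> v1) throws Exception {
                    return v1._2();
                }
            });

    List<Data> data = dataRDD.collect();
}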