
I am trying to optimize my spark job by avoiding shuffling as much as possible.

I am using cassandraTable to create the RDD.

The column family's column names are dynamic, so it is defined as follows:

CREATE TABLE "Profile" (
  key text,
  column1 text,
  value blob,
  PRIMARY KEY (key, column1)
) WITH COMPACT STORAGE AND
  bloom_filter_fp_chance=0.010000 AND
  caching='ALL' AND
  ...

This definition results in CassandraRow RDD elements in the following format:

CassandraRow <key, column1, value>
  • key - the RowKey
  • column1 - the value of column1 is the name of the dynamic column
  • value - the value of the dynamic column

So if I have the row key RK='profile1' with columns name='George' and age='34', the resulting RDD will be:

CassandraRow<key=profile1, column1=name, value=George>
CassandraRow<key=profile1, column1=age, value=34>
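To make the mapping concrete, here is a small plain-Java model (no Spark or Cassandra involved) of how one wide row with dynamic columns becomes one <key, column1, value> element per column. `CompactStorageSketch`, `Cell` and `toCells` are hypothetical names, not the connector's API, and `value` is modeled as a String although the real column is a blob. The TreeMap is an assumption standing in for Cassandra's default ascending clustering order on column1, which is why "age" comes back before "name".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CompactStorageSketch {

    // Hypothetical stand-in for one CassandraRow <key, column1, value>;
    // value is a String here, although the real column is a blob.
    record Cell(String key, String column1, String value) {}

    // Turns one wide (dynamic-column) row into one Cell per column.
    // The TreeMap iterates column names in ascending order, mimicking
    // Cassandra's default clustering order on column1.
    static List<Cell> toCells(String rowKey, Map<String, String> columns) {
        List<Cell> cells = new ArrayList<>();
        for (Map.Entry<String, String> col : new TreeMap<>(columns).entrySet()) {
            cells.add(new Cell(rowKey, col.getKey(), col.getValue()));
        }
        return cells;
    }

    public static void main(String[] args) {
        List<Cell> cells = toCells("profile1", Map.of("name", "George", "age", "34"));
        // Prints:
        // Cell[key=profile1, column1=age, value=34]
        // Cell[key=profile1, column1=name, value=George]
        cells.forEach(System.out::println);
    }
}
```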

Then I need to group the elements that share the same key to get a pair RDD:

JavaPairRDD<String, Iterable<CassandraRow>>

It is important to note that all the elements I need to group are on the same Cassandra node (they share the same row key), so I expect the connector to preserve data locality.

The problem is that groupBy and groupByKey cause a shuffle. I would rather group the elements locally, because all the data is already on the same node:

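import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import com.datastax.spark.connector.japi.CassandraRow;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

JavaPairRDD<String, Iterable<CassandraRow>> rdd = javaFunctions(context)
        .cassandraTable(ks, "Profile")
        .groupBy(new Function<CassandraRow, String>() {
            @Override
            public String call(CassandraRow row) throws Exception {
                // group by the row (partition) key; this triggers a shuffle
                return row.getString("key");
            }
        });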

My questions are:

  1. Will using keyBy on the RDD cause a shuffle, or will it keep the data local?
  2. Is there a way to group the elements by key without shuffling? I read about mapPartitions, but I don't quite understand how to use it.

Thanks,

Shai


Answer


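I think you are looking for spanByKey, a Spark Cassandra Connector-specific operation that takes advantage of the ordering Cassandra provides to group elements without incurring a shuffle stage. Because the connector keeps each Cassandra partition inside a single Spark partition and reads its rows in clustering order, all rows with the same key arrive consecutively, so they can be grouped with a single local pass over each Spark partition.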

In your case, it should look like this (keyBy is just a map over each element, so it does not shuffle either):

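import com.datastax.spark.connector._

// result: RDD[(String, Seq[CassandraRow])], grouped without a shuffle
sc.cassandraTable("keyspace", "Profile")
  .keyBy(row => row.getString("key"))
  .spanByKey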
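To see why no shuffle is needed, here is a plain-Java sketch (no Spark) of the span-by-key idea: a single pass over one partition's rows that starts a new group whenever the key changes. `SpanByKeySketch`, `Cell` and `spanByKey` are hypothetical names, not the connector's API, and the profile2 rows are made-up sample data. Applying a function like this to each partition's iterator, for example inside mapPartitions, is roughly the kind of local work a shuffle-free grouping performs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

public class SpanByKeySketch {

    // Hypothetical stand-in for one CassandraRow of the "Profile" table.
    record Cell(String key, String column1, String value) {}

    // Groups *consecutive* elements that share the same key, in one pass.
    // No sort and no hash table over all keys: a new group starts whenever
    // the key differs from the previous element's key.
    static <T, K> List<Map.Entry<K, List<T>>> spanByKey(Iterator<T> it, Function<T, K> keyOf) {
        List<Map.Entry<K, List<T>>> spans = new ArrayList<>();
        K currentKey = null;
        List<T> currentRun = null;
        while (it.hasNext()) {
            T elem = it.next();
            K key = keyOf.apply(elem);
            if (currentRun == null || !Objects.equals(key, currentKey)) {
                // key changed: open a new span (SimpleEntry tolerates null keys)
                currentKey = key;
                currentRun = new ArrayList<>();
                spans.add(new AbstractMap.SimpleEntry<>(key, currentRun));
            }
            currentRun.add(elem);
        }
        return spans;
    }

    public static void main(String[] args) {
        // One partition's rows in the order Cassandra returns them:
        // all cells of a row key are adjacent.
        List<Cell> partition = List.of(
                new Cell("profile1", "age", "34"),
                new Cell("profile1", "name", "George"),
                new Cell("profile2", "age", "27"),
                new Cell("profile2", "name", "Ringo"));
        // Prints "profile1 -> 2 cells" then "profile2 -> 2 cells"
        for (Map.Entry<String, List<Cell>> span : spanByKey(partition.iterator(), Cell::key)) {
            System.out.println(span.getKey() + " -> " + span.getValue().size() + " cells");
        }
    }
}
```

The trade-off is that only adjacent elements are merged: keys arriving as a, b, a produce three groups, not two. The technique is therefore only correct when equal keys are guaranteed to be consecutive, which is exactly what Cassandra's ordering provides here; on an arbitrary RDD, groupByKey and its shuffle are still required.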

Read more in the docs:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#grouping-rows-by-partition-key