I'm using Apache Hadoop MapReduce with Cassandra to run a job that reads from one Cassandra table and outputs to another.

I have a few jobs that output to a table with a single primary key. For example, this table, which counts occurrences of each word, has a single key:

    CREATE TABLE word_count(
        word text,
        count int,
        PRIMARY KEY(word)
    ) WITH COMPACT STORAGE;

The associated reduce class looks a bit like this:

    public static class ReducerToCassandra 
        extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
    {
        public void reduce(Text word, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            // Build the single non-key column.
            org.apache.cassandra.thrift.Column c 
                    = new org.apache.cassandra.thrift.Column();
            c.setName(ByteBufferUtil.bytes("count"));
            c.setValue(ByteBufferUtil.bytes(sum));
            c.setTimestamp(System.currentTimeMillis());

            Mutation mutation = new Mutation();
            mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
            mutation.column_or_supercolumn.setColumn(c);

            // The row key is just the word.
            ByteBuffer keyByteBuffer = ByteBufferUtil.bytes(word.toString());
            context.write(keyByteBuffer, Collections.singletonList(mutation));
        }
    }
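
For instance, writing a second column to the same row just means building one more Mutation. A minimal sketch, where the doc_count column and its value are hypothetical additions to the table above:

    // Sketch only: assumes a hypothetical extra "doc_count" int column
    // has been added to the word_count table.
    int docCount = 42; // some other aggregate computed in reduce

    org.apache.cassandra.thrift.Column c2
            = new org.apache.cassandra.thrift.Column();
    c2.setName(ByteBufferUtil.bytes("doc_count"));
    c2.setValue(ByteBufferUtil.bytes(docCount));
    c2.setTimestamp(System.currentTimeMillis());

    Mutation mutation2 = new Mutation();
    mutation2.setColumn_or_supercolumn(new ColumnOrSuperColumn());
    mutation2.column_or_supercolumn.setColumn(c2);

    // Both mutations go out under the same row key.
    context.write(keyByteBuffer, java.util.Arrays.asList(mutation, mutation2));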

So adding an extra column just means adding another Mutation to the List<Mutation> already being output by reduce. What I can't work out is how to output to a table that has the new column in a composite primary key. For example, this table does the same as the one above, but also indexes words along with the hour of their publication:

    CREATE TABLE word_count(
        word text,
        publication_hour bigint,
        count int,
        PRIMARY KEY(word, publication_hour)
    ) WITH COMPACT STORAGE;

I've tried a few different approaches. One was to output a custom WritableComparable (holding both a word and an hour, sketched below) and to update the class and method signatures and job configuration accordingly, but that makes reduce throw a ClassCastException when it tries to cast the custom WritableComparable to ByteBuffer.
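
For reference, the custom key looked roughly like this (a sketch from memory; the real class also overrode hashCode() and equals() so partitioning worked):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;

    // Composite map key: a word plus the hour it was published.
    public class WordHourPair implements WritableComparable<WordHourPair> {
        private Text word = new Text();
        private LongWritable hour = new LongWritable();

        public String getWord() { return word.toString(); }
        public long getHourLong() { return hour.get(); }

        @Override
        public void write(DataOutput out) throws IOException {
            word.write(out);
            hour.write(out);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            word.readFields(in);
            hour.readFields(in);
        }

        @Override
        public int compareTo(WordHourPair other) {
            int cmp = word.compareTo(other.word);
            return cmp != 0 ? cmp : hour.compareTo(other.hour);
        }
    }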

I've also tried building the composite key with CompositeType.Builder:

    public static class ReducerToCassandra 
        //              MappedKey     MappedValue  ReducedKey  ReducedValues
        extends Reducer<WordHourPair, IntWritable, ByteBuffer, List<Mutation>>
    {
        //                 MappedKey                  Values with the key wordHourPair
        public void reduce(WordHourPair wordHourPair, Iterable<IntWritable> values, 
                           Context context) 
            throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            long hour = wordHourPair.getHourLong();

            org.apache.cassandra.thrift.Column c 
                = new org.apache.cassandra.thrift.Column();
            c.setName(ByteBufferUtil.bytes("count"));
            c.setValue(ByteBufferUtil.bytes(sum));
            c.setTimestamp(System.currentTimeMillis());

            Mutation mutation = new Mutation();
            mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
            mutation.column_or_supercolumn.setColumn(c);

            // New code: build a (text, bigint) composite to use as the row key.
            List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>(); 
            keyTypes.add(UTF8Type.instance);
            keyTypes.add(LongType.instance);
            CompositeType compositeKey = CompositeType.getInstance(keyTypes);

            CompositeType.Builder builder = new CompositeType.Builder(compositeKey);
            builder.add(ByteBufferUtil.bytes(wordHourPair.getWord()));
            builder.add(ByteBufferUtil.bytes(hour));

            ByteBuffer keyByteBuffer = builder.build();
            context.write(keyByteBuffer, Collections.singletonList(mutation));
        }
    }

But that throws an IOException:

    java.io.IOException: InvalidRequestException(why:String didn't validate.)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:204)
Caused by: InvalidRequestException(why:String didn't validate.)
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28232)
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28218)
    at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:28152)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
    at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1069)
    at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1055)
    at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:196)

This question, Cassandra CQL3 composite key not written by Hadoop reducer, seems to show the kind of code I'm looking for, but it calls context.write with parameters of type HashMap and ByteBuffer, and I'm not sure how I'd make context.write accept those params.

How can I get the data I want (word-hour keys, int values) into my table?

1 Answer

The answer to this was to use Cassandra's CQL interface rather than the Thrift API.
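
For reference, the driver-side job configuration ended up looking roughly like this. This is a sketch, not verbatim: CqlOutputFormat, ConfigHelper and CqlConfigHelper are Cassandra's bundled Hadoop classes (org.apache.cassandra.hadoop and org.apache.cassandra.hadoop.cql3), but the keyspace name ks, the localhost address and the partitioner are assumptions you'd adapt to your own cluster.

    // Sketch of the job setup for CQL output; "ks" and "localhost" are assumptions.
    job.setOutputFormatClass(CqlOutputFormat.class);
    job.setOutputKeyClass(Map.class);
    job.setOutputValueClass(List.class);

    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), "ks", "foo");

    // One "?" per entry in the List<ByteBuffer> the reducer writes; as I
    // understand it, the output format appends the WHERE clause for the
    // primary key columns from the reducer's key Map.
    CqlConfigHelper.setOutputCql(job.getConfiguration(),
            "UPDATE ks.foo SET my_value = ?");

    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");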

Now I can write to a table with a composite key by declaring the output key/value classes of my reduce class as Map<String, ByteBuffer> and List<ByteBuffer>. I then create a Map for the composite key, where each key (a String) is a column name and each value (a ByteBuffer) is that column's value, converted with ByteBufferUtil.

For example, to write to a table defined as follows:

    CREATE TABLE foo (
        customer_id uuid,
        time timestamp,
        my_value int,
        PRIMARY KEY (customer_id, time)
    );

I can write:

    String customerID = "the customer's id";
    long time = DateTime.now().getMillis(); // Joda-Time
    int myValue = 1;

    // Map is an interface, so instantiate a HashMap. Note that since
    // customer_id is a uuid in the schema, in practice the bytes must be
    // a serialized UUID rather than a plain string.
    Map<String, ByteBuffer> key = new HashMap<String, ByteBuffer>();
    key.put("customer_id", ByteBufferUtil.bytes(customerID));
    key.put("time", ByteBufferUtil.bytes(time));

    List<ByteBuffer> values = Collections.singletonList(ByteBufferUtil.bytes(myValue));

    context.write(key, values);
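
For completeness, the enclosing reducer declaration then looks something like this (a sketch: the class name and the Text/IntWritable input types are placeholders for whatever your mapper emits):

    // Sketch: input key/value types (Text, IntWritable) are placeholders.
    public static class ReducerToCassandraCql
        extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        public void reduce(Text word, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
        {
            // ...build the key Map and values List as shown above, then:
            // context.write(key, values);
        }
    }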