I'm using Apache Hadoop MapReduce with Cassandra to run a job that reads from one Cassandra table and outputs to another.
I have a few jobs that output to a table with a single primary key. For example, this table, which counts occurrences of each word, has a single key:
CREATE TABLE word_count(
    word text,
    count int,
    PRIMARY KEY(word)
) WITH COMPACT STORAGE;
The associated reduce class looks a bit like this:
public static class ReducerToCassandra
    extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    public void reduce(Text word, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        org.apache.cassandra.thrift.Column c
            = new org.apache.cassandra.thrift.Column();
        c.setName(ByteBufferUtil.bytes("count"));
        c.setValue(ByteBufferUtil.bytes(sum));
        c.setTimestamp(System.currentTimeMillis());

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        mutation.column_or_supercolumn.setColumn(c);

        ByteBuffer keyByteBuffer = ByteBufferUtil.bytes(word.toString());
        context.write(keyByteBuffer, Collections.singletonList(mutation));
    }
}
If I want to add an extra column, I just need to add another Mutation to the List<Mutation> already being output by reduce. But I can't work out how to output to a table that has the new column in a composite primary key. For example, this table does the same as the one above, but also indexes words along with the hour of their publication.
CREATE TABLE word_count(
    word text,
    publication_hour bigint,
    count int,
    PRIMARY KEY(word, publication_hour)
) WITH COMPACT STORAGE;
I've tried a few different approaches, such as outputting a custom WritableComparable (that holds both a word and an hour) and updating the class and method signatures and job configuration accordingly, but that makes reduce throw a ClassCastException when it tries to cast the custom WritableComparable to ByteBuffer.
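For reference, the pair class I'm using is shaped roughly like this. It's sketched here with plain java.io types so it stands alone; the real class implements org.apache.hadoop.io.WritableComparable<WordHourPair>, and the accessor names are just the ones I use in the reducer below:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the custom key: a word plus its publication hour. In the real
// job this implements org.apache.hadoop.io.WritableComparable<WordHourPair>.
class WordHourPair implements Comparable<WordHourPair> {
    private String word;
    private long hour;

    public WordHourPair() {}   // Hadoop requires a no-arg constructor
    public WordHourPair(String word, long hour) {
        this.word = word;
        this.hour = hour;
    }

    public String getWordText() { return word; }
    public long getHourLong()   { return hour; }

    // write()/readFields() mirror the Writable serialization contract.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(hour);
    }

    public void readFields(DataInput in) throws IOException {
        word = in.readUTF();
        hour = in.readLong();
    }

    @Override
    public int compareTo(WordHourPair o) {   // sort by word, then by hour
        int c = word.compareTo(o.word);
        return c != 0 ? c : Long.compare(hour, o.hour);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WordHourPair)) return false;
        WordHourPair p = (WordHourPair) o;
        return word.equals(p.word) && hour == p.hour;
    }

    @Override
    public int hashCode() { return word.hashCode() * 31 + Long.hashCode(hour); }
}
```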
I've also tried building a composite row key with CompositeType.Builder:
public static class ReducerToCassandra
    //                MappedKey     MappedValue  ReducedKey  ReducedValues
    extends Reducer<WordHourPair, IntWritable, ByteBuffer, List<Mutation>>
{
    //                 MappedKey                  Values with the key wordHourPair
    public void reduce(WordHourPair wordHourPair, Iterable<IntWritable> values,
                       Context context)
        throws IOException, InterruptedException
    {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        long hour = wordHourPair.getHourLong();

        org.apache.cassandra.thrift.Column c
            = new org.apache.cassandra.thrift.Column();
        c.setName(ByteBufferUtil.bytes("count"));
        c.setValue(ByteBufferUtil.bytes(sum));
        c.setTimestamp(System.currentTimeMillis());

        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        mutation.column_or_supercolumn.setColumn(c);

        // New code: build a composite row key from the word and the hour
        List<AbstractType<?>> keyTypes = new ArrayList<AbstractType<?>>();
        keyTypes.add(UTF8Type.instance);
        keyTypes.add(LongType.instance);
        CompositeType compositeKey = CompositeType.getInstance(keyTypes);

        Builder builder = new Builder(compositeKey);
        builder.add(ByteBufferUtil.bytes(wordHourPair.getWordText()));
        builder.add(ByteBufferUtil.bytes(hour));
        ByteBuffer keyByteBuffer = builder.build();

        context.write(keyByteBuffer, Collections.singletonList(mutation));
    }
}
But that throws an IOException:
java.io.IOException: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:204)
Caused by: InvalidRequestException(why:String didn't validate.)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28232)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result$batch_mutate_resultStandardScheme.read(Cassandra.java:28218)
at org.apache.cassandra.thrift.Cassandra$batch_mutate_result.read(Cassandra.java:28152)
at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:78)
at org.apache.cassandra.thrift.Cassandra$Client.recv_batch_mutate(Cassandra.java:1069)
at org.apache.cassandra.thrift.Cassandra$Client.batch_mutate(Cassandra.java:1055)
at org.apache.cassandra.hadoop.ColumnFamilyRecordWriter$RangeClient.run(ColumnFamilyRecordWriter.java:196)
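As far as I can tell, the bytes that Builder.build() produces follow CompositeType's on-wire layout: each component is encoded as a 2-byte big-endian length, the component bytes, then a single end-of-component byte (0). Here's a JDK-only sketch of that encoding (my understanding of the format, not an authoritative implementation), which at least lets me eyeball what key bytes the reducer is actually sending:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hand-rolled equivalent of what I believe CompositeType.Builder.build()
// emits: per component, a 2-byte big-endian length, the component bytes,
// then a single end-of-component byte (0).
class CompositeKeyEncoder {
    static byte[] encode(byte[]... components) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] c : components) {
            out.write((c.length >> 8) & 0xFF);   // length, high byte
            out.write(c.length & 0xFF);          // length, low byte
            out.write(c, 0, c.length);           // component bytes
            out.write(0);                        // end-of-component byte
        }
        return out.toByteArray();
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] longBytes(long v) {
        return ByteBuffer.allocate(8).putLong(v).array();
    }
}
```

If the table's key validator expects a plain UTF-8 string rather than this composite blob, that mismatch would explain a "String didn't validate" error, though I haven't confirmed that's what's happening here.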
This question, Cassandra CQL3 composite key not written by Hadoop reducer, seems to exhibit the kind of code I'm looking for, but it calls context.write with parameters of type HashMap and ByteBuffer, and I'm not sure how I'd make context.write accept those params.
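If I'm reading the linked question right, its reducer is declared as Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>>, i.e. the output key is a map from key-column name to serialized value and the output value is the list of bound variables for the update statement. A JDK-only sketch of how those write arguments might be assembled for my schema (the column names and serializations here are my assumptions):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Building context.write() arguments in the shape the linked question uses:
// a map from key-column name to serialized value, plus a list of bound
// variables. JDK-only, so the shapes are easy to see.
class CqlWriteArgs {
    static Map<String, ByteBuffer> keys(String word, long hour) {
        Map<String, ByteBuffer> keys = new LinkedHashMap<>();
        keys.put("word",
                 ByteBuffer.wrap(word.getBytes(StandardCharsets.UTF_8)));
        keys.put("publication_hour",
                 (ByteBuffer) ByteBuffer.allocate(8).putLong(hour).flip());
        return keys;
    }

    static List<ByteBuffer> values(int count) {
        List<ByteBuffer> vals = new ArrayList<>();
        // bound to the "count" column of the update statement
        vals.add((ByteBuffer) ByteBuffer.allocate(4).putInt(count).flip());
        return vals;
    }
}
```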
How can I get the data I want (word-hour keys, int values) into my table?