0
votes

I need to output the results of a MR job to multiple CQL3 column families.

In my reducer, I specify the CF using MultipleOutputs, but all the results are written to the one CF defined in the job's OutputCQL statement.

Job definiton:

...
job.setOutputFormatClass(CqlOutputFormat.class);
ConfigHelper.setOutputKeyspace(job.getConfiguration(), "keyspace1");
MultipleOutputs.addNamedOutput(job, "CF1", CqlOutputFormat.class, Map.class, List.class);
MultipleOutputs.addNamedOutput(job, "CF2", CqlOutputFormat.class, Map.class, List.class);
CqlConfigHelper.setOutputCql(job.getConfiguration(), "UPDATE keyspace1.CF1 SET value = ? ");
...

Reducer class setup:

mos = new MultipleOutputs(context);

Reduce method (psudo code):

keys = new LinkedHashMap<>();
keys.put("key", ByteBufferUtil.bytes("rowKey"));
keys.put("name", ByteBufferUtil.bytes("columnName"));

List<ByteBuffer> variables = new ArrayList<>();
variables.add(ByteBufferUtil.bytes("columnValue"));

mos.write("CF2", keys, variables);

The problem is that my reducer ignores the CF I specify in mos.write() and instead must just run the outputCQL. So in the example above, everything is written to CF1.

Ive tried using a prepared statement to inject the CF into the outputCQL, along the lines of "UPDATE keyspace1.? SET value = ?", but I dont think its possible to use a placeholder for the CF like this.

Is there any way I can overwrite the outputCQL inside the reducer class?

1

1 Answers

0
votes

So the simple answer is that you cannot output results from a mr job to multiple CFs. However, having the need to do this actually highlights a flaw in the approach, rather than a missing feature in Hadoop.

Instead of processing a bunch of records and trying to produce 2 different results sets in one pass, a better approach is to arrive at the desired result sets iteratively. Basically, this means having multiple jobs iterating over the results of previous jobs until the desired results are achieved.