I need to write the results of a MapReduce job to multiple CQL3 column families.
In my reducer I choose the CF with MultipleOutputs, but all results are written to the single CF named in the job's output CQL statement (the one set with CqlConfigHelper.setOutputCql).
Job definition:
```java
// ...
job.setOutputFormatClass(CqlOutputFormat.class);
ConfigHelper.setOutputKeyspace(job.getConfiguration(), "keyspace1");
MultipleOutputs.addNamedOutput(job, "CF1", CqlOutputFormat.class, Map.class, List.class);
MultipleOutputs.addNamedOutput(job, "CF2", CqlOutputFormat.class, Map.class, List.class);
// The job has a single output CQL statement, and it targets CF1
CqlConfigHelper.setOutputCql(job.getConfiguration(), "UPDATE keyspace1.CF1 SET value = ? ");
// ...
```
Reducer class setup:
```java
// In setup(Context context)
mos = new MultipleOutputs(context);
```
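Reduce method (simplified):
```java
// Key columns of the target row, by column name
Map<String, ByteBuffer> keys = new LinkedHashMap<>();
keys.put("key", ByteBufferUtil.bytes("rowKey"));
keys.put("name", ByteBufferUtil.bytes("columnName"));
// Values for the ? markers in the output CQL
List<ByteBuffer> variables = new ArrayList<>();
variables.add(ByteBufferUtil.bytes("columnValue"));
// Meant for CF2, but the row ends up in CF1
mos.write("CF2", keys, variables);
```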
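As far as I understand CqlOutputFormat, the values in `variables` fill the `?` markers of the output CQL, and the key map supplies the primary-key columns of a WHERE clause that the record writer appends. If that is right, nothing in the (keys, variables) pair names a table at all. The plain-Java model below uses no Cassandra classes: `bytes` is a stand-in for `ByteBufferUtil.bytes`, the WHERE clause follows map order (the real writer presumably takes key columns from the table schema), and the binding order is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Model (not Cassandra code) of how one (keys, variables) pair is assumed to
 * meet the configured output CQL. The table always comes from the statement.
 */
public class EffectiveStatementModel {

    // Stand-in for ByteBufferUtil.bytes(String): UTF-8 encoded bytes.
    static ByteBuffer bytes(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Appends "WHERE k1 = ? AND k2 = ?" for the key columns, in map order.
    static String effectiveCql(String outputCql, Map<String, ByteBuffer> keys) {
        StringJoiner where = new StringJoiner(" AND ", "WHERE ", "");
        for (String column : keys.keySet()) {
            where.add(column + " = ?");
        }
        return outputCql + where;
    }

    // Assumed binding order: SET variables first, then the key values.
    static List<ByteBuffer> boundValues(Map<String, ByteBuffer> keys, List<ByteBuffer> variables) {
        List<ByteBuffer> bound = new ArrayList<>(variables);
        bound.addAll(keys.values());
        return bound;
    }

    public static void main(String[] args) {
        Map<String, ByteBuffer> keys = new LinkedHashMap<>();
        keys.put("key", bytes("rowKey"));
        keys.put("name", bytes("columnName"));
        List<ByteBuffer> variables = new ArrayList<>();
        variables.add(bytes("columnValue"));

        System.out.println(effectiveCql("UPDATE keyspace1.CF1 SET value = ? ", keys));
        System.out.println(boundValues(keys, variables).size() + " bound values");
    }
}
```

Under this model, the "CF2" passed to `mos.write()` never reaches the statement, which would be consistent with everything landing in CF1.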
The problem is that the CF I pass to mos.write() is ignored; the reducer appears to just run the output CQL statement. So in the example above, everything is written to CF1.
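My working assumption is that MultipleOutputs only swaps the output format and key/value classes per named output, while each named output's record writer is created from a copy of the same job configuration, so every writer reads the same output CQL. The plain-Java model below illustrates that assumption only: a `HashMap` stands in for Hadoop's `Configuration`, and the keys `output.cql` and `output.basename` are hypothetical placeholders, not the real property names.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Model (no Hadoop or Cassandra classes) of named outputs sharing one job
 * configuration and therefore one output CQL statement.
 */
public class SharedOutputCqlModel {

    // Stand-in for the job Configuration: one output CQL for the whole job.
    static Map<String, String> jobConf() {
        Map<String, String> conf = new HashMap<>();
        conf.put("output.cql", "UPDATE keyspace1.CF1 SET value = ? "); // hypothetical key
        return conf;
    }

    // Stand-in for building one named output's writer: the configuration is
    // copied and only the base output name differs between named outputs.
    static String statementFor(Map<String, String> jobConf, String namedOutput) {
        Map<String, String> taskConf = new HashMap<>(jobConf);
        taskConf.put("output.basename", namedOutput); // hypothetical key
        return taskConf.get("output.cql");            // the name plays no part here
    }

    public static void main(String[] args) {
        Map<String, String> conf = jobConf();
        System.out.println("CF1 -> " + statementFor(conf, "CF1"));
        System.out.println("CF2 -> " + statementFor(conf, "CF2"));
    }
}
```

Both named outputs resolve to the CF1 statement, which matches what I see in the real job.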
I've tried using a prepared statement to inject the CF name into the output CQL, along the lines of "UPDATE keyspace1.? SET value = ?", but I don't think a placeholder can be used for the CF name like this.
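CQL bind markers stand in for values only, not for identifiers such as keyspace or table names, so a CF-specific statement would have to be built as text before it is handed to the output format. A minimal sketch of that string-building step, assuming unquoted identifiers; the helpers `identifier` and `updateFor` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * Sketch: splice the table name into the statement text (a bind marker cannot
 * do this), validating it first because it becomes part of the query.
 */
public class PerTableCql {

    // Unquoted CQL identifier: a letter, then letters, digits or underscores.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z][A-Za-z0-9_]*");

    static String identifier(String name) {
        if (!IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("Not a plain CQL identifier: " + name);
        }
        return name;
    }

    // Builds "UPDATE <keyspace>.<table> SET <column> = ? " for one column family.
    static String updateFor(String keyspace, String table, String column) {
        return "UPDATE " + identifier(keyspace) + "." + identifier(table)
                + " SET " + identifier(column) + " = ? ";
    }

    public static void main(String[] args) {
        // One statement per named output, keyed by the name passed to mos.write().
        Map<String, String> statements = new LinkedHashMap<>();
        for (String cf : new String[] {"CF1", "CF2"}) {
            statements.put(cf, updateFor("keyspace1", cf, "value"));
        }
        statements.forEach((cf, cql) -> System.out.println(cf + " -> " + cql));
    }
}
```

This only produces the statement text for each CF; it does not by itself make CqlOutputFormat use a different statement per named output.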
Is there any way to override the output CQL from inside the reducer class?