I have been trying to run simple map-reduce jobs on data stored in Cassandra using the Hector Java client.

I have already successfully run the hadoop-wordcount example explained in this beautiful blog post. I have also read the Hadoop Support article.

But what I want to do is a bit different in terms of implementation (the wordcount example uses a script that mentions mapred-site.xml). I would like someone to help me understand how to run map-reduce jobs on Cassandra data in distributed mode rather than local mode.

My code runs map-reduce jobs successfully in local mode. What I want is to run them in distributed mode and write the result as a new ColumnFamily in the Cassandra keyspace.

To run it in distributed mode I might have to set something in
$PATH_TO_HADOOP/conf/mapred-site.xml
(as mentioned in the blog post above), but I don't know exactly what.
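
For what it's worth, here is my guess at what the job configuration would have to contain to point at the cluster instead of the LocalJobRunner. The class name, host names and ports below are just placeholders, and I am not at all sure this is the right approach, which is exactly my question:

import org.apache.hadoop.conf.Configuration;

public class DistributedConfGuess {

    // Placeholder hosts/ports -- normally these values would come from
    // $PATH_TO_HADOOP/conf/mapred-site.xml and core-site.xml on the machine
    // that submits the job.
    public static Configuration clusterConf() {
        Configuration conf = new Configuration();
        // Anything other than "local" here tells Hadoop 0.20.x to submit to a JobTracker.
        conf.set("mapred.job.tracker", "jobtracker-host:8021");
        // HDFS is used for staging the job jar and split metadata.
        conf.set("fs.default.name", "hdfs://namenode-host:8020");
        return conf;
    }
}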

Here's my code

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.SortedMap;
import java.util.StringTokenizer;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Extending Configured supplies the getConf()/setConf() methods required by the Tool interface.
public class test_forum extends Configured implements Tool {

private String KEYSPACE = "test_forum";
private String COLUMN_FAMILY ="posts";
private String OUTPUT_COLUMN_FAMILY = "output_post_count";
private static String CONF_COLUMN_NAME = "text";


public int run(String[] strings) throws Exception {

    Configuration conf = new Configuration();

    conf.set(CONF_COLUMN_NAME, "text");
    Job job = new Job(conf,"test_forum");

    job.setJarByClass(test_forum.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(ReducerToCassandra.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);

    job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);


    System.out.println("Job Set");


    ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
    ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");

    ConfigHelper.setInputColumnFamily(job.getConfiguration(),KEYSPACE,COLUMN_FAMILY);
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);

    SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("text")));

    ConfigHelper.setInputSlicePredicate(job.getConfiguration(),predicate);

    System.out.println("running job now..");

    boolean success = job.waitForCompletion(true);

    return success ? 0 : 1;

}



public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    private ByteBuffer sourceColumn;
    protected void setup(Context context)
    throws IOException, InterruptedException
    {
        sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
    }

    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
    {
        IColumn column = columns.get(sourceColumn);

        if (column == null)  {
            return;
        }

        String value = ByteBufferUtil.string(column.value());
        System.out.println("read " + key + ":" + value + " from " + context.getInputSplit());

        StringTokenizer itr = new StringTokenizer(value);

        while (itr.hasMoreTokens())
        {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }


}

public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
{
    private ByteBuffer outputKey;

    public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
    {
        int sum = 0;

        // Text.getBytes() returns the backing array, which may be longer than the
        // content; only the first getLength() bytes are valid.
        outputKey = ByteBuffer.wrap(Arrays.copyOf(word.getBytes(), word.getLength()));

        for (IntWritable val : values)
            sum += val.get();

        System.out.println(word.toString()+" -> "+sum);
        context.write(outputKey, Collections.singletonList(getMutation(word, sum)));

    }

    private static Mutation getMutation(Text word, int sum)
    {
        Column c = new Column();
        c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
        c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
        c.setTimestamp(System.currentTimeMillis());

        Mutation m = new Mutation();
        m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        m.column_or_supercolumn.setColumn(c);
        System.out.println("Mutating");
        return m;

    }

}




public static void main(String[] args) throws Exception {

    System.out.println("Working..!");

    int ret=ToolRunner.run(new Configuration(), new test_forum(), args);

    System.out.println("Done..!");

    System.exit(ret);

}

}

Here are the warnings I get:

WARN  - JobClient                  - Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
WARN  - JobClient                  - No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).

The code runs and performs the map-reduce tasks successfully, but I don't know where it writes the data.

EDIT: I had not created the output ColumnFamily in Cassandra, hence it wasn't writing. So now the only remaining problem is how to run it in distributed mode.

Thank you.

1 Answer

Did you create a jar with your class?

Hadoop needs a jar in order to propagate your job classes across the cluster. If you did not create one, that explains the "No job jar file set" warning, and why you cannot run it in distributed mode. Take care to launch your job with the "hadoop jar ..." command and to add your jar dependencies (at least apache-cassandra!). Your Cassandra server must be up and listening on the Thrift port when you submit your job.
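
For example, assuming you package the job as test_forum.jar (a placeholder name) with its Cassandra dependencies (apache-cassandra, libthrift, etc.) bundled in a lib/ directory inside that jar, so Hadoop adds them to the task classpath, the launch would look something like:

hadoop jar test_forum.jar test_forum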

By the way, Hadoop and Cassandra do not need Hector. ColumnFamilyInputFormat (and ColumnFamilyOutputFormat) know how to read (and write) data from Cassandra on their own. That's why you have to configure the RpcPort, InitialAddress and Partitioner (and you did).

Last note: ColumnFamilyOutputFormat will not create your output column family; it must already exist, otherwise you will get an error when writing.
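
For example, you could create it from cassandra-cli with something along these lines (the exact syntax varies a bit between Cassandra versions, so treat this as a sketch):

use test_forum;
create column family output_post_count with comparator = UTF8Type;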

Hope this helps,

Benoit