1
votes

How do you access a Cassandra column family from within a mapper? Specifically, how do you convert the arguments to the map() method back to the java types I expect?

Key {logType} -> {column name: timeUUID, column value: csv log line, ttl: 1year}


Thanks to @Chris & @rs_atl

I successfully run hadoop job, here is complete code:
package com.xxx.hadoop;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.SortedMap;


import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.thrift.TBaseHelper;

import com.xxx.parser.LogParser;
import com.netflix.astyanax.serializers.StringSerializer;

public class LogTypeCounterByDate extends Configured implements Tool {
    private static final String KEYSPACE = "LogKS";
    private static final String COLUMN_FAMILY = "LogBlock";
    private static final String JOB_NAME = "LOG_LINE_COUNT";
    private static final String INPUT_PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";
    private static final String INPUT_RPC_PORT = "9160";
    private static final String INPUT_INITIAL_ADDRESS = "192.168.1.21";
    private static final String OUTPUT_PATH = "/logOutput/results";

    @Override
    public int run(String[] args) throws Exception {

        //Configuration conf = new Configuration();

        Job job = new Job(getConf(), JOB_NAME);
        job.setJarByClass(LogTypeCounterByDate.class);
        job.setMapperClass(LogTypeCounterByDateMapper.class);       
        job.setReducerClass(LogTypeCounterByDateReducer.class);

        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setNumReduceTasks(1);
        ConfigHelper.setRangeBatchSize(getConf(), 1000);

        /*SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1));*/
        SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1000);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);


        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
        ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADDRESS);
        ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) throws Exception{
        ToolRunner.run(new Configuration(), new LogTypeCounterByDate(), args);
        System.exit(0);
    }


    public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
    {

        @SuppressWarnings("rawtypes")
        @Override
        protected void setup(Mapper.Context context){

        }

        @SuppressWarnings({ })
        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException{
            //String[] lines = columns.;
            String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key));  
            Iterator<ByteBuffer> iter = columns.keySet().iterator();
            IColumn column;
            String line;
            LogParser lp = null;

            while(iter.hasNext()){
                column = columns.get(iter.next());
                line = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(column.value()));
                lp = new LogParser(line);               
                context.write(new Text(rowkey + "\t" + "LineCount"), new LongWritable(1L));
                context.write(new Text(rowkey + "\t" + "Minutes"), new LongWritable(lp.getTotalDuration()));
            }
        }
    }

    public static class LogTypeCounterByDateReducer extends Reducer<Text, LongWritable, Text, LongWritable>
    {           

        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException{
            long total = 0;
            for(LongWritable val : values){
                total += val.get();
            }
            context.write(key, new LongWritable(total));
        }
    }               
}

ConfigHelper.setRangeBatchSize(getConf(), 1000);

        /*SlicePredicate predicate = new   SlicePredicate().setSlice_range(new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1));*/
        SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]), 
                ByteBuffer.wrap(new byte[0]), true, 1000);

        SlicePredicate slicePredicate = new SlicePredicate();
        slicePredicate.setSlice_range(sliceRange);

the above code feeds only 1000 columns to mapper for each row where as I want to feed all columns for every row in batches of 1000 columns each time.

Kindly someone help me in this.

1

1 Answers

4
votes

Given parameters:

ByteBuffer key;
SortedMap<ByteBuffer, IColumn> columns;

You'd use:

String rowkey = StringSerializer.get().fromByteBuffer(TBaseHelper.rightSize(key))

to get the deserialized key value. Note that the assumption here is that the row key is a String. If it's some other type, you'd have to use the appropriate serializer class.

To get the column values, do something like:

Iterator<ByteBuffer> = columns.keySet().iterator(); 
while (iter.hasNext()) {
    IColumn col = columns.get(iter.next()); 
    xxx colVal = xxxSerializer.get().fromByteBuffer(TBaseHelper.rightSize(col.value()));
}

Where xxx is the Java type of the column value and xxxSerializer is the corresponding serializer.

By the way, the TBaseHelper class is used to correct the offset of the value in the internal byte array to zero, enforcing an assumption made by the serializer implementations.

So one more thing... If you're retrieving time series then each column is its own time series value and you'll want to include appropriate mapper logic (like some kind of mathematical operations and a write to the context) inside the iteration loop over the columns. If instead you have a more static column family (more like the traditional sql tables) then you'll probably have a single write to context for the entire row.