
I recently set up a 4-node Cassandra cluster for learning, with one column family that holds time-series data as:

Key -> {column name: timeUUID, column value: csv log line, ttl: 1 year}. I use the Netflix Astyanax Java client to load about 1 million log lines.
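Roughly, each log line is stored like this (a simplified sketch of the write path; LogLoader, CF_LOGS, rowKey and csvLine are placeholder names):

import java.util.UUID;

import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.serializers.TimeUUIDSerializer;
import com.netflix.astyanax.util.TimeUUIDUtils;

public class LogLoader {
    // Placeholder CF: row key is a String, column names are TimeUUIDs, values are Strings.
    private static final ColumnFamily<String, UUID> CF_LOGS =
            new ColumnFamily<String, UUID>("log_lines", StringSerializer.get(), TimeUUIDSerializer.get());

    private static final int TTL_SECONDS = 365 * 24 * 60 * 60; // 1 year

    private final Keyspace keyspace;

    public LogLoader(Keyspace keyspace) {
        this.keyspace = keyspace;
    }

    // Writes one CSV log line under the given row key, keyed by a fresh TimeUUID.
    public void writeLine(String rowKey, String csvLine) throws ConnectionException {
        MutationBatch m = keyspace.prepareMutationBatch();
        m.withRow(CF_LOGS, rowKey)
         .putColumn(TimeUUIDUtils.getUniqueTimeUUIDinMicros(), csvLine, TTL_SECONDS);
        m.execute();
    }
}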

I also configured Hadoop to run map-reduce jobs, with 1 namenode and 4 datanodes, to run some analytics on the Cassandra data.

All the available examples on the internet use column names in the SlicePredicate for the Hadoop job configuration, whereas I have timeUUIDs as column names. How can I efficiently feed the Cassandra data to the Hadoop job configurator in batches of 1000 columns at a time?

Some rows have more than 10,000 columns in this test data, and I expect more in the real data.


I configure my job as follows:

public int run(String[] arg0) throws Exception {
    Job job = new Job(getConf(), JOB_NAME);
    job.setJarByClass(LogTypeCounterByDate.class);
    job.setMapperClass(LogTypeCounterByDateMapper.class);
    job.setReducerClass(LogTypeCounterByDateReducer.class);

    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    ConfigHelper.setRangeBatchSize(getConf(), 1000);

    // Empty start/end selects all columns; true reads them in reverse order,
    // limited to 1000 columns per slice.
    SliceRange sliceRange = new SliceRange(ByteBuffer.wrap(new byte[0]),
            ByteBuffer.wrap(new byte[0]), true, 1000);

    SlicePredicate slicePredicate = new SlicePredicate();
    slicePredicate.setSlice_range(sliceRange);

    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
    ConfigHelper.setInputRpcPort(job.getConfiguration(), INPUT_RPC_PORT);
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), INPUT_INITIAL_ADRESS);
    ConfigHelper.setInputPartitioner(job.getConfiguration(), INPUT_PARTITIONER);
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), slicePredicate);
    FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

    job.waitForCompletion(true);
    return job.isSuccessful() ? 0 : 1;
}

But I am not able to understand how to define the Mapper. Could you kindly provide a template for the Mapper class?

public static class LogTypeCounterByDateMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
{
    private Text key = null;
    private LongWritable value = null;

    @Override
    protected void setup(Context context){

    }

    public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context){
        //String[] lines = columns.;

    }
}
Can someone kindly reply to my query? I have been stuck on this for the last 2 days. Thanks in advance. – user1793389

1 Answer

ConfigHelper.setRangeBatchSize(getConf(), 1000);
...
SlicePredicate predicate = new SlicePredicate().setSlice_range(
        new SliceRange(TimeUUID.asByteBuffer(startValue), TimeUUID.asByteBuffer(endValue), false, 1000));
ConfigHelper.setInputSlicePredicate(conf, predicate);
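For the Mapper, something along these lines should work as a starting point. This is only a minimal sketch: it assumes each column value is a UTF-8 CSV line whose first field is the log type (a placeholder for however you actually parse your lines), and it derives the day from the TimeUUID column name.

// In addition to the imports from the question, this needs
// org.apache.cassandra.utils.ByteBufferUtil, org.apache.cassandra.utils.UUIDGen,
// java.text.SimpleDateFormat, java.util.Date and java.util.UUID.
public static class LogTypeCounterByDateMapper
        extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, LongWritable>
{
    private static final LongWritable ONE = new LongWritable(1L);
    // Offset between the UUID epoch (1582-10-15) and the Unix epoch, in 100ns units.
    private static final long UUID_EPOCH_OFFSET = 0x01b21dd213814000L;

    private final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy-MM-dd");

    @Override
    public void map(ByteBuffer rowKey, SortedMap<ByteBuffer, IColumn> columns, Context context)
            throws IOException, InterruptedException
    {
        for (IColumn column : columns.values()) {
            // Column name is the TimeUUID; convert its timestamp to Unix milliseconds.
            UUID timeUuid = UUIDGen.getUUID(column.name());
            long unixMillis = (timeUuid.timestamp() - UUID_EPOCH_OFFSET) / 10000;
            String day = dayFormat.format(new Date(unixMillis));

            // Column value is the CSV log line; placeholder assumption: first field is the log type.
            String logLine = ByteBufferUtil.string(column.value());
            String logType = logLine.split(",")[0];

            context.write(new Text(day + " " + logType), ONE);
        }
    }
}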