I am trying to split a large avro file in HDFS into multiple avro files based on the value in a field in the input file. Following sections show my mapper, reducer and driver programs.
Now everything works fine but the output files are getting named as 01-r-00000.avro, 02-r-00000.avro...
instead of stock-r-00000.avro, stock-r-00001.avro
What am I missing out?
Thanks
Mapper:
public static class CustomFileSplitMapper extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<String>, AvroValue<GenericRecord>> {
@Override
public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
throws IOException, InterruptedException {
GenericRecord record = key.datum();
LOGGER.info(record);
AvroValue<GenericRecord> outValue = new AvroValue<GenericRecord>(record);
context.write(new AvroKey<String>((String) record.get("date")), outValue);
}
}
Reducer:
public static class CustomFileSplitReducer extends Reducer<AvroKey<String>, AvroValue<GenericRecord>, AvroKey<GenericRecord>, NullWritable> {
private AvroMultipleOutputs amos;
private String outputPath;
@Override
protected void setup(Context context) {
outputPath = context.getConfiguration().get("outputPath");
amos = new AvroMultipleOutputs(context);
}
@Override
public void reduce(AvroKey<String> key, Iterable<AvroValue<GenericRecord>> values, Context context)
throws IOException, InterruptedException {
for (AvroValue<GenericRecord> value : values) {
String datePath = "daily" + File.separator + LocalDate.parse(new String(key.datum().getBytes()),
DateTimeFormatter.ofPattern("yyyyMMdd")).format(DateTimeFormatter.ofPattern("yyyy/MM/dd"));
GenericRecord record = value.datum();
amos.write("stock", new AvroKey<GenericRecord>(record), NullWritable.get(),
outputPath + File.separator + datePath);
}
}
@Override
public void cleanup(Context context) throws IOException, InterruptedException {
amos.close();
}
}
Driver:
Configuration conf = new Configuration();
conf.set("outputPath", props.getString("outputPath"));
Job job = Job.getInstance(conf, "CustomFileSplitter");
job.setJarByClass(CustomFileSplitter.class);
job.setMapperClass(CustomFileSplitMapper.class);
job.setReducerClass(CustomFileSplitReducer.class);
FileInputFormat.addInputPath(job, new Path(props.getString("inputPath")));
FileOutputFormat.setOutputPath(job, new Path(props.getString("outputPath")));
job.setInputFormatClass(AvroKeyInputFormat.class);
LazyOutputFormat.setOutputFormatClass(job, AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
Schema schema = SchemaExtractor.extract(new Path(props.getString("inputPath")));
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, schema);
AvroJob.setOutputKeySchema(job, schema);
AvroJob.setInputKeySchema(job, schema);
AvroMultipleOutputs.addNamedOutput(job, "stock", AvroKeyOutputFormat.class, schema);