3 votes

I am new to Hive and MapReduce and would really appreciate your answer, and please also suggest the right approach.

I have defined an external table logs in Hive, partitioned on date and origin server, with an external location on HDFS at /data/logs/. I have a MapReduce job which fetches these log files, splits them, and stores them under the folder mentioned above, like:

"/data/logs/dt=2012-10-01/server01/"
"/data/logs/dt=2012-10-01/server02/"
...
...

From the MapReduce job I would like to add partitions to the table logs in Hive. I know of two approaches:

  1. alter table command -- too many alter table commands are needed (see the sketch below)
  2. adding dynamic partitions
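
For reference, approach 1 would look roughly like the sketch below, run from the driver once the job has finished. The JDBC URL and the name of the server partition column are guesses on my part, since only the dt directories follow the key=value convention:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class AddLogPartitions {
    public static void main(String[] args) throws Exception {
        // Assumes HiveServer2 with the hive-jdbc driver on the classpath
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");
             Statement stmt = con.createStatement()) {
            // One ALTER TABLE statement per (dt, server) folder produced by the job
            String[][] parts = {{"2012-10-01", "server01"}, {"2012-10-01", "server02"}};
            for (String[] p : parts) {
                stmt.execute("ALTER TABLE logs ADD IF NOT EXISTS PARTITION "
                        + "(dt='" + p[0] + "', server='" + p[1] + "') "
                        + "LOCATION '/data/logs/dt=" + p[0] + "/" + p[1] + "'");
            }
        }
    }
}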

For approach two I see only examples of INSERT OVERWRITE, which is not an option for me. Is there a way to add these new partitions to the table after the end of the job?


3 Answers

3 votes

To do this from within a Map/Reduce job I would recommend using Apache HCatalog, which is a new project under the Hadoop umbrella.

HCatalog is really an abstraction layer on top of HDFS, so you can write your outputs in a standardized way, be it from Hive, Pig or M/R. Where this comes into the picture for you is that you can load data directly into Hive from your Map/Reduce job using the output format HCatOutputFormat. Below is an example taken from the official website.

A current code example for writing out a specific partition for (a=1,b=1) would go something like this:

// Describe the target table and the static partition (a=1, b=1) to write to
Map<String, String> partitionValues = new HashMap<String, String>();
partitionValues.put("a", "1");
partitionValues.put("b", "1");
HCatTableInfo info = HCatTableInfo.getOutputTableInfo(dbName, tblName, partitionValues);
HCatOutputFormat.setOutput(job, info);
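
To turn that into a complete job setup you also have to hand HCatOutputFormat the schema of the records being written and register it as the job's output format. Roughly like this (the exact method signatures differ a little between HCatalog releases):

// Continuing the setup above
HCatSchema schema = HCatOutputFormat.getTableSchema(job.getConfiguration());
HCatOutputFormat.setSchema(job, schema);
job.setOutputFormatClass(HCatOutputFormat.class);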

And to write to multiple partitions, a separate job has to be kicked off for each, with the setup above.

You can also use dynamic partitions with HCatalog, in which case you could load as many partitions as you want in the same job!
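
As far as I recall, the switch to dynamic partitioning is simply a matter of not pinning the partition values when you describe the output. With the newer OutputJobInfo API that means passing null for the partition map; a minimal sketch, assuming dbName and tblName hold your database and table names:

// A null partition spec tells HCatOutputFormat to partition dynamically,
// based on the partition column values carried in each output record
OutputJobInfo dynInfo = OutputJobInfo.create(dbName, tblName, null);
HCatOutputFormat.setOutput(job, dynInfo);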

I recommend reading further about HCatalog on the official website mentioned above, which should give you more details if needed.

3 votes

In reality, things are a little more complicated than that, which is unfortunate because it is undocumented in official sources (as of now), and it takes a few days of frustration to figure out.

I've found that I need to do the following to get HCatalog MapReduce jobs to write to dynamic partitions:

In the record-writing phase of my job (usually the reducer), I have to manually add my dynamic partition fields (HCatFieldSchema) to my HCatSchema object.

The trouble is that HCatOutputFormat.getTableSchema(config) does not actually return the partition fields. They need to be added manually:

HCatFieldSchema hfs1 = new HCatFieldSchema("date", Type.STRING, null);
HCatFieldSchema hfs2 = new HCatFieldSchema("some_partition", Type.STRING, null);
schema.append(hfs1);
schema.append(hfs2);
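
Putting it together, this is roughly what my reducer ends up looking like: fetch the table schema once in setup(), append the partition fields, and make sure every record carries values for them in the trailing positions. Imports are omitted, the package is org.apache.hcatalog or org.apache.hive.hcatalog depending on your version, and the column names and values are just placeholders:

public static class MyReducer extends Reducer<Text, Text, WritableComparable<?>, HCatRecord> {
    private HCatSchema schema;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // getTableSchema() returns only the data columns, so the dynamic
        // partition columns have to be appended by hand
        schema = HCatOutputFormat.getTableSchema(context.getConfiguration());
        schema.append(new HCatFieldSchema("date", Type.STRING, null));
        schema.append(new HCatFieldSchema("some_partition", Type.STRING, null));
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        HCatRecord record = new DefaultHCatRecord(schema.getFields().size());
        record.set(0, key.toString());   // data column
        record.set(1, "2012-10-01");     // value for the date partition
        record.set(2, "server01");       // value for the some_partition partition
        context.write(null, record);
    }
}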
0 votes

Here's the code for writing into multiple tables with dynamic partitioning in one job using HCatalog. The code has been tested on Hadoop 2.5.0 and Hive 0.13.1:

// ... Job setup, InputFormatClass, etc ...
String dbName = null; // null means the default database
String[] tables = {"table0", "table1"};

job.setOutputFormatClass(MultiOutputFormat.class);
MultiOutputFormat.JobConfigurer configurer = MultiOutputFormat.createConfigurer(job);

List<String> partitions = new ArrayList<String>();
partitions.add(0, "partition0");
partitions.add(1, "partition1");

HCatFieldSchema partition0 = new HCatFieldSchema("partition0", TypeInfoFactory.stringTypeInfo, null);
HCatFieldSchema partition1 = new HCatFieldSchema("partition1", TypeInfoFactory.stringTypeInfo, null);

for (String table : tables) {
    configurer.addOutputFormat(table, HCatOutputFormat.class, BytesWritable.class, HCatRecord.class);

    // A null static partition spec plus explicit dynamic partitioning keys lets
    // HCatalog derive the partitions from the records themselves
    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, table, null);
    outputJobInfo.setDynamicPartitioningKeys(partitions);

    HCatOutputFormat.setOutput(
        configurer.getJob(table), outputJobInfo
    );

    // getTableSchema() returns only the data columns; append the partition columns
    HCatSchema schema = HCatOutputFormat.getTableSchema(configurer.getJob(table).getConfiguration());
    schema.append(partition0);
    schema.append(partition1);

    HCatOutputFormat.setSchema(
        configurer.getJob(table),
        schema
    );
}
configurer.configure();

return job.waitForCompletion(true) ? 0 : 1;

Mapper:

public static class MyMapper extends Mapper<LongWritable, Text, BytesWritable, HCatRecord> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        HCatRecord record = new DefaultHCatRecord(3); // Including partitions
        record.set(0, value.toString());

        // partitions must be set after non-partition fields
        record.set(1, "0"); // partition0=0
        record.set(2, "1"); // partition1=1

        MultiOutputFormat.write("table0", null, record, context);
        MultiOutputFormat.write("table1", null, record, context);
    }
}
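
With this setup HCatalog takes the partition values from the records themselves, writes the output under the matching partition directories and registers the new partitions in the metastore when the job commits, so nothing needs to be altered by hand after the job ends.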