I am trying to write a Hadoop mapper class in Scala. As a starting point, I took a Java example from the book "Hadoop: The Definitive Guide" and tried to port it to Scala.

The original Java class extends org.apache.hadoop.mapreduce.Mapper:

public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable>

and overrides the method

public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException

This method gets called and works properly (I tested it with a unit test and then ran it with YARN).

My attempt at a Scala port is:

class MaxTemperatureMapperS extends Mapper[LongWritable, Text, Text, IntWritable]

and then the method

@throws(classOf[IOException])
@throws(classOf[InterruptedException])
override def map(key: LongWritable, value: Text, context: Context): Unit = 
{
    ...
}

but the Scala compiler issues an error:

error: method map overrides nothing.

I thought the two methods had the same signature in Scala and Java, but apparently I am missing something. Can you give me a hint?

2 Answers

Sometimes the easiest way to get this right is to let your IDE generate the override for you:

class Test extends Mapper[LongWritable, Text, Text, IntWritable] {
  override def map(key: LongWritable, value: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = ???
}

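In this case the problem is that Context is an inner class defined inside Mapper. In the Java signature it means Mapper<LongWritable, Text, Text, IntWritable>.Context, which Scala expresses with the # (type projection) syntax: Mapper[LongWritable, Text, Text, IntWritable]#Context. A bare Context in the subclass refers to the subclass's own path-dependent this.Context, which is a different type, so the compiler reports that map overrides nothing.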
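The mechanism can be reproduced without Hadoop. The sketch below uses a hypothetical MiniMapper class (not part of Hadoop) whose map method declares its Context parameter as a type projection, which is assumed here to mirror how Scala reads the Java signature of Mapper.map. Overriding it with the projection compiles; overriding it with a bare Context would be rejected with "method map overrides nothing". WordLengthMapper and MiniMapperDemo are likewise illustrative names.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Hadoop's Mapper: Context is an inner class and
// map declares its parameter with a type projection, like the Java signature.
class MiniMapper[KIn, VIn, KOut, VOut] {
  class Context {
    val written = ArrayBuffer.empty[(KOut, VOut)]
    def write(k: KOut, v: VOut): Unit = written += ((k, v))
  }
  def map(key: KIn, value: VIn, context: MiniMapper[KIn, VIn, KOut, VOut]#Context): Unit = ()
}

class WordLengthMapper extends MiniMapper[Long, String, String, Int] {
  // Writing `context: Context` here would mean WordLengthMapper.this.Context,
  // a different type, and the compiler would say the method overrides nothing.
  override def map(key: Long, value: String,
                   context: MiniMapper[Long, String, String, Int]#Context): Unit =
    value.split("\\s+").filter(_.nonEmpty).foreach(w => context.write(w, w.length))
}

object MiniMapperDemo {
  def run(): List[(String, Int)] = {
    val mapper = new WordLengthMapper
    val ctx = new mapper.Context // a path-dependent instance conforms to the projection
    mapper.map(0L, "hello big world", ctx)
    ctx.written.toList
  }
  def main(args: Array[String]): Unit = println(run())
}
```

The same reasoning carries over to the real Mapper: only the projected type matches the inherited signature.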

For reference, here is code that overrides map in a Mapper subclass and reduce in a Reducer subclass in Scala, using the same # syntax for Context:

Mapper example:

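import java.io.IOException
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroKey, AvroValue}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.Mapper

class MaxTemperatureMapper extends Mapper[LongWritable, Text, AvroKey[Integer], AvroValue[GenericRecord]] {

  val parser = new NcdcRecordParser()
  val record = new GenericData.Record(AvroSchema.SCHEMA)

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def map(key: LongWritable, value: Text,
      context: Mapper[LongWritable, Text, AvroKey[Integer], AvroValue[GenericRecord]]#Context): Unit = {
    ...
  }
}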

Reducer example:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Reducer

class MaxTemperatureReducer extends Reducer[AvroKey[Integer], AvroValue[GenericRecord], AvroKey[GenericRecord], NullWritable] {

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def reduce(key: AvroKey[Integer], values: java.lang.Iterable[AvroValue[GenericRecord]],
      context: Reducer[AvroKey[Integer], AvroValue[GenericRecord], AvroKey[GenericRecord], NullWritable]#Context): Unit = {
    ...
  }
}
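For the question's original Writable types, a complete mapper and reducer might look like the following sketch. It assumes the Hadoop mapreduce API is on the classpath and Scala 2.13 or later (for scala.jdk.CollectionConverters). The parsing follows the fixed-width NCDC layout used by the book's Java MaxTemperatureMapper (zero-based offsets: year at 15 to 18, signed temperature at 87 to 91, quality code at 92, with 9999 meaning missing). The parse helper and the Ctx type aliases are names introduced here for illustration, not part of Hadoop. A type alias is transparent, so it keeps the long projection readable while still matching the overridden signature.

```scala
import java.io.IOException
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.{Mapper, Reducer}
import scala.jdk.CollectionConverters._

object MaxTemperatureMapperS {
  // Sentinel used by the NCDC data for a missing temperature reading.
  val Missing = 9999

  // Hypothetical helper: extracts (year, temperature) from one fixed-width NCDC line,
  // or None if the reading is missing or its quality code is not acceptable.
  def parse(line: String): Option[(String, Int)] = {
    val year = line.substring(15, 19)
    val airTemperature =
      if (line.charAt(87) == '+') line.substring(88, 92).toInt // drop the leading '+'
      else line.substring(87, 92).toInt
    val quality = line.substring(92, 93)
    if (airTemperature != Missing && quality.matches("[01459]")) Some((year, airTemperature))
    else None
  }
}

class MaxTemperatureMapperS extends Mapper[LongWritable, Text, Text, IntWritable] {
  // Alias for the projection; it expands to exactly the type in the Java signature.
  type Ctx = Mapper[LongWritable, Text, Text, IntWritable]#Context

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def map(key: LongWritable, value: Text, context: Ctx): Unit =
    MaxTemperatureMapperS.parse(value.toString).foreach { case (year, temp) =>
      context.write(new Text(year), new IntWritable(temp))
    }
}

class MaxTemperatureReducerS extends Reducer[Text, IntWritable, Text, IntWritable] {
  type Ctx = Reducer[Text, IntWritable, Text, IntWritable]#Context

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Ctx): Unit = {
    // Hadoop may reuse the IntWritable instance, so read each value with get as we go.
    val max = values.asScala.foldLeft(Int.MinValue)((acc, v) => math.max(acc, v.get))
    context.write(key, new IntWritable(max))
  }
}
```

Keeping parse in the companion object lets the parsing logic be unit-tested without constructing a Hadoop Context.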