4 votes

I have written a MapReduce program to process logs. In addition to the actual output, the job writes side data to a location external to the output path set in the driver code. But with speculative execution enabled, the output of killed task attempts is not removed. Is there a way to avoid this problem? Is it possible to solve the issue other than by writing to the normal output location and copying to the external location after the job completes?

Is it possible to solve this issue using 'OutputCommitter'?

Has anyone tried this? Any help would be appreciated.


2 Answers

1 vote

Yes, an OutputCommitter can be used for this. The default FileOutputCommitter moves the contents of the temporary task directory to the final output directory when a task succeeds, and deletes the temporary directory when it does not.

I believe most of the built-in output formats in Hadoop that extend FileOutputFormat use an OutputCommitter, which by default is FileOutputCommitter.

This is the relevant code from FileOutputFormat:

public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException {
  if (committer == null) {
    Path output = getOutputPath(context);
    committer = new FileOutputCommitter(output, context);
  }
  return committer;
}

To write to multiple paths, you can look into MultipleOutputs, which writes under the job's output directory and therefore goes through the same OutputCommitter by default; a sketch follows.
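For example, here is a minimal sketch assuming the new (org.apache.hadoop.mapreduce) API; the named output "sidedata", the Text types, and the "side/part" path prefix are illustrative choices, not something from your job:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// In the driver, declare the named output once:
//   MultipleOutputs.addNamedOutput(job, "sidedata",
//       TextOutputFormat.class, Text.class, Text.class);

public class LogReducer extends Reducer<Text, Text, Text, Text> {
  private MultipleOutputs<Text, Text> mos;

  @Override
  protected void setup(Context context) {
    mos = new MultipleOutputs<Text, Text>(context);
  }

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      context.write(key, value);                      // normal output
      mos.write("sidedata", key, value, "side/part"); // side data, resolved
                                                      // relative to the job
                                                      // output directory
    }
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    mos.close(); // flush the side outputs before the task commits
  }
}

Because everything MultipleOutputs writes lands in the task attempt's work directory, the committer promotes it only for the attempt that succeeds; killed speculative attempts are cleaned up by abortTask().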

Or you can create your own output format that extends FileOutputFormat, override the getOutputCommitter() method shown above, and supply your own OutputCommitter implementation, using the FileOutputCommitter code as a reference (see the sketch below).
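A rough sketch of that approach, assuming the Hadoop 2.x signatures of the new mapreduce API (where abortTask throws IOException); the class names, the per-attempt side-data layout, and the "sidedata.root" config key are all invented for illustration:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical committer: each attempt writes its side data under
// <sideDataRoot>/<attempt-id>, so a killed attempt can be deleted cleanly.
public class SideDataOutputCommitter extends FileOutputCommitter {
  private final Path sideDataRoot; // external side-data location

  public SideDataOutputCommitter(Path outputPath, Path sideDataRoot,
                                 TaskAttemptContext context) throws IOException {
    super(outputPath, context);
    this.sideDataRoot = sideDataRoot;
  }

  // Per-attempt directory named after the attempt ID, so speculative
  // attempts never collide with each other.
  private Path attemptSideDir(TaskAttemptContext context) {
    return new Path(sideDataRoot, context.getTaskAttemptID().toString());
  }

  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    super.commitTask(context); // promote the normal output as usual
    FileSystem fs = sideDataRoot.getFileSystem(context.getConfiguration());
    Path src = attemptSideDir(context);
    if (fs.exists(src)) {
      // rename the winning attempt's side data to a stable, per-task name
      Path dst = new Path(sideDataRoot,
          context.getTaskAttemptID().getTaskID().toString());
      fs.rename(src, dst);
    }
  }

  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    super.abortTask(context); // delete the temporary task output
    FileSystem fs = sideDataRoot.getFileSystem(context.getConfiguration());
    fs.delete(attemptSideDir(context), true); // discard this attempt's side data
  }
}

// Output format wiring the committer in, via the method quoted above.
class SideDataOutputFormat extends TextOutputFormat<Text, Text> {
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException {
    Path output = getOutputPath(context);
    Path sideRoot = new Path(context.getConfiguration().get("sidedata.root"));
    return new SideDataOutputCommitter(output, sideRoot, context);
  }
}

The key idea mirrors what FileOutputCommitter already does for the normal output: write per attempt, promote on commit, delete on abort.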

In the FileOutputCommitter code you will find the following function, which you might be interested in:

/**
 * Delete the work directory
 */
@Override
public void abortTask(TaskAttemptContext context) {
  try {
    if (workPath != null) {
      context.progress();
      outputFileSystem.delete(workPath, true);
    }
  } catch (IOException ie) {
    LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
  }
}

If a task succeeds, commitTask() is called, which in the default implementation moves the temporary task output directory (which has the task attempt ID in its name to avoid conflicts between task attempts) to the final output path, ${mapred.output.dir}. Otherwise, the framework calls abortTask(), which deletes the temporary task output directory.
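For comparison with abortTask() above, here is a simplified sketch (not the verbatim Hadoop source) of what the default commitTask() does; moveTaskOutputs is the committer's internal helper that merges the attempt's files into the final directory:

@Override
public void commitTask(TaskAttemptContext context) throws IOException {
  if (workPath != null && outputFileSystem.exists(workPath)) {
    context.progress();
    // Merge this attempt's files into the final output directory...
    moveTaskOutputs(context, outputFileSystem, outputPath, workPath);
    // ...then remove the now-empty temporary attempt directory.
    outputFileSystem.delete(workPath, true);
  }
}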

0 votes

To avoid the _logs and _SUCCESS files being created in the MapReduce output folder, you may use the settings below:

conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
conf.set("hadoop.job.history.user.location", "none");
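For instance, in the driver (Hadoop 2.x style; the job name is a placeholder), both settings need to be on the Configuration before the Job is created, because Job copies the configuration at construction time:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

Configuration conf = new Configuration();
// Skip the _SUCCESS marker that FileOutputCommitter writes on job success.
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
// Skip the _logs history directory under the job output path.
conf.set("hadoop.job.history.user.location", "none");
Job job = Job.getInstance(conf, "log-processing"); // placeholder name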