
I have a Hadoop job running in EMR, and I am passing an S3 path as the input and output for this job.

When I run it locally everything works fine (as there is a single node).

However, when I run it in EMR on a 5-node cluster, I am running into a "File already exists" IOException.

The output path has a timestamp in it, so the output path doesn't already exist in S3.

Error: java.io.IOException: File already exists:s3://<mybucket_name>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED/part-m-00000

I have a very simple Hadoop app (primarily the mapper) which reads each line from a file and converts it (using an existing library).

I am not sure why each node is trying to write with the same file name.

Here is my mapper:

public static class TokenizeMapper extends Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            //TODO: Invoke Core Engine to transform the Data
            Encryption encryption = new Encryption();
            String tokenizedVal = encryption.apply(value.toString());
            // Key and value must both be Text to match the declared output types
            context.write(new Text(tokenizedVal), new Text("1"));
        }
    }

And my reducer:

public static class TokenizeReducer extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text text, Iterable<Text> lines, Context context) throws IOException, InterruptedException {
            // Count the number of values received for this key
            Iterator<Text> iterator = lines.iterator();
            int counter = 0;
            while (iterator.hasNext()) {
                iterator.next();   // advance the iterator, otherwise this loop never terminates
                counter++;
            }

            Text output = new Text("" + counter);
            context.write(text, output);
        }
    }

And my main class:

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        long startTime = System.currentTimeMillis();
        try {
            Configuration config = new Configuration();
            String[] additionalArgs = new GenericOptionsParser(config, args).getRemainingArgs();

            if (additionalArgs.length != 2) {
                System.err.println("Usage: Tokenizer Input_File and Output_File ");
                System.exit(2);
            }


            Job job = Job.getInstance(config, "Raw File Tokenizer");
            job.setJarByClass(Tokenizer.class);
            job.setMapperClass(TokenizeMapper.class);
            job.setReducerClass(TokenizeReducer.class);

            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            FileInputFormat.addInputPath(job, new Path(additionalArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(additionalArgs[1]));

            boolean status = job.waitForCompletion(true);
            if (status) {
                //System.exit(0);
                System.out.println("Completed Job Successfully");
            } else {
                System.out.println("Job did not Succeed");
            }
        }
        catch(Exception e){
            e.printStackTrace();
        }
        finally{
            System.out.println("Total Time for processing =["+(System.currentTimeMillis()-startTime)+"]");
        }
    }

I am passing the arguments when launching the cluster as:

s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt

s3://<mybucket>/8_9_0a4574ca-96d0-47c8-8eb8-4deb82944d4b/customer/RawFile12.txt/1523583593585/TOKENIZED

Appreciate any inputs.

Thanks

Just a query: why keep a .txt file location (RawFile12.txt) in the output file path? Can we not remove that part? - Deepan Ram
I can remove it, but I don't think that is the issue, as the key is different even if it has the file name. Moreover, the folder is getting created. By the way, I removed the file name from the path and ran into the same issue again. - Sateesh K

1 Answer


In the driver code, you have set the number of reduce tasks to 0, so the reducer code is not needed.
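
A minimal sketch of the corresponding driver setup, based on the code in the question:

    Job job = Job.getInstance(config, "Raw File Tokenizer");
    job.setJarByClass(Tokenizer.class);
    job.setMapperClass(TokenizeMapper.class);

    // Map-only job: with zero reduce tasks the mapper output is written
    // directly by the output format, so setReducerClass(...) can be dropped.
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);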

In case you need to clear the output directory before the job launches, you can use this snippet to delete the directory if it exists:

    Path outputPath = new Path(<pathTocheck>);
    // Resolve the FileSystem from the path itself so this also works for s3:// paths
    FileSystem fileSystem = outputPath.getFileSystem(<hadoop config object>);

    if (fileSystem.exists(outputPath))
    {
        fileSystem.delete(outputPath, true);   // true = delete recursively
    }
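
For example, in the driver from the question this could go just before the output path is set; a sketch reusing the question's config and additionalArgs variables:

    Path outputPath = new Path(additionalArgs[1]);
    FileSystem fs = outputPath.getFileSystem(config);   // needs org.apache.hadoop.fs.FileSystem imported
    if (fs.exists(outputPath)) {
        // Remove any stale output from a previous run so the job can write cleanly
        fs.delete(outputPath, true);
    }

    FileInputFormat.addInputPath(job, new Path(additionalArgs[0]));
    FileOutputFormat.setOutputPath(job, outputPath);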