As I said earlier, the problem is that Java I/O (the File class, the Path class, ...) treats paths as local file system paths, whereas Hadoop I/O (the FileSystem class, the Path class, ...) treats paths as HDFS paths.
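To make the difference concrete, here is a small sketch that uses only the JDK. It shows that both java.io.File and java.nio.file resolve a path string against the local file system, even when the string looks like an HDFS location. The class name LocalPathDemo and its helper methods are made up for this illustration.

```java
import java.io.File;
import java.nio.file.Paths;

// Hypothetical demo class, not part of any Hadoop API.
class LocalPathDemo {

    // URI scheme that java.io.File assigns to a path string.
    static String fileScheme(String path) {
        return new File(path).toURI().getScheme();
    }

    // URI scheme that java.nio.file assigns when the default provider is used.
    static String nioScheme(String path) {
        return Paths.get(path).toUri().getScheme();
    }

    public static void main(String[] args) {
        // This string is meant as an HDFS location, but the JDK has no way to know that.
        String path = "/user/training/HdfsWriter_sample.txt";
        System.out.println("java.io.File scheme:  " + fileScheme(path));
        System.out.println("java.nio.file scheme: " + nioScheme(path));
    }
}
```

Both calls report the file scheme, so a plain Java stream opened on such a path looks on the local disk and never contacts HDFS.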
Please have a look here:
read/write from/in HDFS
Using FileSystem API to read and write data to HDFS
There are many ways to read data from and write data to the Hadoop Distributed File System (HDFS). Let us start by using the FileSystem API to create and write to a file in HDFS, followed by an application that reads a file from HDFS and writes it back to the local file system.
Step 1: Once you have downloaded a test dataset, write an application that reads a file from the local file system and writes its contents to HDFS.
package com.hadoop.hdfs.writer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HdfsWriter extends Configured implements Tool {

    public static final String FS_PARAM_NAME = "fs.defaultFS";

    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("HdfsWriter [local input path] [hdfs output path]");
            return 1;
        }

        // The input is a local path (plain String); the output is a Hadoop Path.
        String localInputPath = args[0];
        Path outputPath = new Path(args[1]);

        Configuration conf = getConf();
        System.out.println("configured filesystem = " + conf.get(FS_PARAM_NAME));

        // Returns the FileSystem implementation selected by fs.defaultFS.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            System.err.println("output path exists");
            return 1;
        }

        OutputStream os = fs.create(outputPath);
        InputStream is = new BufferedInputStream(new FileInputStream(localInputPath));

        // This overload of copyBytes closes both streams when the copy finishes.
        IOUtils.copyBytes(is, os, conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new HdfsWriter(), args);
        System.exit(returnCode);
    }
}
Step 2: Export the JAR file and run the code from the terminal to write a sample file to HDFS:
[training@localhost ~]$ hadoop jar HdfsWriter.jar com.hadoop.hdfs.writer.HdfsWriter sample.txt /user/training/HdfsWriter_sample.txt
Step 3: Verify whether the file is written into HDFS and check the contents of the file:
[training@localhost ~]$ hadoop fs -cat /user/training/HdfsWriter_sample.txt
Step 4: Next, we write an application to read the file we just created in Hadoop Distributed File System and write its contents back to the local file system:
package com.hadoop.hdfs.reader;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HdfsReader extends Configured implements Tool {

    public static final String FS_PARAM_NAME = "fs.defaultFS";

    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("HdfsReader [hdfs input path] [local output path]");
            return 1;
        }

        // The input is a Hadoop Path; the output is a local path (plain String).
        Path inputPath = new Path(args[0]);
        String localOutputPath = args[1];

        Configuration conf = getConf();
        System.out.println("configured filesystem = " + conf.get(FS_PARAM_NAME));

        // Returns the FileSystem implementation selected by fs.defaultFS.
        FileSystem fs = FileSystem.get(conf);

        InputStream is = fs.open(inputPath);
        OutputStream os = new BufferedOutputStream(new FileOutputStream(localOutputPath));

        // This overload of copyBytes closes both streams when the copy finishes.
        IOUtils.copyBytes(is, os, conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new HdfsReader(), args);
        System.exit(returnCode);
    }
}
Step 5: Export the JAR file and run the code from the terminal to read the sample file from HDFS and write it to the local file system:
[training@localhost ~]$ hadoop jar HdfsReader.jar com.hadoop.hdfs.reader.HdfsReader /user/training/HdfsWriter_sample.txt /home/training/HdfsReader_sample.txt
Step 6: Verify that the file was written back to the local file system:
[training@localhost ~]$ cat /home/training/HdfsReader_sample.txt
FileSystem is an abstract class that represents a generic file system. Most Hadoop file system implementations can be accessed and updated through the FileSystem object. To obtain an instance for HDFS, you call the method FileSystem.get(). The FileSystem.get() method looks at the URI assigned to the fs.defaultFS parameter in the Hadoop configuration files on your classpath and chooses the correct implementation of the FileSystem class to instantiate. For HDFS, the value of fs.defaultFS is a URI that starts with the hdfs:// scheme.
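The idea of choosing an implementation from the URI scheme can be sketched with the JDK alone. This is a simplified illustration, not Hadoop's actual lookup code, which is more involved. The class SchemeLookupDemo, its lookup table, and the host name namenode.example.com:8020 are assumptions made up for the example. The two class names in the table are the Hadoop implementations commonly associated with the hdfs and file schemes.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class: a toy version of scheme-based selection.
class SchemeLookupDemo {

    // Illustrative table only; it is not how Hadoop stores this mapping.
    private static final Map<String, String> IMPLEMENTATIONS = new HashMap<>();
    static {
        IMPLEMENTATIONS.put("hdfs", "org.apache.hadoop.hdfs.DistributedFileSystem");
        IMPLEMENTATIONS.put("file", "org.apache.hadoop.fs.LocalFileSystem");
    }

    // Returns the implementation name registered for the scheme of the given URI.
    static String implementationFor(String defaultFs) {
        String scheme = URI.create(defaultFs).getScheme();
        String impl = IMPLEMENTATIONS.get(scheme);
        if (impl == null) {
            throw new IllegalArgumentException("no file system for scheme: " + scheme);
        }
        return impl;
    }

    public static void main(String[] args) {
        // Hypothetical fs.defaultFS values.
        System.out.println(implementationFor("hdfs://namenode.example.com:8020"));
        System.out.println(implementationFor("file:///"));
    }
}
```

This is why the same HdfsWriter code works against the local file system when fs.defaultFS points at file:/// and against HDFS when it points at an hdfs:// URI.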
Once an instance of the FileSystem class has been created, the HdfsWriter class calls the create() method to create a file in HDFS. The create() method returns an FSDataOutputStream, a subclass of OutputStream, which can be manipulated using normal Java I/O methods. Similarly, HdfsReader calls the open() method to open a file in HDFS, which returns an FSDataInputStream, a subclass of InputStream, that can be used to read the contents of the file.
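Because the objects returned by create() and open() are ordinary Java streams, the copy step can be written by hand with plain java.io. The following is a minimal sketch of a buffered copy loop, roughly the kind of work IOUtils.copyBytes does for you. The class StreamCopyDemo and its methods are hypothetical, and in-memory streams stand in for the HDFS streams so that the example runs without a cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class, not part of Hadoop.
class StreamCopyDemo {

    // Copies everything from in to out and returns the number of bytes copied.
    static long copy(InputStream in, OutputStream out, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }

    // Pushes a string through copy() using in-memory streams and returns the result.
    static String roundTrip(String text) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        // try-with-resources closes both streams, even if the copy fails.
        try (InputStream in = new ByteArrayInputStream(bytes);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            copy(in, out, 4);
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello hdfs"));
    }
}
```

In a real application, the in-memory streams would be replaced by the stream returned from fs.open(inputPath) or fs.create(outputPath), with the same try-with-resources pattern taking care of closing them.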
The FileSystem API is extensive. To demonstrate some of the other methods available in the API, we can add some error checking to the HdfsWriter and HdfsReader classes we created.
To check whether the file exists before we call create(), use:
boolean exists = fs.exists(inputPath);
To check whether the path is a file, use:
boolean isFile = fs.isFile(inputPath);
To rename a file that already exists, use:
boolean renamed = fs.rename(inputPath, new Path("old_file.txt"));