0 votes
//download the file (csv)
ByteArrayOutputStream downloadedFile = downloadFile();

//save the file in a temp folder (csv)
java.io.File tmpCsvFile = save(downloadedFile);

//read it
Dataset<Row> ds = session
        .read()
        .option("header", "true")
        .csv(tmpCsvFile.getAbsolutePath());
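For reference, the save(...) helper is not shown in the question; a minimal sketch of what it might do (the class name and behaviour here are assumptions, not the actual implementation):

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;

public class SaveSketch {
    // hypothetical stand-in for save(downloadedFile): writes the
    // downloaded bytes to a temp file and returns that file
    static File save(ByteArrayOutputStream downloadedFile) throws IOException {
        File tmp = File.createTempFile("download", ".csv");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            downloadedFile.writeTo(out);
        }
        return tmp;
    }
}
```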

tmpCsvFile is saved at the following path:

/mnt/yarn/usercache/hadoop/appcache/application_1511379756333_0001/container_1511379756333_0001_02_000001/tmp/1OkYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q3118887242212394029.csv

Exception on reading:

org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://ip-33-33-33-33.ec2.internal:8020/mnt/yarn/usercache/hadoop/appcache/application_1511379756333_0001/container_1511379756333_0001_02_000001/tmp/1OkYaovxMsmR7iPoPnb8mx45MWvwr6k1y9xIdh8g7K0Q3118887242212394029.csv;

I think the problem is that the file is saved locally, and when I try to read it through the Spark SQL API it can't find the file. I already tried sparkContext.addFile(), but it doesn't work.

Any solutions?

Thanks

1 – The file /mnt/yarn/usercache/hadoop/appcache/application_1511379756333_0001/.. seems to be a local file. Did you try reading it with the file:/// scheme? Try this: Dataset<Row> ds = session.read().option("header", "true").csv("file://" + tmpCsvFile.getAbsolutePath()) – Amit Kumar

1 Answer

2 votes

Spark supports a large number of filesystems for reading and writing, among them:

  • Local/Regular (file://)
  • S3 (s3://)
  • HDFS (hdfs://)
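The backend is selected purely by the scheme on the path's URI; for example (placeholder paths, assumed bucket and namenode names):

```java
import java.net.URI;

public class UriSchemes {
    public static void main(String[] args) {
        // each storage backend is chosen by the URI scheme alone;
        // the rest of the path is interpreted by that filesystem
        String[] paths = {
            "file:///tmp/data.csv",             // local filesystem
            "s3://my-bucket/data.csv",          // S3
            "hdfs://namenode:8020/data/data.csv" // HDFS
        };
        for (String p : paths) {
            System.out.println(URI.create(p).getScheme());
        }
    }
}
```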

By default, if no URI scheme is specified, spark-sql uses hdfs://driver_address:port/path.

The solution of prefixing the path with file:/// only works in client mode; in my case (cluster mode) it doesn't. When the driver creates the task for reading the file, it is passed to an executor on a node that doesn't have the file.
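As an aside, when client mode is an option, the file: scheme is best attached with File#toURI rather than by concatenating strings (a sketch with a placeholder path standing in for tmpCsvFile.getAbsolutePath()):

```java
import java.io.File;

public class FileUriSketch {
    public static void main(String[] args) {
        // placeholder for the temp file from the question
        File tmpCsvFile = new File("/tmp/downloaded.csv");
        // File#toURI builds a well-formed file: URI, avoiding manual
        // concatenation of the scheme onto the absolute path
        String uri = tmpCsvFile.toURI().toString();
        System.out.println(uri);
    }
}
```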

What can we do? Write the file to Hadoop (HDFS).

   Configuration conf = new Configuration();
   ByteArrayOutputStream downloadedFile = downloadFile();
   //convert the output stream into an input stream
   InputStream is = Functions.FROM_BAOS_TO_IS.apply(downloadedFile);
   String myfile = "miofile.csv";
   //acquire the filesystem (the default one, i.e. HDFS on the cluster)
   FileSystem fs = FileSystem.get(URI.create(myfile), conf);
   //open an output stream to Hadoop
   OutputStream outf = fs.create(new Path(myfile));
   //write the file
   IOUtils.copyBytes(is, outf, 4096, true);
   //submit the read task
   Dataset<Row> ds = session
       .read()
       .option("header", "true")
       .csv(myfile);
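The FROM_BAOS_TO_IS helper above isn't shown; assuming it just exposes the buffered bytes as a stream, a plain-Java equivalent would be:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class BaosToIs {
    // assumed behaviour of the FROM_BAOS_TO_IS helper: copy the
    // buffered bytes and wrap them in an InputStream for reading
    static InputStream fromBaosToIs(ByteArrayOutputStream baos) {
        return new ByteArrayInputStream(baos.toByteArray());
    }
}
```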

Thanks; any better solution is welcome.