0
votes

I am using local spark to read and write from s3. To write back to s3 i am using from java concurrent util so that i can write in a multi threaded fashion.

Here is my implementation of that

ConvertToCsv This method has spark.write action

 for ( String Id: listOfId) {

                Future<?> future = executor.submit( () -> {

                ConvertToCsv( dataFrame, destinationPath, Id);
            } );
            futures.add( future );

        } 

I am getting this error!

No such file or directory: s3a://kira-bucket-parquet/collection/82709dd1-8924-481c-9d93-14a9e2e0c524/5e67e9d5-2d8b-4c4b-928a-4736485af3ca/_temporary/0 at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2269) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2163) at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2102) at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1903) at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1882) at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109) at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1882) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1919) at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1961) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309) at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala

The solution i came across is configuring s3a commiters.

How do i configure S3a commiters in local spark? and is there any alternative solution ?

1
Assuming your data is sufficiently parallelized, spark.write.csv("s3://...") will already write your data to S3 concurrently out of the box. What are you trying to accomplish by adding Futures on top of this? - Charlie Flowers
The use case here is, The parquet file which I read is partitioned by ID. Processing and writing back to S3 as csv of each ID is independent so that for each ID can be processed parallel. It doesn't throw the error immediately.. it process some ID writes to S3.. suppose if we have 5 IDs 4 IDs is processed and written to S3 only while processing that one I always get the above error @cricket_007 - Kiran Hebbar
I only edited the question because it's not about Hadoop. In any case, I would recommend saving your data back as Parquet, not CSV, you could also partition the dataframe yourself, you probably shouldn't use a loop - OneCricketeer

1 Answers

0
votes

To safely commit work -even locally- you can use the S3A committers.

Although they are in the hadoop-aws JAR, they are designed and tested for spark as well as MapReduce.

Consult the documentation