I am trying to read from S3 in a Beam application running on an AWS EMR cluster, using beam-sdks-java-io-hadoop-file-system v2.0.0 with Spark as the runner. I can see in the YARN logs that the pipeline detects the file in S3, but it is not able to read it. Please see the logs below.
17/06/27 03:29:25 INFO FileBasedSource: Filepattern s3a://xxx/test-folder/* matched 1 files with total size 3410584
17/06/27 03:29:25 INFO FileBasedSource: Matched 1 files for pattern s3a://xxx/test-folder/*
17/06/27 03:29:25 INFO FileBasedSource: Splitting filepattern s3a://xxx/test-folder/* into bundles of size 1705292 took 82 ms and produced 1 files and 1 bundles
17/06/27 03:29:25 INFO SparkContext: Starting job: foreach at BoundedDataset.java:109
17/06/27 03:29:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-130-237-237.vpc.internal:40063 (size: 4.6 KB, free: 3.5 GB)
17/06/27 03:29:36 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-130-237-237.vpc.internal): java.lang.RuntimeException: Failed to read data.
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:198)
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.hasNext(SourceRDD.java:239)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Byte-buffer read unsupported by input stream
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:146)
    at org.apache.beam.sdk.io.hdfs.HadoopFileSystem$HadoopSeekableByteChannel.read(HadoopFileSystem.java:185)
    at org.apache.beam.sdk.io.CompressedSource$CompressedReader$CountingChannel.read(CompressedSource.java:509)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at java.io.PushbackInputStream.read(PushbackInputStream.java:186)
    at org.apache.beam.sdk.repackaged.com.google.common.io.ByteStreams.read(ByteStreams.java:859)
    at org.apache.beam.sdk.io.CompressedSource$CompressionMode$1.createDecompressingChannel(CompressedSource.java:127)
    at org.apache.beam.sdk.io.CompressedSource$DecompressAccordingToFilename.createDecompressingChannel(CompressedSource.java:258)
    at org.apache.beam.sdk.io.CompressedSource$CompressedReader.startReading(CompressedSource.java:542)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:471)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:271)
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.seekNext(SourceRDD.java:214)
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:188)
    ... 22 more
When I run the same code with HDFS as the input file system, it works perfectly. Can someone help me figure out how I can read data from S3? The input format is a text file with gzip compression.
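The stack trace goes through CompressedSource$DecompressAccordingToFilename, so gzip is being detected from the .gz file extension. In case it matters, pinning the compression explicitly would, as far as I understand the Beam 2.0.0 TextIO API, look roughly like this:

// Assumption: Beam 2.0.0 TextIO; forces GZIP instead of relying on the .gz extension.
TextIO.read()
    .from("s3a://xxx/test-folder/*")
    .withCompressionType(TextIO.CompressionType.GZIP);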
Code:
HadoopFileSystemOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
Pipeline p = Pipeline.create(options);
// Read the file pattern configured via fs.default.name and print each line.
p.apply("ReadLines", TextIO.read().from(options.getHdfsConfiguration().get(0).get("fs.default.name")))
    .apply(ParDo.of(new PrintLineTransform()));
p.run().waitUntilFinish();
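PrintLineTransform is not shown above; it is just a trivial debugging DoFn, roughly like this minimal sketch:

// Minimal sketch of PrintLineTransform: logs each line it receives so the read can be verified.
static class PrintLineTransform extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
        System.out.println(c.element());
    }
}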
Running with S3:
--hdfsConfiguration='[{"fs.default.name": "s3a://xxx/test-folder/*"}]
Running with HDFS:
--hdfsConfiguration='[{"fs.default.name": "hdfs://xxx/test-folder/*"}]