
I am trying to read from S3 on an AWS EMR cluster in a Beam application, using beam-sdks-java-io-hadoop-file-system v2.0.0 with Spark as the runner. The YARN logs show that the pipeline detects the file in S3 but cannot read it. Please see the logs below.

17/06/27 03:29:25 INFO FileBasedSource: Filepattern s3a://xxx/test-folder/* matched 1 files with total size 3410584
17/06/27 03:29:25 INFO FileBasedSource: Matched 1 files for pattern s3a://xxx/test-folder/*
17/06/27 03:29:25 INFO FileBasedSource: Splitting filepattern s3a://xxx/test-folder/* into bundles of size 1705292 took 82 ms and produced 1 files and 1 bundles
17/06/27 03:29:25 INFO SparkContext: Starting job: foreach at BoundedDataset.java:109

17/06/27 03:29:33 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-10-130-237-237.vpc.internal:40063 (size: 4.6 KB, free: 3.5 GB)
17/06/27 03:29:36 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, ip-10-130-237-237.vpc.internal): java.lang.RuntimeException: Failed to read data.
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:198)
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.hasNext(SourceRDD.java:239)
    at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:41)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Byte-buffer read unsupported by input stream
    at org.apache.hadoop.fs.FSDataInputStream.read(FSDataInputStream.java:146)
    at org.apache.beam.sdk.io.hdfs.HadoopFileSystem$HadoopSeekableByteChannel.read(HadoopFileSystem.java:185)
    at org.apache.beam.sdk.io.CompressedSource$CompressedReader$CountingChannel.read(CompressedSource.java:509)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:65)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:109)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)
    at java.io.PushbackInputStream.read(PushbackInputStream.java:186)
    at org.apache.beam.sdk.repackaged.com.google.common.io.ByteStreams.read(ByteStreams.java:859)
    at org.apache.beam.sdk.io.CompressedSource$CompressionMode$1.createDecompressingChannel(CompressedSource.java:127)
    at org.apache.beam.sdk.io.CompressedSource$DecompressAccordingToFilename.createDecompressingChannel(CompressedSource.java:258)
    at org.apache.beam.sdk.io.CompressedSource$CompressedReader.startReading(CompressedSource.java:542)
    at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.startImpl(FileBasedSource.java:471)
    at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.start(OffsetBasedSource.java:271)
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.seekNext(SourceRDD.java:214)
    at org.apache.beam.runners.spark.io.SourceRDD$Bounded$ReaderToIteratorAdapter.tryProduceNext(SourceRDD.java:188)
    ... 22 more

When I run the same code with HDFS as the input file system, it works perfectly. Can someone help me figure out how I can read data from S3? The input is a gzip-compressed text file.

Code:

HadoopFileSystemOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(HadoopFileSystemOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getHdfsConfiguration().get(0).get("fs.default.name")))
 .apply(ParDo.of(new PrintLineTransform()));

Running with S3:

--hdfsConfiguration='[{"fs.default.name": "s3a://xxx/test-folder/*"}]'

Running with HDFS:

--hdfsConfiguration='[{"fs.default.name": "hdfs://xxx/test-folder/*"}]'

1 Answer


The log says: Byte-buffer read unsupported by input stream

It looks like Beam relies on an optional extension to the input stream API, the ByteBuffer read that FSDataInputStream.read is attempting in the stack trace. The S3 client's stream does not implement it, so the read fails. That feature is not even in unreleased versions of Hadoop, though a JIRA for it, HADOOP-14603, was filed recently.

Nobody is going to put their hand up to implement that feature in a hurry, as (a) there are more important things to do and (b) it's a minor performance optimisation (better memory efficiency) that nothing important actually uses. You don't get it in the local filesystem or Azure either.

Fixes:

  1. Implement the Hadoop feature, with tests. I'll promise to review it for you.
  2. Persuade someone else to do it for you. For us professional Hadoop developers, that generally comes about when someone important demands it, be they management or customers.
  3. Get Beam to handle filesystems which don't support the feature and throw UnsupportedOperationException, by falling back to read(byte[]). That's all they have to do.
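
The fallback in fix 3 could look roughly like the sketch below. This is not Beam's or Hadoop's actual code: the class FallbackRead, the interface BufferReadable (a stand-in for a stream's optional ByteBuffer read), the test stream UnsupportedStream and the method readInto are all hypothetical names made up for illustration, and it uses only the JDK so it runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FallbackRead {

    /** Hypothetical stand-in for a stream's optional ByteBuffer read. */
    public interface BufferReadable {
        int read(ByteBuffer dst) throws IOException;
    }

    /** Hypothetical stream that advertises the ByteBuffer read but rejects it. */
    public static class UnsupportedStream extends ByteArrayInputStream implements BufferReadable {
        public UnsupportedStream(byte[] data) {
            super(data);
        }

        @Override
        public int read(ByteBuffer dst) {
            throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
        }
    }

    /**
     * Tries the ByteBuffer read first. If the stream does not offer it, or
     * rejects it with UnsupportedOperationException, reads through a
     * temporary byte[] and copies the bytes into dst instead.
     * Returns the number of bytes read, or -1 at end of stream.
     */
    public static int readInto(InputStream in, ByteBuffer dst) throws IOException {
        if (in instanceof BufferReadable) {
            try {
                return ((BufferReadable) in).read(dst);
            } catch (UnsupportedOperationException e) {
                // Fall through to the byte[] path below.
            }
        }
        if (!dst.hasRemaining()) {
            return 0;
        }
        byte[] tmp = new byte[dst.remaining()];
        int n = in.read(tmp, 0, tmp.length);
        if (n > 0) {
            dst.put(tmp, 0, n);
        }
        return n;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(16);
        InputStream in = new UnsupportedStream("hello".getBytes(StandardCharsets.UTF_8));
        int n = readInto(in, buf);
        buf.flip();
        System.out.println(n + " bytes: " + StandardCharsets.UTF_8.decode(buf));
    }
}
```

The extra copy through the temporary array is the memory cost that the ByteBuffer read avoids, which is why it only matters as an optimisation.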

Workarounds:

  • store your data in a different compression format
  • copy it to HDFS first.

I'd recommend you start by searching for the error and stack trace on Apache JIRA under BEAM, then file a new report if none exists. See what they say.

Update: keep an eye on BEAM-2500