3 votes

We are using Flink 1.2.0 with the suggested S3AFileSystem configuration. A simple streaming job works as expected when its source is a single folder within an S3 bucket.

The job runs without errors, but produces no output, when its source is a folder that itself contains subfolders.

For clarity, below is a model of the S3 bucket. Pointing the job at s3a://bucket/folder/2017/04/25/01/ properly reads all three objects, plus any subsequent objects that appear under that prefix. Pointing the job at s3a://bucket/folder/2017/ (or any other intermediate folder) yields a job that runs but produces nothing.

In fits of desperation, we've tried the permutations that [in|ex]clude the trailing /.

.
`-- folder
    `-- 2017
        `-- 04
            |-- 25
            |   |-- 00
            |   |   |-- a.txt
            |   |   `-- b.txt
            |   `-- 01
            |       |-- c.txt
            |       |-- d.txt
            |       `-- e.txt
            `-- 26

Job code:

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val lines: DataStream[String] = env.readFile(
    inputFormat = new TextInputFormat(new Path(path)),
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}

core-site.xml is configured per the docs:

<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.buffer.dir</name>
    <value>/tmp</value>
  </property>
</configuration>

We have included all the jars for S3AFileSystem listed here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/aws.html#flink-for-hadoop-27

We are stumped. This seems like it should work; there are plenty of breadcrumbs on the internet that indicate that this did work. [e.g., http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Reading-files-from-an-S3-folder-td10281.html]

Help me, fellow squirrels... you're my only hope!

Possibly relevant item that I forgot to add: issues.apache.org/jira/browse/HADOOP-13208 indicates changes to S3A's listFiles in version 2.8.0. The Flink docs suggest using hadoop-aws-2.7.2.jar. – StephenWithPH

3 Answers

8 votes

Answering my own question... with help from Steve Loughran above.

In Flink, when using a file-based data source for continuous processing, FileInputFormat does not enumerate nested files by default.

This is true whether the source is S3 or anything else.

You must enable nested file enumeration explicitly on the input format:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

def main(args: Array[String]) {

  val parameters = ParameterTool.fromArgs(args)
  val bucket = parameters.get("bucket")
  val folder = parameters.get("folder")

  val path = s"s3a://$bucket/$folder"

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  val textInputFormat = new TextInputFormat(new Path(path))

  //this is important!
  textInputFormat.setNestedFileEnumeration(true)

  val lines: DataStream[String] = env.readFile(
    inputFormat = textInputFormat,
    filePath = path,
    watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
    interval = Time.seconds(10).toMilliseconds)

  lines.print()
  env.execute("Flink Streaming Scala API Skeleton")
}
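With nested enumeration enabled, pointing the job at an intermediate prefix such as s3a://bucket/folder/2017/ picks up the objects in the nested subfolders, as well as any that arrive later, matching the single-folder behavior described in the question.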

0 votes

What version of Hadoop is under this?

If this has stopped working with Hadoop 2.8, it is probably a regression, meaning probably my fault. First file a JIRA at issues.apache.org under FLINK; then, if it's new in 2.8.0, link it as broken-by HADOOP-13208.

The code snippet here is a good example that could be used for a regression test, and it's time I did some for Flink.

That big listFiles() change moves the enumeration of files under a path from a recursive treewalk to a series of flat lists of all child entries under a path. It works fantastically for everything else (distcp, tests, Hive, Spark) and has been shipping in products since Dec '16; I'd be somewhat surprised if it is the cause, but I'm not denying blame. Sorry.
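To make the described change concrete, here is a rough sketch of the two listing strategies against the Hadoop FileSystem API. The bucket and prefix are hypothetical, and this only illustrates the behavior the answer describes; it is not S3A's actual code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object S3ListingSketch {
  def main(args: Array[String]): Unit = {
    val root = new Path("s3a://some-bucket/folder/2017/") // hypothetical prefix
    val fs   = FileSystem.get(root.toUri, new Configuration())

    // Pre-HADOOP-13208 style: recursive treewalk, one listStatus() call per "directory" level.
    def treewalk(p: Path): Seq[FileStatus] =
      fs.listStatus(p).toSeq.flatMap { st =>
        if (st.isDirectory) treewalk(st.getPath) else Seq(st)
      }
    treewalk(root).foreach(st => println(s"treewalk: ${st.getPath}"))

    // Post-HADOOP-13208 style: a single deep listing of every child object under the prefix.
    val it = fs.listFiles(root, true)
    while (it.hasNext) println(s"flat list: ${it.next().getPath}")
  }
}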

0 votes

As of Flink 1.7.x, Flink provides two file systems to talk to Amazon S3: flink-s3-fs-presto and flink-s3-fs-hadoop. Both register default FileSystem wrappers for URIs with the s3:// scheme; flink-s3-fs-hadoop also registers for s3a:// and flink-s3-fs-presto also registers for s3p://, so you can use both at the same time.

Sample code:

// Reading data from S3: this will print all the contents in the bucket line-wise
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

final Path directory = new Path("s3a://husnain28may2020/");
final FileSystem fs = directory.getFileSystem();

// Using an input format to create a DataSet from the bucket contents
org.apache.flink.api.java.io.TextInputFormat textInputFormatS3 =
    new org.apache.flink.api.java.io.TextInputFormat(directory);
DataSet<String> linesS3 = env.createInput(textInputFormatS3);
linesS3.print();
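
Separately, a minimal Scala sketch of using the two schemes side by side, assuming both flink-s3-fs-hadoop and flink-s3-fs-presto have been made available to the Flink installation as described in the docs; the bucket name is hypothetical:

import org.apache.flink.api.scala._

object TwoS3Schemes {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // s3a:// is served by flink-s3-fs-hadoop
    val viaHadoop: DataSet[String] = env.readTextFile("s3a://example-bucket/input/")

    // s3p:// is served by flink-s3-fs-presto
    val viaPresto: DataSet[String] = env.readTextFile("s3p://example-bucket/input/")

    // print() triggers execution of the program
    viaHadoop.print()
    viaPresto.print()
  }
}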