1 vote

We have a long-running Flink cluster on AWS EMR, configured with the default roles (EMR_EC2_DefaultRole). When we run a Flink job, it cannot access the S3 bucket to read a file. We have created a minimal main method to reproduce it:

String filePath = "s3://<our-bucket>/<the-file>";
logger.info("Path: " + filePath);
Path path = Paths.get(filePath);
logger.info("Successfully got path");
File file = path.toFile();
logger.info("Successfully got creds file");
logger.info("Exists [{}], isFile [{}] ", file.exists(), file.isFile());
String content = FileUtils.readFileToString(file);
logger.info("Content [{}]", content);

We run the Flink job via the Flink Web UI. We get all the logs except the Content log.

The exists log is: Exists [false], isFile [false]

We also get the following error:

Caused by: java.io.FileNotFoundException: File 's3:/<our-bucket>/<the-file>' does not exist
    at org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:299)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1711)
    at org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1748)
    at com.<our-package>.Main.main(Main.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
    ... 10 more

When we SSH to the master EC2 instance and run the following command, it works and returns the file content:

sudo hdfs dfs -cat s3://<our-bucket>/<the-file>

Please help :)

I'm a bit confused by the single '/' in your FileNotFoundException. Can you triple-check that you provide the correct URL? – Arvid Heise
@ArvidHeise I've checked it; the single slash comes from the line Paths.get(filePath). When the resulting path object is logged, that is what it produces. I cannot change this line since it is inside a 3rd-party library I am using. I've also tried passing a URL that starts with s3:///, but it removes all the duplicate slashes. – Slava Shpitalny
What happens if you use s3a://<bucket> or s3n://<bucket>? – kkrugler
@kkrugler Same thing. – Slava Shpitalny

1 Answer

0
votes

It looks like you're trying to pass an S3 path to org.apache.commons.io.FileUtils.readFileToString(), which won't work: commons-io only reads from the local file system, and a java.io.File cannot refer to an object in S3.

You can instead create a Flink Path from that S3 URI and use its file system to open an input stream, e.g.

// org.apache.flink.core.fs.Path and FileSystem, plus org.apache.commons.io.IOUtils
Path filePath = new Path("s3://<our-bucket>/<the-file>");
FileSystem fs = filePath.getFileSystem();
try (InputStream is = fs.open(filePath)) {
    String content = IOUtils.toString(is, StandardCharsets.UTF_8);
}
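As an aside, the single slash in your FileNotFoundException ('s3:/<our-bucket>/...') is exactly what java.nio produces here: Paths.get() parses the string against the default local file system, treats "s3:" as an ordinary relative path element, and collapses the duplicate slash. A minimal sketch, with hypothetical bucket and key names:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class S3PathDemo {
    public static void main(String[] args) {
        // java.nio resolves the string against the default (local) file system,
        // so "s3:" is just a relative path element and the duplicate slash is
        // collapsed, matching the "s3:/<our-bucket>/<the-file>" in the exception.
        Path p = Paths.get("s3://some-bucket/some-file"); // hypothetical names
        System.out.println(p);              // prints s3:/some-bucket/some-file on Linux
        System.out.println(p.isAbsolute()); // prints false
    }
}
```

This is also why file.exists() returns false in your reproduction: the path is interpreted as a relative local path, not as an S3 URI.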