So I have to retrieve the content of a file stored in HDFS and perform certain analysis on it.

The thing is, I can't even manage to read the file and write its content to another text file in my local file system. (I'm new to Flink; this is just a test to make sure I'm reading the file properly.)

The file in HDFS is a plain text file. Here's my code:

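import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class readFromHdfs {

    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the HDFS file as a DataSet with one element per line.
        DataSet<String> lines = env.readTextFile("hdfs://localhost:9000//test/testfile0.txt");

        // Sinks are lazy: nothing is written until env.execute() runs the job.
        lines.writeAsText("/tmp/hdfs_file.txt");

        env.execute("File read from HDFS");
    }
}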

There's no output in /tmp after I run it.

It's really simple code, and I'm not sure whether there's a problem with it or I'm just doing something else wrong. As I said, I'm completely new to Flink.

Also, the job appears as failed in the web dashboard. Here's the content of the Flink log: https://pastebin.com/rvkXPGHU

Thanks in advance

EDIT: I solved the problem by increasing the number of task slots. The web dashboard showed an available task slot, and it wasn't complaining about not having enough slots at all, so I didn't think it could be that.

Anyway, writeAsText isn't really working as I expected. It reads the contents of testfile0.txt all right, but it doesn't write them into hdfs_file.txt. Instead it creates a directory by that name, and inside it 8 text files, 6 of them completely empty. The other two contain testfile0.txt (most of it is in 1.txt, and the last chunk in 2.txt).

It doesn't really matter, though, because the content of the file is being properly stored in the DataSet, so I can go on with my analysis of the data.

1 Answer

It works as expected: you have set the parallelism of the whole job (and hence of the output format too) to 8, so each slot creates its own file (as you may know, it is unsafe to write to a single file concurrently). If you need just one output file, use writeAsText(...).setParallelism(1) to override the global parallelism setting.

If you want the output in the local file system instead of HDFS, you should explicitly set the "file://" scheme in the path (for example file:///tmp/hdfs_file.txt), because with Hadoop Flink resolves paths against "hdfs://" by default.
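
Putting both suggestions together, here is a minimal sketch of the job from the question. It assumes the same HDFS address and input path as in the question and a Flink version that still ships the DataSet API. The class name ReadFromHdfsSingleFile is made up for illustration, and the OVERWRITE write mode is an addition of mine so that the directory left behind by the earlier 8-way run does not make the sink fail.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

// Hypothetical class name, used only for this sketch.
public class ReadFromHdfsSingleFile {

    public static void main(String[] args) throws Exception {

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Same input path as in the question; the read itself may still run in parallel.
        DataSet<String> lines = env.readTextFile("hdfs://localhost:9000//test/testfile0.txt");

        // The explicit file:// scheme targets the local file system.
        // OVERWRITE (my assumption, not part of the answer) replaces output left by a previous run.
        // setParallelism(1) applies to this sink only, so a single output file is written.
        lines.writeAsText("file:///tmp/hdfs_file.txt", WriteMode.OVERWRITE)
             .setParallelism(1);

        env.execute("File read from HDFS");
    }
}
```

Only the sink is forced down to one task here; the rest of the job keeps whatever parallelism is configured globally, so any analysis you add between the read and the write can still use all 8 slots.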