I'm trying to deploy my Flink job on AWS EMR (release 5.15 with Flink 1.4.2), but I cannot get any output from my stream. I created a simple job to test this:

import org.apache.flink.api.common.io.FilePathFilter
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object StreamingJob1 {
    def main(args: Array[String]): Unit = {
        val path = args(0)
        val file_input_format = new TextInputFormat(
            new org.apache.flink.core.fs.Path(path))
        file_input_format.setFilesFilter(FilePathFilter.createDefaultFilter())
        file_input_format.setNestedFileEnumeration(true)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val myStream: DataStream[String] =
            env.readFile(file_input_format,
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                1000L) // re-scan the directory every 1000 ms
                // Note: Array.toString prints a reference like "[Ljava.lang.String;@1a2b3c";
                // mkString joins the split fields back into a readable line
                .map(s => s.split(",").mkString("|"))

        myStream.print()
        // execute program
        env.execute("Flink Streaming Scala")
    }
}

I executed it with the following command:

HADOOP_CONF_DIR=/etc/hadoop/conf flink run -m yarn-cluster -yn 4 -c my.pkg.StreamingJob1 /home/hadoop/flink-test-0.1.jar hdfs:///user/hadoop/data/

There was no error, but there was no output on the screen apart from Flink's INFO logs.

I also tried writing the output to a Kinesis stream and to an S3 file; nothing was recorded in either case. For the file output I used a BucketingSink:

    myStream.addSink(new BucketingSink[String](output_path))
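
For the Kinesis attempt, the sink setup was along these lines (a sketch; the region and stream name are placeholders, and the flink-connector-kinesis dependency is assumed to be on the classpath):

    import java.util.Properties
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    val producerConfig = new Properties()
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") // placeholder region
    producerConfig.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO") // pick up the EMR instance role

    val kinesisSink = new FlinkKinesisProducer[String](new SimpleStringSchema(), producerConfig)
    kinesisSink.setDefaultStream("my-output-stream") // must match an existing Kinesis stream exactly
    kinesisSink.setDefaultPartition("0")
    kinesisSink.setFailOnError(true) // surface send errors instead of only logging them
    myStream.addSink(kinesisSink)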

I also tried writing to an HDFS file. In that case a file was created, but its size was 0. I verified that the input file was actually being processed with a simple check:

myStream.map(s => "abc".toInt) // deliberately throws NumberFormatException for every record

which did throw an exception, confirming that the input records were being read.

What am I missing here?

1 Answer

It looks like stream.print() doesn't print to the client console on EMR: in yarn-cluster mode the records go to the TaskManagers' stdout, not to the machine where flink run was submitted.
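
If you need to see what print() emits, you can open the TaskManagers' "Stdout" tab in the Flink web UI, or pull the logs through YARN log aggregation (the application ID below is a placeholder):

    yarn logs -applicationId application_1234567890123_0001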

Output to a file: with HDFS as the target, I usually have to wait a while before the file is actually updated.
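
The delay seems to come from how BucketingSink rolls files: a part file stays in-progress until it reaches the batch size (384 MB by default) or its bucket goes inactive, and HDFS often reports an in-progress file's size as 0 until it is closed. A sketch of tightening those thresholds so files show up sooner (the values are illustrative; output_path is the placeholder from the question):

    import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

    val sink = new BucketingSink[String](output_path)
    sink.setBatchSize(1024L * 1024L) // roll part files at ~1 MB instead of the 384 MB default
    sink.setInactiveBucketCheckInterval(10 * 1000L) // look for inactive buckets every 10 s
    sink.setInactiveBucketThreshold(10 * 1000L) // close buckets that received no data for 10 s
    myStream.addSink(sink)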

Output to Kinesis: I had a typo in my stream name. I don't know why I didn't get an exception for the non-existent stream (possibly because FlinkKinesisProducer only logs send errors unless failOnError is enabled), but after correcting the name I got the expected messages.