
I am new to Flink (v1.3.2) and I'm trying to read Avro records continuously in Scala on EMR. I know that for text files you can use something like the following, and it will keep running and scanning the directory:

    val stream = env.readFile(
      inputFormat = textInputFormat,
      filePath = path,
      watchType = FileProcessingMode.PROCESS_CONTINUOUSLY,
      interval = Time.seconds(10).toMilliseconds
    )

Is there a similar way in Flink for Avro records? I have the following code:

    val textInputFormat = new AvroInputFormat(new Path(path), classOf[User])
    textInputFormat.setNestedFileEnumeration(true)
    val avroInputStream = env.createInput(textInputFormat)

    val output = avroInputStream
      .map(line => (line.getUserID, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .sum(1)

    output.print()

I can see the output, but then the Flink job switches to FINISHED. I want the job to keep running and wait for any new files that arrive in the future. Is there something like FileProcessingMode.PROCESS_CONTINUOUSLY for Avro input? Please suggest!


1 Answer


I figured this out by setting up a Flink YARN session on EMR and running the job with FileProcessingMode.PROCESS_CONTINUOUSLY:

    env.readFile(textInputFormat, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100)
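
Putting this together with the AvroInputFormat from the question, a minimal sketch might look like the following. This assumes User is the Avro-generated class from the question, the input path and job name are placeholders, and the AvroInputFormat import path may differ depending on your Flink version:

    import org.apache.flink.api.java.io.AvroInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    object ContinuousAvroJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Placeholder input directory; replace with your S3/HDFS path
        val path = "s3://my-bucket/avro-input/"

        val avroInputFormat = new AvroInputFormat(new Path(path), classOf[User])
        avroInputFormat.setNestedFileEnumeration(true)

        // PROCESS_CONTINUOUSLY re-scans the directory (every 10s here)
        // instead of finishing once the existing files are read
        val avroStream = env.readFile(
          avroInputFormat,
          path,
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          Time.seconds(10).toMilliseconds
        )

        avroStream
          .map(user => (user.getUserID, 1))
          .keyBy(0)
          .timeWindow(Time.seconds(10))
          .sum(1)
          .print()

        env.execute("Continuous Avro ingestion")
      }
    }

The last argument to readFile is the re-scan interval in milliseconds, so the job keeps picking up files that arrive later.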
To run this on EMR:

  1. Create a new Flink YARN session: flink-yarn-session -n 2 -d
  2. Get the application id with yarn application -list; for example, application_0000000000_0002
  3. Attach the Flink job to that session: flink run -m yarn-cluster -yid application_0000000000_0002 xxx.jar

More detail can be found in the EMR documentation: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html