
We are planning to use Flink to process a stream of data from a Kafka topic (logs in JSON format).

But for that processing, we need to use input files that change every day, and the information in them can change completely (not the format, but the contents).

Each time one of those input files changes, we will have to reload it into the program while the stream processing keeps running.

Reloading the data could be done the same way it is done now:

DataSet<String> globalData = env.readTextFile("file:///path/to/file");

But so far I couldn't find examples of, or come up with, a way to trigger that reload inside a stream processing job.

As extra information: we won't be using HDFS but the local filesystem on each node, so the reload will have to be done on each node from its local file. The only reason we would need HDFS is these input files, which are just 100 MB in total, so using HDFS would be overkill.

So far I have been experimenting with RichMapFunction, looking for a Kafka topic that could signal the reload, and trying to find examples of this, with no luck.
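Roughly, this is the kind of thing I have been experimenting with: a RichMapFunction that loads the local file once in open(). It is just a sketch (the file path and the "valid countries" set are placeholders), and the missing part is exactly how to trigger the reload when the file changes:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;

public class FilterByLocalFile extends RichMapFunction<String, String> {

    // Loaded from the local file; would need to be refreshed whenever the file changes.
    private transient Set<String> validCountries;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per task when the job starts; this is exactly the step
        // that would have to be repeated on every file change.
        validCountries = new HashSet<>(Files.readAllLines(Paths.get("/path/to/local/file")));
    }

    @Override
    public String map(String jsonLog) {
        // Placeholder for the real filtering/enrichment against validCountries.
        return jsonLog;
    }
}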


Edit:

After reading a lot more, I found in several places that this is the way to go: the DataArtisans examples.

Trying to write a simple example that applies a change from a control stream to the main stream, I came up with the following code:

public class RichCoFlatMapExample extends EventTimeJoinHelper {

    private String config_source_path = "NOT_INITIALIZED";

    @Override
    public void open(Configuration conf) {
        config_source_path = "first_file_path";
    }

    public abstract void processElement1(String one, String two, Collector<String> out) {
        config_source_path = one;
    }

    public abstract void processElement2(String one, String two, Collector<String> out) {
        String three = two + config_source_path;
        out.collect(three);
    }
}

The problem I'm having now is that, no matter what I try, I get the following error:

Class 'RichCoFlatMapExample' must either be declared abstract or implement abstract method 'processElement1(String, String, Collector)' in 'RichCoFlatMapExample'

The problem is, the requested method is implemented, but I can't declare the methods "abstract" in a non-abstract class (I get an error from the IDE). And if I make the class abstract, I won't be able to use it in Flink's DataStream methods.

I'm not sure what is happening, but I think this must be close. I will keep trying and update if I make it work.

I wanted to clarify how the "input files" are supposed to work: they are used as data for filtering, for instance a list of "valid countries", and we then filter our stream against those "valid countries". Hence, we need a fully loaded "input file" before we continue processing the stream.

1 Answer


Flink can monitor a directory and ingest files when they are moved into that directory; maybe that's what you are looking for. See the PROCESS_CONTINUOUSLY option for readFile in the documentation.
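For example, something along these lines (just a sketch; the directory path and the polling interval are placeholders):

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MonitorDirectoryExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String path = "file:///path/to/watched/dir";

        // Re-scan the directory every 60 seconds and ingest new or modified files.
        DataStream<String> controlData = env.readFile(
                new TextInputFormat(new Path(path)),
                path,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                60_000L);

        controlData.print();
        env.execute("Monitor directory");
    }
}

Note that when a monitored file is modified, its entire contents are re-read; for small reference files like yours, that is probably what you want anyway.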

However, if the data is in Kafka, it would be much more natural to use Flink's Kafka consumer to stream the data directly into Flink. There is documentation on the Kafka connector, and the Flink training includes an exercise on using Kafka with Flink.
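A minimal sketch of that approach (topic name, broker address, and group id are placeholders; depending on your Flink/Kafka versions the consumer class may be a versioned one such as FlinkKafkaConsumer010):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaLogSource {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "log-processor");

        // Each record is one JSON log line, read as a plain string.
        DataStream<String> logs = env.addSource(
                new FlinkKafkaConsumer<>("logs-topic", new SimpleStringSchema(), props));

        logs.print();
        env.execute("Read logs from Kafka");
    }
}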