I need my Flink job to pull records from a database at a specified interval and archive them after processing. I have implemented a SourceFunction that fetches the required records from the database and added it as the source for the StreamExecutionEnvironment. How can I specify that the StreamExecutionEnvironment should fetch records from the database through the SourceFunction every 10 minutes?

SourceFunction:

public class MongoDBSourceFunction implements SourceFunction<List<Book>>{

    public void cancel() {
        // TODO Auto-generated method stub
    }

    public void run(SourceContext<List<Book>> context) throws Exception {

        List<Book> books = getBooks();

        context.collect(books);

    }

    public List<Book> getBooks() {
        List<Book> books = new ArrayList<Book>();

        //fetch all books from database     
        return books;
    }

}

Processor:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ArchiveJob {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new MongoDBSourceFunction()).print();

        // Nothing runs until the job is submitted.
        env.execute("ArchiveJob");
    }

}

1 Answer

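You need to add this functionality to the MongoDBSourceFunction itself. For example, you could extend RichSourceFunction instead of implementing SourceFunction directly (the plain interface has no open method), instantiate a ScheduledExecutorService in the open method, and schedule the read task using this executor. The run method has to keep blocking until cancel is called, because the source finishes as soon as run returns.

Note that it is important to hold the checkpointing lock (context.getCheckpointLock()) while emitting records.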
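
To make the idea concrete, here is a minimal sketch of the scheduling and locking part using only the JDK. PeriodicEmitter is a hypothetical helper, not a Flink class, and it deliberately has no Flink dependency: the fetch, emit and lock arguments are stand-ins for getBooks, context.collect and context.getCheckpointLock(). Treat it as one possible shape under those assumptions, not as the only way to do this.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper (not part of Flink): polls at a fixed rate and emits under a lock. */
class PeriodicEmitter<T> {

    private final Supplier<List<T>> fetch;  // assumed stand-in for getBooks()
    private final Consumer<List<T>> emit;   // assumed stand-in for context::collect
    private final Object lock;              // assumed stand-in for context.getCheckpointLock()
    private final long period;
    private final TimeUnit unit;
    private ScheduledExecutorService executor;

    PeriodicEmitter(Supplier<List<T>> fetch, Consumer<List<T>> emit,
                    Object lock, long period, TimeUnit unit) {
        this.fetch = fetch;
        this.emit = emit;
        this.lock = lock;
        this.period = period;
        this.unit = unit;
    }

    /** Starts polling; the first poll runs immediately, then once per period. */
    synchronized void start() {
        if (executor != null) {
            return; // already running
        }
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(this::pollOnce, 0, period, unit);
    }

    /** Fetches one batch and emits it while holding the lock. */
    void pollOnce() {
        try {
            List<T> batch = fetch.get();
            synchronized (lock) {
                emit.accept(batch);
            }
        } catch (RuntimeException e) {
            // An exception escaping a scheduled task suppresses all later runs,
            // so report it here and let the next tick try again.
            System.err.println("Poll failed, retrying on next tick: " + e);
        }
    }

    /** Stops polling and interrupts a poll that is in progress. */
    synchronized void stop() {
        if (executor != null) {
            executor.shutdownNow();
            executor = null;
        }
    }
}
```

Inside the source, the wiring could look roughly like this: in run, build a PeriodicEmitter from this::getBooks, context::collect, context.getCheckpointLock(), 10 and TimeUnit.MINUTES, call start(), and then block (for example on a CountDownLatch) until cancel is called; cancel releases the latch and calls stop(). The sketch creates the executor in start() rather than in open because the SourceContext, and therefore the checkpoint lock, is only available inside run.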