I need my flink job to pull records from a database at specified interval and archive it after processing. I have implemented SourceFunction to fetch the required records from database and added the SourceFunction as the source for StreamExecutionEnvironment. How can i specify that the StreamExecutionEnvironment needs to fetch records from database by using the SourceFunction every 10 minutes?
SourceFunction:
public class MongoDBSourceFunction implements SourceFunction<List<Book>>{
public void cancel() {
// TODO Auto-generated method stub
}
public void run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
<List<Book>> context) throws Exception {
List<Book> books = getBooks();
context.collect(books);
}
public List<Book> getBooks() {
List<Book> books = new ArrayList<Book>();
//fetch all books from database
return books;
}
}
Processor:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ArchiveJob {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MongoDBSourceFunction()).print();
}
}