I have a Flink job that consumes a Kafka topic and sinks the records to another topic. The job relies on Kafka's auto-commit with a 3-minute interval (checkpointing is disabled), so on the monitoring side the consumer group always shows about 3 minutes of lag. We want to monitor processing in near real time without that 3-minute lag, so we would like the FlinkKafkaConsumer
to commit the offset immediately after the sink function.
Is there a way to achieve this within the Flink framework?
Or are there any other options?
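For context, the production job's consumer is configured roughly like this (a sketch of the setup described above; the 3-minute value is the auto-commit interval I mentioned):

// Sketch of the current production setup: offsets are committed by the
// Kafka client itself every 3 minutes, and Flink checkpointing is disabled.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "console-consumer-cep");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "180000"); // 3 minutes -> ~3 minutes of reported lag
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);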
In main() below, I try to create a plain KafkaConsumer instance and call its commitSync() method to commit manually, but it does not work.
import java.util.Date;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CEPJobTest {

    private final static String TOPIC = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws Exception {
        System.out.println("start cep test job...");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.setProperty("zookeeper.connect", "localhost:2181"); // ignored by the universal Kafka connector
        properties.setProperty("group.id", "console-consumer-cep");
        properties.setProperty("enable.auto.commit", "false");
        // auto-commit interval (unused while enable.auto.commit is false)
        //properties.setProperty("auto.commit.interval.ms", "500");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(TOPIC, new SimpleStringSchema(), properties);
        // do NOT commit offsets on checkpoints (checkpointing is disabled anyway)
        consumer.setCommitOffsetsOnCheckpoints(false);
        System.out.println("commit on checkpoints enabled: " + consumer.getEnableCommitOnCheckpoints());

        DataStream<String> stream = env.addSource(consumer);
        stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return new Date().toString() + ": " + value;
            }
        }).print();

        // Here I want to commit the offset manually after processing each message,
        // but this runs only once, while the job graph is built on the client,
        // not per record inside the running job -- and this consumer has no
        // subscription or assignment, so commitSync() has nothing to commit
        // (it also fails to construct: no key/value deserializers are configured).
        KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.commitSync();

        env.execute("Flink Streaming");
    }

    // Unused so far; another attempt at building a plain Kafka consumer.
    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new KafkaConsumer<>(props);
    }
}
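For completeness, the only offset-commit path I know of inside the Flink framework is tying commits to checkpoints. A minimal sketch of that mechanism (assuming checkpointing were re-enabled; the 5000 ms interval is an illustrative value, not our actual configuration):

// Sketch: checkpoint-based offset commits (the mechanism I know of,
// shown here only for reference since our job has checkpointing disabled).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // offsets are committed when each checkpoint completes
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
consumer.setCommitOffsetsOnCheckpoints(true); // the default when checkpointing is on

Is there something more immediate than this, i.e. a commit right after the sink processes each record?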