
I have a Flink job that consumes a Kafka topic and sinks the records to another topic. The job uses auto.commit with a 3-minute interval (checkpointing is disabled), so the monitoring side always shows 3 minutes of consumer lag. We want to monitor processing in real time without that 3-minute lag, so we would like the FlinkKafkaConsumer to commit the offset immediately after the sink function.
Is there a way to achieve this within the Flink framework?
Or are there any other options?

Near the end of main() below, I try to create a KafkaConsumer instance and call commitSync() to achieve this, but it does not work.

    public class CEPJobTest {
        private final static String TOPIC = "test";
        private final static String BOOTSTRAP_SERVERS = "localhost:9092";

        public static void main(String[] args) throws Exception {

            System.out.println("start cep test job...");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
            properties.setProperty("zookeeper.connect", "localhost:2181");
            properties.setProperty("group.id", "console-consumer-cep");
            properties.setProperty("enable.auto.commit", "false");
            // offset commit interval
            //properties.setProperty("auto.commit.interval.ms", "500");

            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(TOPIC, new SimpleStringSchema(),
                    properties);

            // do not commit offsets on checkpoints (checkpointing is disabled anyway)
            consumer.setCommitOffsetsOnCheckpoints(false);
            System.out.println("commit on checkpoints: " + consumer.getEnableCommitOnCheckpoints());

            DataStream<String> stream = env.addSource(consumer);

            stream.map(new MapFunction<String, String>() {

                @Override
                public String map(String value) throws Exception {
                    return new Date().toString() + ":  " + value;
                }

            }).print();

            // here I want to commit the offset manually after processing each message...
            KafkaConsumer<?, ?> kafkaConsumer = new KafkaConsumer<>(properties);
            kafkaConsumer.commitSync();

            env.execute("Flink Streaming");
        }

        private static Consumer<Long, String> createConsumer() {
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

            final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
            return consumer;
        }
    }

1 Answer


This does not work the way your code tries to do it.

env.execute() is what submits the job to the cluster; execution only starts at that point. The code before that line merely builds the job graph, it does not execute anything. So your KafkaConsumer is created and commitSync() is called exactly once, on the client, before any record has been processed.
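For illustration, here is a minimal, self-contained sketch (the class name and printed messages are purely illustrative) showing that code outside the operators runs at graph-build time:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BuildVsExecuteDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c").print();

            // Runs on the client while the job graph is being assembled;
            // no record has been read yet, so a commitSync() here would commit nothing useful.
            System.out.println("before execute(): nothing has run yet");

            env.execute("build-vs-execute demo"); // only now is the pipeline actually run
        }
    }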

To commit after the sink, you should put the commit logic inside your sink function:

    class MySink(props: Properties) extends RichSinkFunction[String] {
      @transient private var kafkaConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = _

      override def open(parameters: Configuration): Unit = {
        kafkaConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](props) // once per sink instance
      }

      override def invoke(value: String, context: SinkFunction.Context): Unit = {
        // write `value` to the target topic here, then commit
        kafkaConsumer.commitSync()
      }
    }
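
Note that a consumer created inside the sink never polls, so a bare commitSync() has nothing to commit; you have to pass explicit offsets via commitSync(Map). A hedged Java sketch of that idea follows (RecordWithOffset and CommittingSink are hypothetical names, and each record is assumed to carry its topic/partition/offset, e.g. extracted upstream with a KafkaDeserializationSchema):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical POJO: each record carries where it came from.
    class RecordWithOffset {
        public String topic;
        public int partition;
        public long offset;
        public String value;
    }

    public class CommittingSink extends RichSinkFunction<RecordWithOffset> {
        private final Properties props;
        private transient KafkaConsumer<byte[], byte[]> committer;

        public CommittingSink(Properties props) {
            this.props = props; // must include deserializer settings for KafkaConsumer
        }

        @Override
        public void open(Configuration parameters) {
            // One consumer per sink instance, used only for committing; it never polls.
            committer = new KafkaConsumer<>(props);
        }

        @Override
        public void invoke(RecordWithOffset record, Context context) {
            // ... write record.value to the target topic here ...
            committer.commitSync(Collections.singletonMap(
                    new TopicPartition(record.topic, record.partition),
                    // the committed offset is the NEXT offset to be read
                    new OffsetAndMetadata(record.offset + 1)));
        }

        @Override
        public void close() {
            if (committer != null) {
                committer.close();
            }
        }
    }

Also keep in mind that the offsets Flink commits back to Kafka are only used for monitoring consumer progress, not for fault tolerance, and a synchronous commit per record will slow the sink down. Committing every N records, or simply enabling checkpointing with a short interval, is usually the more practical route.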