0
votes

I have data in Kafka, i wanted to read the data whether Kafka is sending or not sending data, and filter them and return JSON.

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();

        properties.setProperty("bootstrap.servers", "localhost:9092");
       
        properties.setProperty("group.id", "flink_consumer");


        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic",
                new SimpleStringSchema(), properties);
        consumer.setStartFromLatest();
        //config.setWriteTimestampToKafka(true);

        DataStream<String> stream = env.addSource(consumer);

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String map(String value) throws Exception {
                
                return "Stream Value: " + value;
            }
        }).print();
        env.execute();

Case 1: When Kafka producer sending the data to Kafka then i can see value printing in my console. - That's good and OK. Case 2: Kafka producer is stopped sending data, still Kafka has value in topic, but the same code not returning me any data. -- Is this possible?

any idea where am making mistake?

{"firsname":"test", "lastname":"topic", "value":"3.45", "location":"UK"}

I wanted filter firstname and return JSON.

I see there is filter options during data streaming process.

1
if you want to start from the very first message, you should set consumer.setStartFromEarliest();. It will start reading from the very first un-acknowledged message. - Zahid Adeel
Zahid - Thank you very much it's really worked. - user23
I am glad it was helpful. - Zahid Adeel
sure, i can't see the upvote button. - user23
zahid should turn his comment into an answer, and then you can upvote it. - kkrugler

1 Answers

0
votes

If you want to start from the very first message, you should set consumer.setStartFromEarliest();. It will start reading from the very first un-acknowledged message.