I have data in Kafka. I want to read the data regardless of whether the producer is currently sending anything, filter it, and return JSON.
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Create the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Kafka connection properties
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "flink_consumer");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), properties);

// Only consume records that arrive after the job starts
consumer.setStartFromLatest();

DataStream<String> stream = env.addSource(consumer);

stream.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String map(String value) throws Exception {
        return "Stream Value: " + value;
    }
}).print();

env.execute();
Case 1: While the Kafka producer is sending data to Kafka, I can see the values printed in my console. That works fine.
Case 2: The producer has stopped sending, but the topic still contains messages; the same code returns nothing. Is that expected?
Any idea where I am making a mistake?
{"firsname":"test", "lastname":"topic", "value":"3.45", "location":"UK"}
I want to filter on firstname and return the JSON. I see there is a filter operation available in the DataStream API.
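The filter logic being asked about can be sketched in plain Java, outside of Flink, so the predicate itself is easy to verify. Both `FirstnameFilter` and its `extractField` helper are hypothetical names; the key `"firsname"` matches the spelling in the sample record above, and a real job should use a proper JSON library such as Jackson instead of this crude flat-JSON string scan:

```java
// Sketch of the filter predicate: keep records whose "firsname" field
// is present and non-empty. In the Flink job this would be wrapped in a
// FilterFunction<String> (or passed as a lambda to stream.filter(...)).
public class FirstnameFilter {

    // Crude extractor for a flat JSON object with string values only
    // (no nesting, no escaped quotes). For illustration only.
    static String extractField(String json, String key) {
        String marker = "\"" + key + "\":";
        int start = json.indexOf(marker);
        if (start < 0) return null;
        int valueStart = json.indexOf('"', start + marker.length());
        if (valueStart < 0) return null;
        int valueEnd = json.indexOf('"', valueStart + 1);
        if (valueEnd < 0) return null;
        return json.substring(valueStart + 1, valueEnd);
    }

    static boolean hasFirstname(String json) {
        String v = extractField(json, "firsname"); // key as spelled in the sample data
        return v != null && !v.isEmpty();
    }

    public static void main(String[] args) {
        String record = "{\"firsname\":\"test\", \"lastname\":\"topic\", "
                + "\"value\":\"3.45\", \"location\":\"UK\"}";
        System.out.println(hasFirstname(record));                     // true
        System.out.println(hasFirstname("{\"lastname\":\"topic\"}")); // false
    }
}
```

In the job itself this would be applied as `stream.filter(FirstnameFilter::hasFirstname)` before the map, so only matching JSON records flow downstream.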
Use consumer.setStartFromEarliest(); instead. It will start reading from the earliest offset available in the topic. - Zahid Adeel
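A minimal sketch of that suggestion applied to the consumer above (same topic name and properties as in the question). This is a configuration fragment, not a full job: setStartFromLatest() only sees records produced after the job starts, which is why Case 2 printed nothing, while setStartFromEarliest() replays what is already stored in the topic:

```java
FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), properties);

// Read from the earliest offset still retained in the topic, so messages
// that were produced before the job started are also consumed.
// Note: this ignores committed group offsets; to resume from the group's
// committed offsets instead, use setStartFromGroupOffsets() (the default).
consumer.setStartFromEarliest();
```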