I am trying to create a JUnit test for a Flink streaming job which writes data to a kafka topic and read data from the same kafka topic using FlinkKafkaProducer09 and FlinkKafkaConsumer09 respectively. I am passing a test data in the produce:
DataStream<String> stream = env.fromElements("tom", "jerry", "bill");
And checking whether same data is coming from the consumer as:
List<String> expected = Arrays.asList("tom", "jerry", "bill");
List<String> result = resultSink.getResult();
assertEquals(expected, result);
using TestListResultSink.
I am able to see the data coming from the consumer as expected by printing the stream. But could not get the Junit test result as the consumer will keep on running even after the message finished. So it did not come to test part.
Is thre any way in Flink or FlinkKafkaConsumer09 to stop the process or to run for specific time?