I am trying to produce elements using FlinkKafkaProducer010, however when I open a consumer console window the elements appear to arrive out of order.
I created the topic using: kafka-topics.bat --create --topic mytopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
The consumer is created using: kafka-console-consumer.bat --zookeeper localhost:2181 --topic mytopic
The Kafka Producer code I am using is:
public static void main(String[] args) throws Exception {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.getNumberOfParameters() < 2) {
System.out.println("Missing parameters!");
System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
return;
}
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
DataStream<String> messageStream = env.addSource(getSourceFunction());
FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
messageStream.addSink(producer);
env.execute("Kafka Producer");
}
public static SourceFunction<String> getSourceFunction() {
return new SourceFunction<String>() {
private static final long serialVersionUID = 6369260225318862378L;
public boolean running = true;
@Override
public void run(SourceContext<String> ctx) {
int counter = 0;
while (this.running && counter < 500) {
String data = "item " + Integer.toString(counter);
ctx.collect(data);
counter++;
}
}
@Override
public void cancel() {
running = false;
}
};
}
When I look in the Kafka log files I see a .log file with also the elements out of order. The ordering of the elements makes jumps of about 10 values. In my use case it is essential to have the correct order. I have been searching on how to ensure the elements arrive in order, but so far without any luck. Is there something I have missed that fixes the ordering?
Thanks in advance for any help!