0 votes

I am trying to produce elements using FlinkKafkaProducer010, however when I open a consumer console window the elements appear to arrive out of order.

I created the topic using: kafka-topics.bat --create --topic mytopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1

The consumer is created using: kafka-console-consumer.bat --zookeeper localhost:2181 --topic mytopic

The Kafka Producer code I am using is:

public static void main(String[] args) throws Exception {
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    if(parameterTool.getNumberOfParameters() < 2) {
        System.out.println("Missing parameters!");
        System.out.println("Usage: Kafka --topic <topic> --bootstrap.servers <kafka brokers>");
        return;
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));

    DataStream<String> messageStream = env.addSource(getSourceFunction());

    FlinkKafkaProducer010<String> producer = new FlinkKafkaProducer010<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties());
    messageStream.addSink(producer);
    env.execute("Kafka Producer");
}

public static SourceFunction<String> getSourceFunction() {
    return new SourceFunction<String>() {
        private static final long serialVersionUID = 6369260225318862378L;
        // volatile so a cancel() from another thread is visible to run()
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) {
            int counter = 0;
            while (this.running && counter < 500) {
                String data = "item " + Integer.toString(counter);
                ctx.collect(data);

                counter++;
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    };
}

When I look in the Kafka log files I see a .log file in which the elements are also out of order; the ordering jumps by about 10 values. In my use case it is essential to have the correct order. I have been searching for how to ensure the elements arrive in order, but so far without any luck. Is there something I have missed that fixes the ordering?

Thanks in advance for any help!

1
You have guaranteed ordering within a partition. Can you create a consumer programmatically and check? – Indraneel Bende
I was already consuming the elements in Flink using a FlinkKafkaConsumer010 with TimeCharacteristic set to EventTime. However, this also showed out-of-order arrival, so I went back to basics and checked with plain Kafka and a simple Kafka console consumer. – Hans

1 Answer

2 votes

I guess you are using parallelism > 1 for the sink. The order of elements is only guaranteed within a single operator instance. If multiple parallel instances of a sink write to a single Kafka partition, then there are no guarantees on the order. If you need a global order, force a single sink instance, e.g. by setting the sink's parallelism to 1.
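To make the mechanism concrete: here is a minimal plain-Java sketch (no Flink or Kafka involved; the class `ParallelOrderDemo` and all names in it are made up for illustration) in which two threads stand in for two parallel sink instances and a shared list stands in for a single Kafka partition. Each writer's own elements keep their relative order, but the merged sequence interleaves the two writers arbitrarily:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative only: two threads play the role of two parallel sink
// instances, and the shared list plays the role of one Kafka partition.
public class ParallelOrderDemo {

    public static List<String> writeConcurrently() {
        List<String> partition = Collections.synchronizedList(new ArrayList<>());
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100; i++) partition.add("a-" + i);
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100; i++) partition.add("b-" + i);
        });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return partition;
    }

    public static void main(String[] args) {
        List<String> partition = writeConcurrently();
        // "a-0" always lands before "a-1" (same writer), but where the "b-"
        // elements fall relative to the "a-" elements is nondeterministic,
        // so there is no global order across writers.
        System.out.println("total elements: " + partition.size());
    }
}
```

In the job above, a single sink instance can be forced with something like `messageStream.addSink(producer).setParallelism(1);`, at the cost of limiting sink throughput to one parallel writer.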