0
votes

While I was doing a proof of concept with kafka-flink, I discovered the following : It seems that kafka producer errors could happen due to workload done on flink side ?!

Here are more details:

I have sample files like sample??.EDR made of ~700'000 rows with values like "entity", "value", "timestamp"

I use the following command to create the kafka topic:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic gprs

I use the following command to load sample files on topic:

[13:00] kafka@ubu19: ~/fms
% /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic gprs < ~/sample/sample01.EDR

I have on flink side jobs that aggregate value for each entity with sliding window of 6 hours and 72 hours (aggregationeachsix, aggregationeachsentytwo).

I did three scenarios:

  1. Load files in the topic without any job running
  2. Load files in the topic with aggregationeachsix job running
  3. Load files in the topic with aggregationeachsix and aggregationeachsentytwo jobs running

The results is that first two scenarios are working but for the third scenario, I have the following errors on kafka producer side while loading the files (not always at the same file, it can be the first, second, third or even later file):

    [plenty of lines before this part]
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1627 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1626 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1625 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1624 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time
    [plenty of lines after this part]

My question is why flink could have an impact on kafka producer and then, what do I need to change to avoid this error ?

1

1 Answers

0
votes

It looks like you are saturating your network when both flink and kafka-producer are using it and thus you get TimeoutExceptions.