The spring-boot consumer micro-service is not able to send messages to a topic after kafka restarts.
Using the docker swarm configuration I have setup single node cluster with 1 kafka broker and 2 spring boot micro-services (a producer and a consumer). I am using spring boot 2.0.3
The consumer and the producer (spring boot micro-services) are on the same overlay network "net-broker" and so they access kafka with the service name "kafka:9092"
Everything is working fine on the first start.
Then kafka ONLY is restarted and after that the consumer IS not able to send messages from kafka topic any more.
The kafka service is restarted due to a small change in the docker-compose.yml (e.g max_attempts: 3 => max_attempts: 4)
The docker-compose.yml file
kafka:
image: wurstmeister/kafka:2.12-2.2.0
depends_on:
- zookeeper
networks:
- net-broker
deploy:
replicas: 1
update_config:
parallelism: 1
delay: 10s
restart_policy:
condition: on-failure
max_attempts: 3
# ports:
# - target: 9094
# published: 9094
# protocol: tcp
# mode: host
environment:
HOSTNAME_COMMAND: "echo ${HOST_IP:-192.168.99.100}"
KAFKA_CREATE_TOPICS: "gnss-topic-${GNSS_TAG}:3:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
BROKER_ID_COMMAND: "echo 101"
KAFKA_LOG_DIRS: "/kafka/kafka-logs"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
The KafkaProducerConfig class
@Bean
public ProducerFactory<String, GNSSPortHolderDTO> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());
// high throughput producer (at the expense of a bit of latency and CPU usage)
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
// serializers
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
spring boot producer logs after kafka restart:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for gnss-topic-11.2.1-B5607-1: 30030 ms has passed since batch creation plus linger time
spring boot consumer logs after kafka restart:
gnss_data-access.1.ll948jogpqil@GDN-S-GNSS2 | 2019-05-08 09:42:33.984 INFO 1 --- [ gnss-view-data] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=gnss-view-data] Marking the coordinator fe7091944126:9092 (id: 2147483546 rack: null) dead
I am using the 'spring-kafka-2.1.7.RELEASE.jar' library for the producer/consumer micro-services
Using the remote debug mode I understood that the consumer is trying to send message to the old "killed" container ID instead of using the service name "kafka:9092". I do not know why.
gnssConfig.getKafkaBootstapServers()
will returnkafka:9092
which is the service name – Gerassimos Mitropoulos