
The Spring Boot consumer micro-service is not able to receive messages from a topic after Kafka restarts.

Using Docker Swarm, I have set up a single-node cluster with 1 Kafka broker and 2 Spring Boot micro-services (a producer and a consumer). I am using Spring Boot 2.0.3.

  • The consumer and the producer (Spring Boot micro-services) are on the same overlay network "net-broker", so they access Kafka with the service name "kafka:9092".

  • Everything works fine on the first start.

  • Then Kafka ONLY is restarted, and after that the consumer is no longer able to receive messages from the Kafka topic.

  • The Kafka service is restarted due to a small change in the docker-compose.yml (e.g. max_attempts: 3 => max_attempts: 4).

The docker-compose.yml file

kafka:
    image: wurstmeister/kafka:2.12-2.2.0
    depends_on:
      - zookeeper
    networks:
      - net-broker
    deploy:
      replicas: 1
      update_config:
        parallelism: 1
        delay: 10s
      restart_policy:
        condition: on-failure
        max_attempts: 3
    # ports:
    #   - target: 9094
    #     published: 9094
    #     protocol: tcp
    #     mode: host
    environment:
      HOSTNAME_COMMAND: "echo ${HOST_IP:-192.168.99.100}"
      KAFKA_CREATE_TOPICS: "gnss-topic-${GNSS_TAG}:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
      KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      BROKER_ID_COMMAND: "echo 101"
      KAFKA_LOG_DIRS: "/kafka/kafka-logs"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
      - kafka-data:/kafka

The KafkaProducerConfig class

@Bean
public ProducerFactory<String, GNSSPortHolderDTO> producerFactory() {
  Map<String, Object> configProps = new HashMap<>();

  // bootstrap address resolves to the "kafka" service name on the overlay network
  configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());

  // high-throughput producer (at the expense of a bit of latency and CPU usage)
  configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
  configProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
  configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size

  // serializers
  configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

  return new DefaultKafkaProducerFactory<>(configProps);
}
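
For completeness, the consumer factory is configured analogously. The following is a minimal sketch, assuming the same bootstrap address and the gnss-view-data group id that appears in the logs below; the exact wiring in my project may differ:

@Bean
public ConsumerFactory<String, GNSSPortHolderDTO> consumerFactory() {
  Map<String, Object> configProps = new HashMap<>();

  // same service-name bootstrap address as the producer
  configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());
  configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "gnss-view-data");

  // deserializers matching the producer's String/JSON serializers
  return new DefaultKafkaConsumerFactory<>(configProps, new StringDeserializer(),
      new JsonDeserializer<>(GNSSPortHolderDTO.class));
}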

Spring Boot producer logs after the Kafka restart:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for gnss-topic-11.2.1-B5607-1: 30030 ms has passed since batch creation plus linger time

Spring Boot consumer logs after the Kafka restart:

gnss_data-access.1.ll948jogpqil@GDN-S-GNSS2    | 2019-05-08 09:42:33.984  INFO 1 --- [ gnss-view-data] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=gnss-view-data] Marking the coordinator fe7091944126:9092 (id: 2147483546 rack: null) dead

I am using the 'spring-kafka-2.1.7.RELEASE.jar' library for the producer/consumer micro-services.

Using remote debug mode, I saw that the consumer is trying to connect to the old "killed" container ID instead of using the service name "kafka:9092". I do not know why.

Comments:

  • In `configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());`, is gnssConfig.getKafkaBootstapServers() an IP, a Docker container ID, or a symbolic name? – Pablo López Gallego

  • Hello Pablo, first of all thank you very much for taking my question into account. gnssConfig.getKafkaBootstapServers() will return kafka:9092, which is the service name. – Gerassimos Mitropoulos

1 Answer


I found a solution to this issue.

Just to recap the requirements:

  • Kafka should be accessible only from the Docker overlay virtual network net-broker.
  • Kafka should NOT be accessible from the Docker host IP, for security reasons.

The following change to the KAFKA_ADVERTISED_LISTENERS environment variable fixed the problem:

old value (not working after restart): KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094

new value (working after restart): KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094

So the fix is to specify the Kafka service name kafka:9092 for the INSIDE advertised listener.

The problem was that even though the Spring Boot producer was configured to use kafka:9092

configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

the producer was actually using the Kafka container ID for the communication instead of the service name kafka:9092. This is expected Kafka client behavior: the bootstrap address is only used for the initial connection, after which the client switches to the broker addresses advertised in the cluster metadata. With INSIDE://:9092 the broker advertised its own hostname, which inside the container is the container ID. So after the Kafka restart a new container (with a new container ID) was created, but the producer was still pointing to the old container ID.
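
One way to see this directly is to ask the broker which addresses it advertises back to clients. Below is a small standalone sketch using the Kafka AdminClient (illustrative, not part of my services; it only assumes the kafka-clients library is on the classpath):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

public class AdvertisedListenerCheck {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // the bootstrap address is only used for this first connection
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
    try (AdminClient admin = AdminClient.create(props)) {
      // these are the advertised addresses clients switch to after bootstrap:
      // with INSIDE://:9092 this prints the container ID (e.g. fe7091944126:9092),
      // with INSIDE://kafka:9092 it prints kafka:9092
      for (Node node : admin.describeCluster().nodes().get()) {
        System.out.println(node.host() + ":" + node.port());
      }
    }
  }
}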