
My use case: Using Postman, I call a Spring Boot SOAP endpoint. The endpoint creates a KafkaProducer and sends a message to a specific topic. I also have a TaskScheduler that consumes the topic.

The problem: When I call the SOAP endpoint to push a message to the topic, I get this error:

2017-11-14 21:29:31.463 ERROR 6389 --- [ad | producer-3] DomainEntityProducer : Expiring 1 record(s) for DomainEntityCommandStream-0: 30030 ms has passed since batch creation plus linger time
2017-11-14 21:29:31.464 ERROR 6389 --- [nio-8080-exec-6] DomainEntityProducer : org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DomainEntityCommandStream-0: 30030 ms has passed since batch creation plus linger time

Here’s the method I use to push to the topic:

public DomainEntity push(DomainEntity pDomainEntity) throws Exception {
    logger.log(Level.INFO, "streaming...");
    wKafkaProperties.put("bootstrap.servers", "localhost:9092");
    wKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    wKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // try-with-resources closes the producer when the block exits
    try (KafkaProducer<String, String> wKafkaProducer = new KafkaProducer<>(wKafkaProperties)) {
        ProducerRecord<String, String> wProducerRecord =
                new ProducerRecord<>("DomainEntityCommandStream", getJSON(pDomainEntity));
        // get() blocks until the broker acknowledges the record or the send fails
        wKafkaProducer.send(wProducerRecord, (RecordMetadata r, Exception e) -> {
            if (e != null) {
                logger.log(Level.SEVERE, e.getMessage());
            }
        }).get();
    }
    return pDomainEntity;
}

Using command shell scripts

./kafka-console-producer.sh --broker-list 10.0.1.15:9092 --topic DomainEntityCommandStream

and

./kafka-console-consumer.sh --bootstrap-server 10.0.1.15:9092 --topic DomainEntityCommandStream --from-beginning

works very well.

Going through some related questions on Stack Overflow, I tried purging the topic:

./kafka-topics.sh --zookeeper 10.0.1.15:9092 --alter --topic DomainEntityCommandStream --config retention.ms=1000

Looking at the Kafka logs, I see that the retention time was altered.

But no luck: I still get the same error.

The payload is ridiculously small, so why should I change batch.size?

<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:gs="http://soap.problem.com">
   <soapenv:Header/>
   <soapenv:Body>
      <gs:streamDomainEntityRequest>
         <gs:domainEntity>
                <gs:name>12345</gs:name>
                <gs:value>Quebec</gs:value>
                <gs:version>666</gs:version>
            </gs:domainEntity>
      </gs:streamDomainEntityRequest>
   </soapenv:Body>
</soapenv:Envelope>
By the way, I use ZooKeeper and Kafka in Docker containers. - Erick Audet
I found my error. When using Kafka inside a Docker container, you need to specify KAFKA_ADVERTISED_HOST_NAME in the yml file. This enables producers and consumers outside of the container to interact with Kafka. - Erick Audet
Which value did you use? - Alessandro Dionisi

1 Answer


With Docker and the Kafka 0.11.0.1 image, you need to add the following environment variables to the container:

KAFKA_ZOOKEEPER_CONNECT = X.X.X.X:XXXX (your ZooKeeper IP or domain, then the port; the default port is 2181)

KAFKA_ADVERTISED_HOST_NAME = X.X.X.X (your Kafka IP or domain)

KAFKA_ADVERTISED_PORT = XXXX (your Kafka port number; the default is 9092)

Optionally:

KAFKA_BROKER_ID = 999 (some value)

KAFKA_CREATE_TOPICS = test:1:1 (some topic name to create at start)
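
As a rough illustration, the variables above might appear in a docker-compose yml file as follows. This is only a sketch: the service name, the image placeholder, and the ZooKeeper address are assumptions, and 10.0.1.15 is simply the broker address used in the question. Substitute your own values.

```yaml
# Sketch only: service name, image placeholder and addresses are assumptions.
version: "2"
services:
  kafka:
    image: your-kafka-image:0.11.0.1   # placeholder, use the image you actually run
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "10.0.1.15:2181"    # assumed ZooKeeper address, default port 2181
      KAFKA_ADVERTISED_HOST_NAME: "10.0.1.15"      # address that clients outside the container can reach
      KAFKA_ADVERTISED_PORT: "9092"
      # Optional settings from the list above:
      KAFKA_BROKER_ID: "999"
      KAFKA_CREATE_TOPICS: "test:1:1"
```

The important value is KAFKA_ADVERTISED_HOST_NAME: it should be an address that the producer's machine can reach, not one that is only meaningful inside the container.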

If it still doesn't work and you get the same message ("Expiring X record(s) for xxxxx: XXXXX ms has passed since batch creation plus linger time"), you can try cleaning the Kafka data from ZooKeeper.
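
Before cleaning any data, it may be worth checking that the advertised address accepts TCP connections from the machine that runs the producer. The class below is a hypothetical helper, not part of Kafka, and uses only the JDK. The default address is the one from the question and is an assumption about your setup. A successful connection only shows that the port is open; it does not prove that the broker advertises that same address.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper (not part of Kafka): checks that a "host:port" address accepts TCP connections. */
class AdvertisedAddressCheck {

    /** Splits "host:port" into host and port without resolving the host name. */
    static InetSocketAddress parse(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port = Integer.parseInt(hostPort.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    /** Returns true if a TCP connection to the address succeeds within timeoutMs. */
    static boolean isReachable(String hostPort, int timeoutMs) {
        InetSocketAddress target = parse(hostPort);
        try (Socket socket = new Socket()) {
            // Resolve the host here; resolution or connection failures surface as IOException.
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Default is the broker address from the question; pass your advertised host:port instead.
        String address = args.length > 0 ? args[0] : "10.0.1.15:9092";
        System.out.println(address + " reachable: " + isReachable(address, 2000));
    }
}
```

Run it from the same machine as the Spring Boot application and pass the value of KAFKA_ADVERTISED_HOST_NAME plus the advertised port as the argument. If it reports false, the producer will most likely not be able to deliver its batches either.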