My use case: using Postman, I call a Spring Boot SOAP endpoint. The endpoint creates a KafkaProducer and sends a message to a specific topic. I also have a TaskScheduler that consumes the topic.
The problem: when I call the SOAP endpoint to push a message to the topic, I get this error:
2017-11-14 21:29:31.463 ERROR 6389 --- [ad | producer-3] DomainEntityProducer : Expiring 1 record(s) for DomainEntityCommandStream-0: 30030 ms has passed since batch creation plus linger time
2017-11-14 21:29:31.464 ERROR 6389 --- [nio-8080-exec-6] DomainEntityProducer : org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DomainEntityCommandStream-0: 30030 ms has passed since batch creation plus linger time
Here’s the method I use to push to the topic:
public DomainEntity push(DomainEntity pDomainEntity) throws Exception {
    logger.log(Level.INFO, "streaming...");
    wKafkaProperties.put("bootstrap.servers", "localhost:9092");
    wKafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    wKafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer wKafkaProducer = new KafkaProducer(wKafkaProperties);
    ProducerRecord wProducerRecord = new ProducerRecord("DomainEntityCommandStream", getJSON(pDomainEntity));
    wKafkaProducer.send(wProducerRecord, (RecordMetadata r, Exception e) -> {
        if (e != null) {
            logger.log(Level.SEVERE, e.getMessage());
        }
    }).get();
    return pDomainEntity;
}
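For reference, here is a minimal sketch of how the same producer properties could be built in one place, with the broker address passed in and the timeout-related settings spelled out. The class name ProducerPropertiesSketch and the method build are hypothetical and not part of my code. The values for linger.ms, batch.size, request.timeout.ms and max.block.ms are what I believe the client defaults to be for this era of the Kafka client, so treat them as assumptions; the 30030 ms in the error appears to line up with a 30000 ms request.timeout.ms.

```java
import java.util.Properties;

// Hypothetical helper (not in the original code): collects the producer
// settings in one place so the broker address and timeouts are explicit.
final class ProducerPropertiesSketch {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Same three settings as in push(), but with the address as a parameter.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed defaults, written out only to make them visible and tunable.
        props.put("linger.ms", "0");
        props.put("batch.size", "16384");
        props.put("request.timeout.ms", "30000");
        props.put("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for the address used by the shell scripts.
        Properties props = build("10.0.1.15:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned Properties object could be passed to the KafkaProducer constructor in place of wKafkaProperties.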
Using the Kafka command-line shell scripts
./kafka-console-producer.sh --broker-list 10.0.1.15:9092 --topic DomainEntityCommandStream
and
./kafka-console-consumer.sh --bootstrap-server 10.0.1.15:9092 --topic DomainEntityCommandStream --from-beginning
works very well.
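Since the shell scripts use 10.0.1.15:9092 while the Java code uses localhost:9092, one thing that could be checked is whether both addresses accept a TCP connection from the machine running the Spring Boot application. The following is only a sketch using plain java.net sockets; the class name BrokerReachabilitySketch and the 2000 ms timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic (not in the original code): checks whether a
// host:port accepts a plain TCP connection within the given timeout.
final class BrokerReachabilitySketch {

    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // The two addresses that appear in this question.
        System.out.println("localhost:9092 reachable: " + canConnect("localhost", 9092, 2000));
        System.out.println("10.0.1.15:9092 reachable: " + canConnect("10.0.1.15", 9092, 2000));
    }
}
```

A successful TCP connection only shows that something is listening on that port. It does not confirm that the address the broker advertises to clients is reachable, so the result is a hint rather than a diagnosis.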
Going through some related questions on Stack Overflow, I tried purging the topic:
./kafka-topics.sh --zookeeper 10.0.1.15:9092 --alter --topic DomainEntityCommandStream --config retention.ms=1000
Looking at the Kafka logs, I can see that the retention time was altered.
But no luck: I still get the same error.
The payload is ridiculously small, so why should I change batch.size?
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"
                  xmlns:gs="http://soap.problem.com">
    <soapenv:Header/>
    <soapenv:Body>
        <gs:streamDomainEntityRequest>
            <gs:domainEntity>
                <gs:name>12345</gs:name>
                <gs:value>Quebec</gs:value>
                <gs:version>666</gs:version>
            </gs:domainEntity>
        </gs:streamDomainEntityRequest>
    </soapenv:Body>
</soapenv:Envelope>