I'm testing the async send() in my kafka producer.
The cluster I want to connect to is offline.
My assumption would be that I send 10000 individual requests (lenght of listToSend) quickly.
Next the timeout (60s) would kick in and after 60 seconds I would see the callbacks hit me with logger.error(s"failed to send record ${x._2}", e)
However it seems to take forever for the method to finish.
That's why I added in the logger.debug("test: am I sending data")
line.
It prints, then nothing happens for 60 seconds. I see the failed callback for the 1st record. And only then will it move on.
Is this normal behavior or am I missing something fundamental?
listToSend.foreach { x =>
logger.debug("test: am I sending data")
// note: I added this 'val future =' in an attempt to fix this, to no avail
val future = producer.send(new ProducerRecord[String, String](topic, x._2), new Callback {
override def onCompletion(metadata: RecordMetadata, e: Exception) {
if (e != null) {
//todo: handle failed sends, timeouts, ...
logger.error(s"failed to send record ${x._2}", e)
}
else { //nice to have: implement logic here, or call another method to process metadata
logger.debug("~Callback success~")
}
}
}
)
}
note: I do not want to block this code, I want to keep it async. However it seems to be blocking on the send() regardless.
producer.send
returns aFuture[RecordMetadata]
(or something like that). You can callval meta = future.get()
to actually do the block, as well asproducer.close
– OneCricketeer