
I'm testing the async send() in my Kafka producer. The cluster I want to connect to is offline. My assumption was that I would quickly fire off 10000 individual requests (the length of listToSend), the timeout (60 s) would then kick in, and after 60 seconds the callbacks would hit me with logger.error(s"failed to send record ${x._2}", e). However, the method seems to take forever to finish.

That's why I added in the logger.debug("test: am I sending data") line.

It prints once, then nothing happens for 60 seconds. Then I see the failed callback for the 1st record, and only then does it move on.

Is this normal behavior or am I missing something fundamental?

listToSend.foreach { x =>
  logger.debug("test: am I sending data")
  // note: I added this 'val future =' in an attempt to fix this, to no avail
  val future = producer.send(new ProducerRecord[String, String](topic, x._2), new Callback {
    override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = {
      if (e != null) {
        // todo: handle failed sends, timeouts, ...
        logger.error(s"failed to send record ${x._2}", e)
      } else {
        // nice to have: implement logic here, or call another method to process metadata
        logger.debug("~Callback success~")
      }
    }
  })
}

note: I do not want to block this code, I want to keep it async. However it seems to be blocking on the send() regardless.
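For context on the 60-second stalls: KafkaProducer.send() can block while it waits for topic metadata when the cluster is unreachable, and that wait is bounded by the max.block.ms producer config, whose default of 60000 ms matches what I'm seeing. A minimal sketch of lowering it (the broker address is a placeholder):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092") // placeholder broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// send() blocks for at most this long waiting for metadata or buffer space;
// the default of 60000 ms matches the 60 s stalls described above
props.put("max.block.ms", "5000")

val producer = new KafkaProducer[String, String](props)
```

This only bounds the blocking; it does not make send() fully non-blocking when metadata is unavailable.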

Comments:

OneCricketeer: You need to get the Future.

Havnar: Isn't that blocking? I added "val future =" in an attempt to fix this; it wasn't originally there.

OneCricketeer: producer.send returns a Future[RecordMetadata] (or something like that). You can call val meta = future.get() to actually do the block, as well as producer.close.

Havnar: But I don't want to block, that's the thing. It's blocking now, but I just want to "get rid" of my records fast and let the logic handle any kind of failures/downtime/...

OneCricketeer: Then you need to set acks to zero in the producer config and not use a Callback or assign the future.
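Following that suggestion, a fire-and-forget setup would look roughly like this (a sketch; acks is the standard producer config key, and props/producer are assumed to be set up as elsewhere in this question):

```scala
// acks=0: the producer does not wait for any broker acknowledgement,
// so there is no delivery result worth waiting on (and no point in a Callback)
props.put("acks", "0")

producer.send(new ProducerRecord[String, String](topic, x._2)) // ignore the returned Future
```

Note that even with acks=0, the first send() to a topic can still block on the initial metadata fetch if the cluster is unreachable.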

1 Answer


I never completely figured out the parallelism.

However, it seems my topic name (I had named it '[projectname here]_connection') was the issue.

Even though I don't know of any reserved keywords in topic names, this behavior popped up.

Some further experimenting also brought up that a topic name with a trailing space can also cause this behavior. The producer will try to send it to this topic, but the Kafka cluster doesn't seem to know how to deal with it, causing these timeouts.

So for all of you who come across this issue: check/change your topic name before proceeding with your troubleshooting.
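One way to catch bad names like this up front: Kafka topic names are limited to at most 249 characters from [a-zA-Z0-9._-] (and may not be "." or ".."), so a trailing space is already illegal under that rule. A small sketch of a pre-flight check (isValidTopicName is my own helper name, not a Kafka API):

```scala
// Kafka topic names: 1-249 chars from [a-zA-Z0-9._-], and not "." or ".."
val LegalTopic = "^[a-zA-Z0-9._-]{1,249}$".r

def isValidTopicName(name: String): Boolean =
  name != "." && name != ".." && LegalTopic.pattern.matcher(name).matches()

// isValidTopicName("mytopic_connection") → true
// isValidTopicName("mytopic ")           → false (trailing space)
```

Running this check before building the ProducerRecord would have turned my silent 60-second timeouts into an immediate, obvious failure.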