I am using Spring Boot with Kafka, and want to listen to messages on an existing and working broker and topic.

The application.yml is configured as follows:

spring:
  kafka:
    topic:
      boot: my_topic
    bootstrap-servers: hostname:9092
    properties:
      security.protocol: SSL
    consumer:
      auto-offset-reset: earliest
      group-id: my_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1

The following error is being thrown:

2018-02-11 11:36:34.302  WARN 1676 --- [ntainer#0-0-C-1] o.a.k.common.network.SslTransportLayer   : Failed to send SSL Close message 

java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.write0(Native Method) ~[na:1.8.0_20]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51) ~[na:1.8.0_20]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_20]
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_20]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470) ~[na:1.8.0_20]
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) ~[kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.Selector.close(Selector.java:487) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) [kafka-clients-0.10.1.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) [kafka-clients-0.10.1.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556) [spring-kafka-1.1.7.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]

The configuration of the Kafka broker and producer cannot be changed. Is anything missing from the client-side Kafka configuration in Spring Boot that could be added to resolve this?

Thanks

Gary Russell: "An existing connection was forcibly closed by the remote host": I suggest you look at the server log to see what it's complaining about.
user1052610: From the server log: org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)

1 Answer

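It appears your broker is not configured for SSL on that port. I get the same error when I set security.protocol: SSL on the client while my broker doesn't use SSL. If that is the case for you, removing security.protocol: SSL from the client configuration (or connecting to the broker's SSL listener port, if it has one) should resolve it. This is what my broker logs: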

[2018-02-12 11:04:45,700] WARN Unexpected error from /127.0.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)
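
The odd size in that message is consistent with a TLS handshake arriving on a plaintext listener. Kafka frames every request with a 4-byte big-endian length, and a TLS record starts with 0x16 (handshake) followed by a two-byte version. The sketch below is only an illustration, not broker code: the class and method names are made up, and it assumes the client sent a record version of 0x03 0x03 with a record length below 256 bytes. It shows that reading those four bytes as a length gives exactly the number in the log.

```java
import java.nio.ByteBuffer;

public class TlsHeaderAsKafkaSize {
    // Kafka's default socket.request.max.bytes (100 MiB), the limit quoted in the broker log.
    static final int MAX_RECEIVE_BYTES = 104857600;

    // Read the first four bytes as a big-endian int, the way a size-prefixed
    // protocol such as Kafka's would read the length of an incoming request.
    static int asFrameSize(byte[] firstBytes) {
        return ByteBuffer.wrap(firstBytes, 0, 4).getInt();
    }

    public static void main(String[] args) {
        // Assumed start of a TLS record: 0x16 = handshake, 0x03 0x03 = record version,
        // 0x00 = high byte of the record length.
        byte[] tlsRecordStart = {0x16, 0x03, 0x03, 0x00};
        int size = asFrameSize(tlsRecordStart);
        System.out.println("size = " + size + " larger than " + MAX_RECEIVE_BYTES
                + ": " + (size > MAX_RECEIVE_BYTES));
    }
}
```

Running it prints "size = 369296128 larger than 104857600: true", which matches the InvalidReceiveException above and supports the idea that the client speaks SSL to a port that expects plaintext.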