I am using Spring Boot with Kafka, and want to listen to messages on an existing and working broker and topic.
The application.yml is configured as follows:
spring:
  kafka:
    topic:
      boot: my_topic
    bootstrap-servers: hostname:9092
    properties:
      security.protocol: SSL
    consumer:
      auto-offset-reset: earliest
      group-id: my_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1
The following error is being thrown:
2018-02-11 11:36:34.302 WARN 1676 --- [ntainer#0-0-C-1] o.a.k.common.network.SslTransportLayer : Failed to send SSL Close message
java.io.IOException: An existing connection was forcibly closed by the remote host
at sun.nio.ch.SocketDispatcher.write0(Native Method) ~[na:1.8.0_20]
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51) ~[na:1.8.0_20]
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_20]
at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_20]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470) ~[na:1.8.0_20]
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) ~[kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) ~[kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.Selector.close(Selector.java:487) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) [kafka-clients-0.10.1.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556) [spring-kafka-1.1.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
The configuration of the Kafka broker and producer cannot be changed. Is anything missing from the client-side Kafka configuration in Spring Boot that could be added to resolve this?
Thanks
>An existing connection was forcibly closed by the remote host
- I suggest you look at the server log to see what he's complaining about. – Gary Russell
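One client-side item that may be worth checking alongside the server log: the configuration in the question sets security.protocol: SSL but does not configure a truststore or keystore. The fragment below is only a sketch of how those could be supplied through Spring Boot's spring.kafka.ssl.* properties. The file paths and passwords are placeholders, not values from the question. Whether a keystore is needed at all depends on broker settings not shown here (it is typically required only when the broker demands client authentication). It is also an assumption that port 9092 is the broker's SSL listener; if it is a plaintext listener, the broker would be expected to drop an SSL handshake in much the same way.

```yaml
spring:
  kafka:
    bootstrap-servers: hostname:9092
    properties:
      security.protocol: SSL
    ssl:
      # Placeholder path and password: substitute the truststore that
      # contains the CA (or certificate) the broker presents.
      truststore-location: file:/path/to/client.truststore.jks
      truststore-password: changeit
      # Only relevant if the broker requires client authentication;
      # placeholder values again.
      keystore-location: file:/path/to/client.keystore.jks
      keystore-password: changeit
      key-password: changeit
```

If the handshake still fails, starting the application with the JVM option -Djavax.net.debug=ssl prints the TLS handshake on the client side, which can help narrow down where the connection is being dropped.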