I am trying to connect to a Kafka cluster over the SASL_SSL protocol, with the JAAS config as follows:
# Multi-binder Spring Cloud Stream configuration: two Kafka binders, each with
# its own truststore and SCRAM JAAS credentials inside its `environment`.
# NOTE(review): indentation was reconstructed from a flattened paste, following
# the spring.cloud.stream.* property paths — confirm against the original file.
spring:
  cloud:
    stream:
      # Each binding names the binder it connects through.
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          # Binder-specific settings: brokers, truststore and JAAS login.
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>
        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          # Binder-specific settings: brokers, truststore and JAAS login.
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      # Global Kafka binder properties, declared outside any binder environment.
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256
The above configuration is in line with the sample config available in spring-cloud-stream's official Git repo.
A similar issue raised on the library's Git repo says it was fixed in the latest versions, but that doesn't seem to be the case. I am getting the following error:
Versions in use: Spring Boot 2.2.8 (springBootVersion) and spring-cloud-stream-dependencies Horsham.SR6.
Failed to create consumer binding; retrying in 30 seconds | org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer:
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:65)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createAdminClient(KafkaTopicProvisioner.java:246)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:216)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:183)
at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:79)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:402)
... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
... 18 common frames omitted
Caused by: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:804)
at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
... 22 common frames omitted
Caused by: sun.security.krb5.RealmException: KrbException: Cannot locate default realm
at sun.security.krb5.Realm.getDefault(Realm.java:68)
at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:462)
at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:471)
at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:706)
... 38 common frames omitted
Caused by: sun.security.krb5.KrbException: Cannot locate default realm
at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
at sun.security.krb5.Realm.getDefault(Realm.java:64)
... 41 common frames omitted
This makes me think that the library is not picking up the config props properly, because jaas.loginModule is specified as ScramLoginModule but it is using Krb5LoginModule to authenticate.
Strikingly, though, when the configuration is done as follows (the difference lies in the last part, where the SSL credentials are placed outside the binder's environment), it connects to the binder whose credentials are specified in the global SSL props (outside the binder's environment) and silently ignores the other binder without showing any error logs.
For example, if the password credentials of the binder kafka-2-with-ssl are specified in the global SSL props, that binder is created and the bindings subscribed to it start consuming events. But this is useful only when a single binder needs to be created.
# Variant of the multi-binder configuration in which the truststore and JAAS
# credentials of kafka-2-with-ssl are repeated in the global Kafka binder
# properties, outside the binders' environments.
# NOTE(review): indentation was reconstructed from a flattened paste, following
# the spring.cloud.stream.* property paths — confirm against the original file.
spring:
  cloud:
    stream:
      # Each binding names the binder it connects through.
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          # Binder-specific settings: brokers, truststore and JAAS login.
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>
        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          # Binder-specific settings: brokers, truststore and JAAS login.
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      # Global Kafka binder properties, declared outside any binder environment.
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256
            # Same truststore values as the kafka-2-with-ssl binder above.
            ssl:
              truststore:
                location: <location-2>
                password: <ts-password-2>
                type: JKS
          # Same JAAS credentials as the kafka-2-with-ssl binder above.
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: <username-2>
              password: <password-2>
I assure you that nothing is wrong with the SSL credentials: I tested diligently, and each of the SSL Kafka binders is created successfully on its own. The aim is to connect to multiple Kafka binders with the SASL_SSL protocol. Thanks in advance.