
I am trying to connect to Kafka clusters over the SASL_SSL protocol, with the JAAS config as follows:

spring:
  cloud:
    stream:
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
            

      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>

        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256 

The above configuration is in line with the sample config available in the spring-cloud-stream official Git repo.

A similar issue raised on the library's Git repo says it was fixed in recent versions, but that does not appear to be the case here (Spring Boot 2.2.8, spring-cloud-stream-dependencies Horsham.SR6). I am getting the following error:

Failed to create consumer binding; retrying in 30 seconds | org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:461)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:143)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleConsumerBinding$1(BindingService.java:201)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: Failed to create new KafkaAdminClient
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:407)
    at org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:65)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createAdminClient(KafkaTopicProvisioner.java:246)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.doProvisionConsumerDestination(KafkaTopicProvisioner.java:216)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:183)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionConsumerDestination(KafkaTopicProvisioner.java:79)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:402)
    ... 12 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:160)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:146)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
    at org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:382)
    ... 18 common frames omitted
Caused by: javax.security.auth.login.LoginException: KrbException: Cannot locate default realm
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:804)
    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:617)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:755)
    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
    at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:60)
    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:111)
    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:149)
    ... 22 common frames omitted
Caused by: sun.security.krb5.RealmException: KrbException: Cannot locate default realm
    at sun.security.krb5.Realm.getDefault(Realm.java:68)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:462)
    at sun.security.krb5.PrincipalName.<init>(PrincipalName.java:471)
    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:706)
    ... 38 common frames omitted
Caused by: sun.security.krb5.KrbException: Cannot locate default realm
    at sun.security.krb5.Config.getDefaultRealm(Config.java:1029)
    at sun.security.krb5.Realm.getDefault(Realm.java:64)
    ... 41 common frames omitted 

This makes me think the library is not picking up the configuration properties correctly: jaas.loginModule is specified as ScramLoginModule, yet it is authenticating with Krb5LoginModule.
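(For context, this Kerberos fallback matches Kafka's default behaviour: if the SASL mechanism and JAAS settings never reach the AdminClient, the client assumes sasl.mechanism=GSSAPI, whose login module is Krb5LoginModule, hence "Cannot locate default realm". An illustrative sketch of the properties the topic provisioner likely ends up with in that case — assumed, not taken from actual logs:)

```properties
# Illustrative sketch of the effective AdminClient properties when the
# per-binder jaas/sasl settings are not propagated (assumed, not observed):
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI   # Kafka's default when no mechanism is supplied
# no sasl.jaas.config and no JVM-wide JAAS login context
# -> Krb5LoginModule is used -> "KrbException: Cannot locate default realm"
```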

Strikingly, though, when the configuration is written as follows (the difference is in the last part, with the SSL credentials placed outside the binder's environment), the application connects to the binder specified in the global SSL props and silently ignores the other binder, without any error logs.

For instance, if the credentials of the binder kafka-2-with-ssl are specified in the global SSL props, that binder is created and the bindings subscribed to it start consuming events. But this only helps when a single binder is needed.

spring:
  cloud:
    stream:
      bindings:
        binding-1:
          binder: kafka-1-with-ssl
          destination: <destination-1>
          content-type: text/plain
          group: <group-id-1>
          consumer:
            header-mode: headers
        binding-2:
          binder: kafka-2-with-ssl
          destination: <destination-2>
          content-type: text/plain
          group: <group-id-2>
          consumer:
            header-mode: headers
            

      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-1>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-1>
                            password: <ts-password-1>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-1>
                          password: <password-1>

        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder: 
                      brokers: <broker-hostnames-2>
                      configuration:
                        ssl:
                          truststore:
                            location: <location-2>
                            password: <ts-password-2>
                            type: JKS
                      jaas:
                        loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
                        options:
                          username: <username-2>
                          password: <password-2>
      kafka:
        binder:
          configuration:
            security:
              protocol: SASL_SSL
            sasl:
              mechanism: SCRAM-SHA-256 
            ssl:
              truststore:
                location: <location-2>
                password: <ts-password-2> 
                type: JKS
          jaas:
            loginModule: org.apache.kafka.common.security.scram.ScramLoginModule
            options:
              username: <username-2>
              password: <password-2>

I can assure you that nothing is wrong with the SSL credentials: I tested diligently, and either SSL Kafka binder is created successfully on its own. The aim is to connect to multiple Kafka binders with the SASL_SSL protocol. Thanks in advance.

Just out of curiosity, what happens if you only have a single cluster defined in the multi-binder configuration? I.e., can you test with a single binder and see if that throws any exceptions? I just want to rule out the possibility that something is overriding your configurations. – sobychacko

1 Answer


I think you may want to follow the solution introduced by KIP-85 for this issue. Instead of relying on the JAAS configuration provided by the Spring Cloud Stream Kafka binder, or on the java.security.auth.login.config system property, use the sasl.jaas.config property, which takes precedence over the other methods. sasl.jaas.config lets you work around the JVM's restriction of a single, static JVM-wide security context, under which any JAAS configuration loaded after the first one is ignored.
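Applied to the configuration in the question, each binder can then carry its own sasl.jaas.config (along with security.protocol and sasl.mechanism) directly in its configuration block, so no JVM-wide JAAS context is involved. A sketch, reusing the question's placeholders:

```yaml
spring:
  cloud:
    stream:
      binders:
        kafka-1-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-1>
                      configuration:
                        security.protocol: SASL_SSL
                        sasl.mechanism: SCRAM-SHA-256
                        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username-1>" password="<password-1>";
                        ssl.truststore.location: <location-1>
                        ssl.truststore.password: <ts-password-1>
                        ssl.truststore.type: JKS
        kafka-2-with-ssl:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: <broker-hostnames-2>
                      configuration:
                        security.protocol: SASL_SSL
                        sasl.mechanism: SCRAM-SHA-256
                        sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="<username-2>" password="<password-2>";
                        ssl.truststore.location: <location-2>
                        ssl.truststore.password: <ts-password-2>
                        ssl.truststore.type: JKS
```

Note the trailing semicolon in the sasl.jaas.config value: it is required by the JAAS config syntax, and omitting it causes a parse error at startup.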

Here is a sample application that demonstrates how to connect to multiple Kafka clusters with different security contexts as a multi-binder application.