0
votes

My requirement is to connect the Kafka topic through the SSL with Spring Boot and Apache Camel, for that, I have written the below code but I'm facing an error like Caused by sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target this

anyone, please help me how to resolve this error.

//in this code i'm configured the SSL
    @Configuration
    public class Testing {
        @Bean
         SSLContextParameters sslContextParameters(){
            KeyStoreParameters store = new KeyStoreParameters();
            store.setResource("kafka.client.truststore.jks");
            store.setPassword("123456");
    
            TrustManagersParameters trust = new TrustManagersParameters();
            trust.setKeyStore(store);
    
            SSLContextParameters parameters = new SSLContextParameters();
            parameters.setTrustManagers(trust);
    
            return parameters;
          }
    
    }

In the below file, I'm calling router with sslContextParameters parameter

    @Autowired
    SSLContextParameters params;
@Override
    public void configure() throws Exception {
    from("{{timerOnce}}").process(consumerCreate).to(
                "https://xx.xx.xx.xxx/consumers/group-id?sslContextParameters=params");

}

****** I've Used Another Approach for Connecting Kafka Cluster through SSL but no luck it's getting exception like this ****** org.apache.camel.spring.boot.CamelSpringBootInitializationException: java.io.IOException: Invalid Keystore format

Below code, I enabled SSL

public Endpoint setupSSLConext(CamelContext camelContext) throws Exception {

        KeyStoreParameters keyStoreParameters = new KeyStoreParameters();
        // Change this path to point to your truststore/keystore as jks files
        keyStoreParameters.setResource("kafka.client.truststore.jks");
        keyStoreParameters.setPassword("123456");

        KeyManagersParameters keyManagersParameters = new KeyManagersParameters();
        keyManagersParameters.setKeyStore(keyStoreParameters);
        keyManagersParameters.setKeyPassword("123456");

        TrustManagersParameters trustManagersParameters = new TrustManagersParameters();
        trustManagersParameters.setKeyStore(keyStoreParameters);

        SSLContextParameters sslContextParameters = new SSLContextParameters();
        sslContextParameters.setKeyManagers(keyManagersParameters);
        sslContextParameters.setTrustManagers(trustManagersParameters);

        HttpComponent httpComponent = camelContext.getComponent("https4", HttpComponent.class);
        httpComponent.setSslContextParameters(sslContextParameters);


        // This is important to make your cert skip CN/Hostname checks
        httpComponent.setX509HostnameVerifier(new X509HostnameVerifier() {
            @Override
            public void verify(String s, SSLSocket sslSocket) throws IOException {

            }

            @Override
            public void verify(String s, X509Certificate x509Certificate) throws SSLException {

            }

            @Override
            public void verify(String s, String[] strings, String[] strings1) throws SSLException {

            }

            @Override
            public boolean verify(String s, SSLSession sslSession) {
                // I don't mind just return true for all or you can add your own logic
                return true;
            }

        });

        return     httpComponent.createEndpoint("https://XX.XX.X.XXX/consumers/");
    }

Below code in the router I used ENDPOINT

    public void configure() throws Exception {

        Endpoint createEndpoint = cdcHelper.setupSSLConext(context);

        from("{{timerOnce}}").process(consumerCreate)
                .to(createEndpoint);    // calling kafka consumer 

    }
}
1

1 Answers

0
votes

You can follow below approach to set up a Kafka consumer using Apache Camel & SpringBoot.

add the below properties to your application.properties

# kafka configuration
kafka.topic=iot1
kafka.camelKafkaOptions.groupId=grp1
kafka.camelKafkaOptions.brokers=kafka.localtest:9093
kafka.camelKafkaOptions.consumersCount=10
kafka.camelKafkaOptions.autoOffsetReset=latest
kafka.camelKafkaOptions.autoCommitEnable=false
kafka.camelKafkaOptions.allowManualCommit=true
kafka.camelKafkaOptions.metadataMaxAgeMs=5000
kafka.camelKafkaOptions.securityProtocol=SSL
kafka.camelKafkaOptions.sslEndpointAlgorithm=HTTPS
kafka.camelKafkaOptions..sslKeyPassword=<ssl key password>
kafka.camelKafkaOptions..sslKeystoreLocation=<keystorepath>
kafka.camelKafkaOptions.sslKeystorePassword=<sslkeystore password>
kafka.camelKafkaOptions.sslTruststoreLocation=<truststore path>
kafka.camelKafkaOptions.sslTruststorePassword=<password>

and create a utility method, to construct a kafka url

@Component
public class KafkaUtility {
      public String getKafkaEndpoint(String topicName ){
       StringBuilder urlBuilder = new StringBuilder("kafka:" + topicName);

        if (!getCamelKafkaOptions().isEmpty()) {
            urlBuilder.append("&");
            getCamelKafkaOptions().forEach(
                (key, value) -> {
                    if (StringUtils.isNotBlank(value)) {
                        appendConfig(urlBuilder, key, value);
                    }
                }
            );
        }
        // strip the last "&" symbol
         String kafkaURL = stripLastAnd(urlBuilder.toString());
        return kafkaURL;
   }
}

In your route builder, implement the below

    @Autowired
    private KafkaUtility kafkaUtility;

  from(kafkaUtility.getKafkaEndpoint())
  .process("yourprocessor")
  .to("tourl");