Does anyone have a hello-world example of reading messages as a stream using Kafka Streams and Spring Boot?

My Kafka cluster is secured with SASL_SSL. How do I connect my Spring Boot Kafka Streams application to it, and what should I write in the application.properties file?

I do not want to use Spring Cloud Stream.

server.port=8084
topic.name=test-topic
server.servlet.context-path=/api/v1
spring.application.name=kafkatest
spring.kafka.bootstrap-servers=*************.com:9093
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.security.krb5.config=file:/etc/krb5.conf
spring.kafka.properties.sasl.mechanism=GSSAPI
spring.kafka.properties.sasl.kerberos.service.name=kafka
spring.kafka.properties.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false serviceName="kafka" storeKey=true principal="***************" useKeyTab=true keyTab="/home/api/config/kafkaclient.keytab";
spring.kafka.ssl.trust-store-location=file:/home/api/config/truststore.p12
spring.kafka.ssl.trust-store-password=*********************
spring.kafka.ssl.trust-store-type=PKCS12

1 Answer

I did it this way.

Add the SASL config to the application properties (YAML form):

> spring:
>   kafka:
>     client-id: ${spring.app.name}
>     bootstrap-servers: <cluster_url>:9092
>     properties:
>       ssl.endpoint.identification.algorithm: https
>       sasl.mechanism: PLAIN
>       sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx" password="xxxxxxx";
>       security.protocol: SASL_SSL
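Since the question asks about application.properties, here is a rough translation of the same settings into properties form. It is only a line-by-line rewrite of the YAML above with the same placeholders, not a tested configuration, and the cluster URL and credentials still have to be filled in.

```properties
spring.kafka.client-id=${spring.app.name}
spring.kafka.bootstrap-servers=<cluster_url>:9092
spring.kafka.properties.ssl.endpoint.identification.algorithm=https
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx" password="xxxxxxx";
spring.kafka.properties.security.protocol=SASL_SSL
```

Everything under spring.kafka.properties is passed through to the Kafka clients as-is, which is why the keys after that prefix are the plain Kafka config names.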

Then I created a bean that initializes KafkaStreamsConfiguration:

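// Assumes static imports of the constants from org.apache.kafka.streams.StreamsConfig,
// and that `server` and `applicationId` are fields of the enclosing @Configuration class.
@Bean
public KafkaStreamsConfiguration streamsConfig(KafkaProperties kafkaProperties) {
    // Start from everything under spring.kafka.*, including the SASL/SSL entries.
    Map<String, Object> streamsProperties = kafkaProperties.buildStreamsProperties();
    // Values set here replace the ones loaded from the properties file.
    streamsProperties.put(BOOTSTRAP_SERVERS_CONFIG, server);
    streamsProperties.put(APPLICATION_ID_CONFIG, applicationId);
    // Default to String serdes for both keys and values.
    streamsProperties.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsProperties.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    return new KafkaStreamsConfiguration(streamsProperties);
}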

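Note that I'm using KafkaProperties.buildStreamsProperties() to load the streams config from the properties file, so the SASL settings above end up in the streams configuration without being repeated in code.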
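To make the override order concrete, here is a minimal sketch in plain Java that uses an ordinary HashMap as a stand-in for the map returned by buildStreamsProperties(). The class name, the helper `withOverrides`, and the host names are hypothetical and only for illustration. The point is that entries loaded from the properties file (such as security.protocol) survive, while keys set afterwards with put() replace the file values.

```java
import java.util.HashMap;
import java.util.Map;

public class StreamsPropertiesMergeSketch {

    // Hypothetical helper: copies the base map and applies the same kind of
    // overrides that the bean applies with put().
    static Map<String, Object> withOverrides(Map<String, Object> base,
                                             String server,
                                             String applicationId) {
        Map<String, Object> merged = new HashMap<>(base);
        merged.put("bootstrap.servers", server);       // replaces the value from the file
        merged.put("application.id", applicationId);   // added on top of the file values
        return merged;
    }

    public static void main(String[] args) {
        // Stand-in for the properties loaded from application.yml (assumed values).
        Map<String, Object> fromFile = new HashMap<>();
        fromFile.put("bootstrap.servers", "from-yaml:9092");
        fromFile.put("security.protocol", "SASL_SSL");
        fromFile.put("sasl.mechanism", "PLAIN");

        Map<String, Object> merged = withOverrides(fromFile, "override:9092", "hello-streams");

        System.out.println(merged.get("bootstrap.servers")); // override:9092
        System.out.println(merged.get("security.protocol")); // SASL_SSL
        System.out.println(merged.get("application.id"));    // hello-streams
    }
}
```

In other words, if bootstrap-servers is already set correctly in the properties file, the put() for it in the bean is optional. It only matters when the streams application should point somewhere else.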