2
votes

I am trying to integrate SpringBoot Application with Kafka Schema Registry. I have created a kafkaPrducer which will send message to Kafka Topic after validating to Schema Registry:

public class Producer {
      @Value("${topic.name}")
      private final String TOPIC;
      private final KafkaTemplate<Integer, Data> kafkaTemplate;
      @Autowired
      public Producer(KafkaTemplate<Integer, Data> kafkaTemplate) {
         this.kafkaTemplate = kafkaTemplate;
      }
     public void sendTestEvent(Data data) throws Exception {
     System.out.println("started");
     Integer key = data.getTestEventId();
      this.kafkaTemplate.send(this.TOPIC,key,data);
}

My application.properties file

 server.port=8084
topic.name=test-topic
server.servlet.context-path=/api/v1
spring.application.name=kafkatest
spring.kafka.bootstrap-servers=*************.com:9093
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.jaas.enabled=true
spring.kafka.properties.security.protocol= SASL_SSL
spring.kafka.properties.security.krb5.config = file:/etc/krb5.conf
spring.kafka.properties.sasl.mechanism = GSSAPI
spring.kafka.properties.sasl.kerberos.service.name= kafka
spring.kafka.properties.sasl.jaas.config = com.sun.security.auth.module.Krb5LoginModule required 
useTicketCache=false serviceName="kafka" storeKey=true principal="***************" useKeyTab=true 
keyTab="/home/api/config/kafkaclient.keytab";
spring.kafka.ssl.trust-store-location= file:/home/api/config/truststore.p12
spring.kafka.ssl.trust-store-password=*********************
spring.kafka.ssl.trust-store-type= PKCS12
spring.kafka.basic.auth.credentials.source=USER_INFO 
spring.kafka.basic.auth.user.info=<username>:<password>
spring.kafka.schema.registry.url=https://schema-registry-*****************/subjects/test- 
topic/versions/latest

But i am getting error:

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005

> 2020-06-28 22:24:59.047  INFO 2019 --- [nio-8084-exec-1] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2020-06-28 22:24:59.054  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT refresh thread started.
2020-06-28 22:24:59.065  INFO 2019 --- [ha**********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT valid starting at: Sun Jun 28 22:25:07 IST 2020
2020-06-28 22:24:59.067  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT expires: Mon Jun 29 08:25:07 IST 2020
2020-06-28 22:24:59.072  INFO 2019 --- [ha*********] o.a.k.c.security.kerberos.KerberosLogin  : [Principal=ha*********]: TGT refresh sleeping until: Mon Jun 29 06:28:47 IST 2020
2020-06-28 22:24:59.307  WARN 2019 --- [nio-8084-exec-1] o.a.k.clients.producer.ProducerConfig    : The configuration 'security.krb5.config' was supplied but isn't a known config.
2020-06-28 22:24:59.334  INFO 2019 --- [nio-8084-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 2.0.1
2020-06-28 22:24:59.340  INFO 2019 --- [nio-8084-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fa14************
2020-06-28 22:25:03.041  INFO 2019 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: GlUYY***********
2020-06-28 22:25:04.994 ERROR 2019 --- [nio-8084-exec-1] o.a.c.c.C.[.[.[.[dispatcherServlet]      : Servlet.service() for servlet [dispatcherServlet] in context with path [/api/v1] threw exception [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Event","namespace":"com.*******.kafka.avro.event.sample","fields":[{"name":"event_envelope","type":{"type":"record","name":"EventEnvelope","fields":[{"name":"data","type":{"type":"record","name":"Data","fields":[{"name":"testEventId","type":"int"},{"name":"test","type":{"type":"string","avro.java.string":"String"}}]},"default":{}}]}}]}] with root cause

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 2]; error code: 50005
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198) ~[kafka-schema-registry-client-5.3.1.jar!/:na]
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70) ~[kafka-avro-serializer-5.3.0.jar!/:na]
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-5.3.0.jar!/:na]
        at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-2.0.1.jar!/:na]
        at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-2.0.1.jar!/:na]
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841) ~[kafka-clients-2.0.1.jar!/:na]
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803) ~[kafka-clients-2.0.1.jar!/:na]
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199) ~[spring-kafka-2.2.6.RELEASE.jar!/:2.2.6.RELEASE]
        at io.confluent.developer.spring.avro.Producer.sendTestEvent(Producer.java:58) ~[classes!/:0.0.1-SNAPSHOT]
        at io.confluent.developer.spring.avro.KafkaController.sendMessageToKafkaTopic(KafkaController.java:31) ~[classes!/:0.0.1-SNAPSHOT]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
        at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
        at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:190) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:104) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1039) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) ~[spring-webmvc-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.7.RELEASE.jar!/:5.1.7.RELEASE]
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:836) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1747) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.19.jar!/:9.0.19]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]

Since I am using https schema registry do I have to set any other properties in application.properties like keystore and trustore certificate?

3
could you post your Schema?Armando Ballaci
Try using version 5.5.0 of the Avro serializerOneCricketeer
@ArmandoBallaci I am using 5.3 but facing sane issue My Schema is: { "type": "record", "namespace": "com.kafka.avro.event.sample", "name": "data", "fields": [{ "name": "testEventId", "type": "int" }, { "name": "test", "type": "string" }] }Bunny
@OneCricketeer I even tried with 5.5 but same error. Is this error related to authentication with schema registary? I am using basic authentication. When I am trying to do curl to the schema registry with credentials. I am able to fetch schema.Bunny

3 Answers

1
votes

I am now able to post a message to kafka in Avro format after validating through schema registary.

The mistake which I was making is related to subject name of the schema. By default Avro schema uses TopicNameStrategy which means it will look for subject name which look like this topic name-value. If it doesn't find it will throwing error not able to retrieve schema because schema is not present with that subject name.

I changed the subject name of schema to test-topic-value and able to post message onto test-topic.

I hope this will help and save someone time.

0
votes

This is because schema-registry is down. You need to verify in logs what's the exact issue.

Command to see schema-registry logs: confluent local services schema-registry log -f (-f make it live) In my case, I had a conflict port usage(another tool was using 8081). Logs talk, just need to hear it.

-1
votes
<!-- https://mvnrepository.com/artifact/io.confluent/kafka-avro-serializer -->
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.0</version>
        </dependency>