I would like to test Spring Cloud Stream with Confluent Schema Registry and Avro Schema evolution to integrate it with my application. I have found out that Spring Cloud Stream does not support a secure connection to Confluent Schema Registry and the implementation is still very basic. Hence, I have decided to use Confluent Schema Registry Client in conjunction with Spring Kafka for the schema registry part and use Spring Cloud Stream for the rest. Here is the code for Consumer and Producer as well as the Avro Schema:
Consumer:
import com.example.Sensor;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {
private final Log logger = LogFactory.getLog(getClass());
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Sensor data) {
logger.info(data);
}
@Configuration
static class ConfluentSchemaRegistryConfiguration {
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(endpoint, 100);
return client;
}
}
}
Application.yaml:
spring.kafka.properties.schema.registry.url: kafka-url:kafka-port
spring:
cloud:
stream:
bindings:
input:
destination: sensor-topic
schemaRegistryClient:
endpoint: https://user:password@schema-registry-url:schema-registry-port
schema:
avro:
schema-locations: classpath:avro/sensor.avsc
kafka:
binder:
brokers: SSL://kafka-url:kafka-port
configuration:
security.protocol: SSL
ssl.truststore.type: JKS
ssl.truststore.location: client.truststore.jks
ssl.truststore.password: secret
ssl.keystore.type: PKCS12
ssl.keystore.location: client.keystore.p12
ssl.keystore.password: secret
ssl.key.password: secret
key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
basic.auth.credentials.source: USER_INFO
basic.auth.user.info: user:password
server.port: 9999
Producer:
import com.example.Sensor;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Random;
import java.util.UUID;
@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class Producer1Application {
@Autowired
private Source source;
private Random random = new Random();
public static void main(String[] args) {
SpringApplication.run(Producer1Application.class, args);
}
@RequestMapping(value = "/messages", method = RequestMethod.POST)
public String sendMessage() {
source.output().send(MessageBuilder.withPayload(randomSensor()).build());
return "ok, have fun with v1 payload!";
}
private Sensor randomSensor() {
Sensor sensor = new Sensor();
sensor.setId(UUID.randomUUID().toString() + "-v1");
sensor.setAcceleration(random.nextFloat() * 10);
sensor.setVelocity(random.nextFloat() * 100);
sensor.setTemperature(random.nextFloat() * 50);
return sensor;
}
//Another convenience POST method for testing deterministic values
@RequestMapping(value = "/messagesX", method = RequestMethod.POST)
public String sendMessageX(@RequestParam(value="id") String id, @RequestParam(value="acceleration") float acceleration,
@RequestParam(value="velocity") float velocity, @RequestParam(value="temperature") float temperature) {
Sensor sensor = new Sensor();
sensor.setId(id + "-v1");
sensor.setAcceleration(acceleration);
sensor.setVelocity(velocity);
sensor.setTemperature(temperature);
source.output().send(MessageBuilder.withPayload(sensor).build());
return "ok, have fun with v1 payload!";
}
@Configuration
static class ConfluentSchemaRegistryConfiguration {
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(endpoint, 100);
return client;
}
}
}
Application.yaml
spring.kafka.properties.schema.registry.url: kafka-url:kafka-port
spring:
cloud:
stream:
bindings:
output:
contentType: application/*+avro
destination: sensor-topic
schemaRegistryClient:
endpoint: https://user:password@schema-registry-url:schema-registry-port
schema:
avro:
schema-locations: classpath:avro/sensor.avsc
kafka:
binder:
brokers: SSL://kafka-url:kafka-port
configuration:
security.protocol: SSL
ssl.truststore.type: JKS
ssl.truststore.location: client.truststore.jks
ssl.truststore.password: secret
ssl.keystore.type: PKCS12
ssl.keystore.location: client.keystore.p12
ssl.keystore.password: secret
ssl.key.password: secret
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
basic.auth.credentials.source: USER_INFO
basic.auth.user.info: user:password
server.port: 9009
Consumer Schema:
{
"namespace" : "com.example",
"type" : "record",
"name" : "Sensor",
"fields" : [
{"name":"id","type":"string"},
{"name":"internalTemperature", "type":"float", "default":0.0, "aliases":["temperature"]},
{"name":"externalTemperature", "type":"float", "default":0.0},
{"name":"acceleration", "type":"float","default":0.0},
{"name":"velocity","type":"float","default":0.0}
]
}
Producer Schema:
{
"namespace" : "com.example",
"type" : "record",
"name" : "Sensor",
"fields" : [
{"name":"id","type":"string"},
{"name":"temperature", "type":"float", "default":0.0},
{"name":"acceleration", "type":"float","default":0.0},
{"name":"velocity","type":"float","default":0.0}
]
}
I have stuck with the following exception. I would appreciate it if you could help me to understand what the issue is.
java.lang.IllegalStateException: Failed to convert message: 'GenericMessage [payload={"id": "a50a7646-a1c9-49a2-a7f6-f09b77fc3116-v1", "temperature": 10.430967, "acceleration": 5.434994, "velocity": 70.19337}, headers={id=de6e40e4-ba8f-9dca-1753-c06a1751e2d4, contentType=application/*+avro, timestamp=1556160207806}]' to outbound message.
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$OutboundContentTypeConvertingInterceptor.doPreSend(MessageConverterConfigurer.java:325) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$AbstractContentTypeInterceptor.preSend(MessageConverterConfigurer.java:353) ~[spring-cloud-stream-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:608) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:443) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at sample.producer1.Producer1Application.sendMessage(Producer1Application.java:39) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:189) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:138) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:892) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:797) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1038) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:942) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1005) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:908) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:660) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:882) ~[spring-webmvc-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:741) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.springframework.boot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal(HttpTraceFilter.java:90) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:92) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:93) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.filterAndRecordMetrics(WebMvcMetricsFilter.java:117) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:106) ~[spring-boot-actuator-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:200) ~[tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:490) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:139) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:408) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:834) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1415) [tomcat-embed-core-9.0.17.jar:9.0.17]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.17.jar:9.0.17]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_191]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_191]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.17.jar:9.0.17]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_191]
P.S: I am using Confluent Client 5.2.1, Avro 1.8.2, Spring Boot 2.1.4.RELEASE and Spring Cloud Stream Fishtown.RELEASE.