I am trying to implement a Spring Cloud Data Flow stream that reads records from a database, passes them to a processor that converts them into Avro records, and then sends those records on to be consumed by a sink application.
The data flows from the SQL DB to my source app and on to the processor via the Kafka binder with no issues, but I am running into problems sending the data from the Processor to the Sink application when serializing/deserializing with Avro.
I have created an Avro schema called ech.avsc and have generated a class called EchRecord from it using the avro-maven-plugin within the Processor.
I have added the following dependencies to the pom of both the processor and the sink:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
I have set the properties of the processor to:
spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
On the Sink side the properties look like this:
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990
The Processor application code looks as follows:
@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {

    private static Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EchProcessorApplication.class, args);
    }

    // Receives the rows from the source and emits an Avro-generated EchRecord.
    // The call id is hard-coded for now while testing serialization.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public EchRecord transform(List<Map<String, Object>> record) {
        return EchRecord.newBuilder()
                .setCallId(11111)
                .build();
    }
}
On the Sink side the code as it stands looks as follows:
@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {

    private static Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroLoggerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void logHandler(Object data) {
        LOGGER.info("data='{}'", data.toString());
        LOGGER.info("class='{}'", data.getClass());
    }
}
I have a Spring Schema Registry Server running and reachable by both applications, and when I query the registry I can see that the schema has been registered on the server.
I can see if I enable debug logging on the sink application that the contentType is being set correctly on the received messages: contentType=application/vnd.echrecord.v1+avro
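To make explicit what that header carries, here is a minimal sketch that splits such a content type into a schema subject and version. It assumes the value always has the shape application/vnd.<subject>.v<version>+avro, as in the logged example; the class AvroContentType and its two helper methods are hypothetical names used for illustration and are not part of Spring Cloud Stream.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AvroContentType {

    // Assumed layout: application/vnd.<subject>.v<version>+avro
    private static final Pattern VERSIONED = Pattern.compile(
            "application/vnd\\.([\\p{Alnum}$.]+)\\.v(\\p{Digit}+)\\+avro");

    // Returns the subject part, e.g. "echrecord".
    public static String subject(String contentType) {
        return match(contentType).group(1);
    }

    // Returns the numeric schema version, e.g. 1.
    public static int version(String contentType) {
        return Integer.parseInt(match(contentType).group(2));
    }

    // Matches the whole string against the assumed layout or fails loudly.
    private static Matcher match(String contentType) {
        Matcher m = VERSIONED.matcher(contentType);
        if (!m.matches()) {
            throw new IllegalArgumentException(
                    "Not a versioned Avro content type: " + contentType);
        }
        return m;
    }

    public static void main(String[] args) {
        String contentType = "application/vnd.echrecord.v1+avro";
        // prints "echrecord 1"
        System.out.println(subject(contentType) + " " + version(contentType));
    }
}
```

So the message itself appears to identify the schema as subject echrecord, version 1, which matches what I see registered on the server.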
In the Sink application I have set up a method annotated with @StreamListener that takes in an Object and prints out the data and its class type, and it appears to be receiving a byte array.
How do I go about changing the code of the Sink application to deserialize the Avro message into something that I can read the set data from?