
I am trying to implement a Spring Cloud Data Flow stream that reads records from a database, passes them to a processor which converts them into Avro records, and then passes those on to be consumed by a sink application.

I have the data flowing from the SQL database to my source app and across the Kafka binder with no issues, but I am running into problems serializing/deserializing with Avro when sending the data from the Processor to the Sink application.

I have created an Avro schema called ech.avsc and generated a class called EchRecord from it using the avro-maven-plugin within the Processor.
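For reference, a minimal ech.avsc for such a record might look like the following (the namespace and field type are hypothetical; only the callId field is visible in the processor code below):

{
  "namespace": "com.example.ech",
  "type": "record",
  "name": "EchRecord",
  "fields": [
    { "name": "callId", "type": "int" }
  ]
}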

I have added the following dependencies to the pom of both the processor and the sink:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
    <version>1.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

I have set the properties of the processor to

spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled=true
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

On the Sink side the properties look like:

spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990

The Processor application code looks as follows:

import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class EchProcessorApplication {

    private static final Logger logger = LoggerFactory.getLogger(EchProcessorApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(EchProcessorApplication.class, args);
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public EchRecord transform(List<Map<String, Object>> record) {
        // Map the incoming JDBC rows to the Avro-generated EchRecord
        return EchRecord.newBuilder()
                .setCallId(11111)
                .build();
    }
}

On the Sink side the code as it stands looks as follows:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;

@EnableBinding(Sink.class)
@SpringBootApplication
@EnableSchemaRegistryClient
public class AvroLoggerApplication {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroLoggerApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(AvroLoggerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void logHandler(Object data) {
        // Log the payload and its runtime type to see what the binder delivers
        LOGGER.info("data='{}'", data.toString());
        LOGGER.info("class='{}'", data.getClass());
    }
}

I have a Spring Schema Registry Server running that is reachable by both applications, and on querying the registry I can see that the schema has been delivered to the server.

If I enable debug logging on the sink application, I can see that the contentType is being set correctly on the received messages: contentType=application/vnd.echrecord.v1+avro

In the Sink application I have set up a method with the @StreamListener annotation to retrieve the messages, taking in an Object and printing out the data and the class type; it appears to be receiving a byte array.

How do I go about changing the code of the Sink application to deserialize the Avro message into something that I can retrieve the set data from?

Could you provide a small sample application (source and sink) where we can reproduce the issue? No need to use any db source, just a basic source serializing using Avro and a consumer deserializing it. That way it's easier to troubleshoot. – sobychacko

1 Answer


A couple of things to try here. On the producing side, since your type is already an Avro type (a SpecificRecord or GenericRecord), you don't need the dynamicSchemaGenerationEnabled flag; that is meant for reflection-based writers, mostly for testing, as it has an impact on performance.
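With that flag removed, the processor properties from the question would reduce to:

spring.cloud.stream.bindings.output.contentType=application/*+avro
spring.cloud.stream.schemaRegistryClient.endpoint=http://192.168.99.100:8990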

Since your sink can see the correct content type, as you posted, what you need now is to have the type available on the sink: add the generated EchRecord class to the sink application and change the @StreamListener method's parameter from Object to EchRecord, and the Avro message converter will deserialize the payload into that type.
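A minimal sketch of the updated handler (assuming the Avro-generated EchRecord class is on the sink's classpath):

@StreamListener(Sink.INPUT)
public void logHandler(EchRecord data) {
    // The Avro converter has already deserialized the payload into EchRecord
    LOGGER.info("callId='{}'", data.getCallId());
}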

You can also declare the parameter as GenericRecord in order to access the payload like an object container, using record.get("<propertyname>").
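For example (a sketch, assuming the schema field is named callId, as set in the processor above):

import org.apache.avro.generic.GenericRecord;

@StreamListener(Sink.INPUT)
public void logHandler(GenericRecord record) {
    // GenericRecord exposes fields by name, without needing the generated class
    LOGGER.info("callId='{}'", record.get("callId"));
}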