3 votes

I have the following scenario:

My application looks like this:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class MyApplication {

  private static final Logger log = LoggerFactory.getLogger(MyApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args);
  }

  // Handles messages arriving on the channel bound to the input destination
  @StreamListener(Sink.INPUT)
  public void myMessageSink(MyMessage message) {
    log.info("Received new message: {}", message);
  }
}

where MyMessage is the class generated by Avro from my Avro schema.
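
For illustration, a schema along these lines would generate such a class (this is just a representative example, not my exact schema; note the map field, which is where the decoding blows up in the stack trace below):

{
  "type": "record",
  "name": "MyMessage",
  "namespace": "com.example",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "attributes", "type": { "type": "map", "values": "string" } }
  ]
}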

My application.properties looks like this:

spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=${spring.application.name}
spring.cloud.stream.bindings.input.contentType=application/*+avro
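
The consumer group is taken from spring.application.name, which is set elsewhere in my configuration, e.g.:

spring.application.name=my-message-consumer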

My problem now is that every time a new message is received, the following exception is thrown:

org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    ...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1]
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    ... 35 common frames omitted

From what I understand, the problem is that the Confluent stack prepends a small header (a magic byte plus the ID of the schema the message was written with) to the message payload, and clients are expected to skip this header and only then start reading the actual Avro data. It seems I need to configure the Kafka binding to use Confluent's KafkaAvroDeserializer, but I cannot figure out how to achieve this.
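
For reference, Confluent's documented wire format for Avro messages is:

byte 0:    magic byte (always 0 for the current format)
bytes 1-4: 4-byte schema ID (big-endian int)
bytes 5+:  Avro binary-encoded payload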

(I can retrieve the messages perfectly fine using Confluent's Avro console consumer, so it doesn't seem to be an issue with the Avro encoding itself.)

I also tried playing around with the @EnableSchemaRegistryClient annotation and configuring a ConfluentSchemaRegistryClient bean, but it looks to me like this only controls where the schemas are stored and retrieved from, not the actual deserialization.

Is this even supposed to be working somehow?


2 Answers

0 votes

Does it work when you set the per-binding property spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer to Confluent's KafkaAvroDeserializer class name?
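
Something along these lines in application.properties, for example (a sketch; the schema.registry.url value is just a placeholder for a local registry, and specific.avro.reader makes the deserializer return the generated MyMessage class rather than a GenericRecord):

spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.kafka.bindings.input.consumer.configuration.schema.registry.url=http://localhost:8081
spring.cloud.stream.kafka.bindings.input.consumer.configuration.specific.avro.reader=true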

0 votes

Sort of answering my own question. What I did for now is implement a MessageConverter which strips the Confluent header (the magic byte plus the 4-byte schema ID) from every message before passing it to the Avro decoder. The code is mostly taken from spring-cloud-stream's AbstractAvroMessageConverter:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.MimeType;

public class ConfluentAvroSchemaMessageConverter extends AvroSchemaMessageConverter {

    public ConfluentAvroSchemaMessageConverter() {
        super(new MimeType("application", "avro+confluent"));
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        try {
            byte[] payload = (byte[]) message.getPayload();

            MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders());
            if (mimeType == null) {
                if (conversionHint instanceof MimeType) {
                    mimeType = (MimeType) conversionHint;
                }
                else {
                    return null;
                }
            }

            // skip the Confluent header: one magic byte followed by the 4-byte schema ID
            // see https://groups.google.com/forum/#!topic/confluent-platform/rI1WNPp8DJU
            ByteBuffer buf = ByteBuffer.wrap(payload);
            buf.get();    // magic byte
            buf.getInt(); // schema ID (unused here, the schema is resolved below)
            byte[] payloadWithoutConfluentHeader = new byte[buf.remaining()];
            buf.get(payloadWithoutConfluentHeader);

            Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
            Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
            DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
            Decoder decoder = DecoderFactory.get().binaryDecoder(payloadWithoutConfluentHeader, null);
            return reader.read(null, decoder);
        }
        catch (IOException e) {
            throw new MessageConversionException(message, "Failed to read payload", e);
        }
    }
}

I then set the content type for the incoming Kafka topic to application/avro+confluent via application.properties:
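
spring.cloud.stream.bindings.input.contentType=application/avro+confluent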

This at least lets me retrieve messages encoded with the Confluent stack, but of course it does not interact with the schema registry in any way.