0
votes

I'm developing a new Kafka application using Spring Cloud Stream. I followed the new functional bindings introduced recently.

From the docs it says that the binder will try to match some common types from Kafka Streams such as Integer, String, byte[], etc.

I have a custom type and according to the doc I need to create a custom Serde.

My custom Serde bean definition looks like the following:

@Bean
public Serde<Foo> fooSerde() {...}

This Stream binding works as expected.

@Bean
public Function<KStream<String, byte[]>, KStream<String, Foo[]>> process() {...}

However if I change to the following, the binder is unable to find my custom Serde bean and it defaults to the Json Serde.

@Bean
public Function<KStream<String, byte[]>, KStream<String, ?[]>> process() {...}

Please note the "?" as value type of the KStream. Is this the expected behaviour or I missed something here?

I can definitely go back to the binding inside application.yml such as spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=Foo.class but I'm having trouble creating such no-arg Serde due to dependency injection. I need to rely on SchemaRegistryClient to be injected into my Serde since I'm using AVRO, with this I cannot have no-arg constructor in my custom Serde requested by Kafka, unless I go with the anti-pattern to make SchemaRegistryClient static.

Any help would be appreciated!

1

1 Answers

0
votes

Time to answer my own question.

The issue is with the wildcard "?". With this wildcard being the value for KStream spring is unable to hook the serde beans. So the solution is to have the strong type in the parameter. Then everything works well.