I'm developing a new Kafka application using Spring Cloud Stream, following the recently introduced functional binding style.
The docs say that the binder will try to match some common Kafka Streams types such as Integer, String, byte[], etc.
I have a custom type and according to the doc I need to create a custom Serde.
My custom Serde bean definition looks like the following:
@Bean
public Serde<Foo> fooSerde() {...}
The following stream binding works as expected:
@Bean
public Function<KStream<String, byte[]>, KStream<String, Foo[]>> process() {...}
However, if I change it to the following, the binder is unable to find my custom Serde bean and defaults to the JSON Serde:
@Bean
public Function<KStream<String, byte[]>, KStream<String, ?[]>> process() {...}
Please note the "?" as the value type of the KStream. Is this the expected behaviour, or did I miss something here?
I can certainly go back to configuring the binding in application.yml, for example spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=Foo.class, but I'm having trouble creating a no-arg Serde because of dependency injection. Since I'm using Avro, my Serde needs a SchemaRegistryClient injected into it, so it cannot have the no-arg constructor that Kafka requires, unless I resort to the anti-pattern of making the SchemaRegistryClient static.
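To make the constructor constraint concrete, here is a minimal sketch in plain Java with no Kafka or Spring dependencies. RegistryClient and this FooSerde are hypothetical stand-ins for SchemaRegistryClient and my real Serde, and canInstantiateByClassName is only my assumption of how a Serde configured by class name gets created (reflectively, through a no-arg constructor); it is not the binder's actual code.

```java
import java.lang.reflect.Constructor;

public class NoArgSerdeSketch {

    // Hypothetical stand-in for the real SchemaRegistryClient.
    interface RegistryClient {}

    // Hypothetical Serde-like class whose only constructor needs an injected dependency.
    static class FooSerde {
        private final RegistryClient client;

        FooSerde(RegistryClient client) {
            this.client = client;
        }
    }

    // Assumption: class-name-based configuration instantiates the class
    // reflectively through a no-arg constructor. Returns false if that fails.
    static boolean canInstantiateByClassName(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // FooSerde has no no-arg constructor, so reflective creation is not possible.
        System.out.println(canInstantiateByClassName(FooSerde.class));
        // StringBuilder has a public no-arg constructor, shown for contrast.
        System.out.println(canInstantiateByClassName(StringBuilder.class));
    }
}
```

Under these assumptions, the class with constructor injection cannot be created from its class name alone, which is why I would rather have the binder pick up the Serde bean.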
Any help would be appreciated!