So I think I've run myself into confusion as I understand there are two different kafka binders for SpringCloudStreams:
- Spring Cloud Streams Kafka Binder
- Spring Cloud Streams Kafka Streams Binder
I'm looking for the correct YAML settings to define the serializer and deserializer in the normal kafka binder for spring cloud streams:
I can tweak the defaults using this logic:
spring:
main:
web-application-type: NONE
application:
name: tbfm-translator
kafka:
consumer:
group-id: ${consumer_id}
bootstrap-servers: ${kafka_servers}
cloud:
schemaRegistryClient:
endpoint: ${schema_registry}
stream:
# default:
# producer.useNativeEncoding: true
# consumer.useNativeEncoding: true
defaultBinder: kafka
kafka:
binder:
auto-add-partitions: true # I wonder if its cause this is set
auto-create-topics: true # Disabling this seem to override the server setings and will auto create
producer-properties:
# For additional properties you can check here:
# https://docs.confluent.io/current/installation/configuration/producer-configs.html
schema.registry.url: ${schema_registry}
# Disable for auto schema registration
auto.register.schemas: false
# Use only the latest schema version
use.latest.version: true
# This will use reflection to generate schemas from classes - used to validate current data set
# against the scheam registry for valid production
schema.reflection: true
# To use an avro key enable the following line
#key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
#This will use a string based key - aka not in the registry - dont need a name strategy with string serializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
# This will control the Serializer Setup
value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
which is:
spring.cloud.stream.kafka.binder.producer-properties.value.serializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer
I figure I should be able to do this on a per-topic basis:
spring:
cloud:
stream:
bindings:
my-topic:
destination: a-topic
xxxxxxxx??
I've come across setting:
producer:
use-native-encoding: false
keySerde: <CLASS>
But this doesn't seem to be working. Is there an easy property I can set to do this on the per-topic basis? I think the keySerde
is for the Kafka-streams implementation not the normal kafka binder.