So I think I've run myself into confusion: as I understand it, there are two different Kafka binders for Spring Cloud Stream:

  • Spring Cloud Stream Kafka Binder
  • Spring Cloud Stream Kafka Streams Binder

I'm looking for the correct YAML settings to define the serializer and deserializer in the normal Kafka binder for Spring Cloud Stream.

I can tweak the binder-wide defaults using this configuration:

spring:
  main:
    web-application-type: NONE
  application:
    name: tbfm-translator
  kafka:
    consumer:
      group-id: ${consumer_id}
    bootstrap-servers: ${kafka_servers}
  cloud:
    schemaRegistryClient:
      endpoint: ${schema_registry}
    stream:
#      default:
#        producer.useNativeEncoding: true
#        consumer.useNativeEncoding: true
      defaultBinder: kafka
      kafka:
        binder:
          auto-add-partitions: true # I wonder if it's because this is set
          auto-create-topics: true # Disabling this seems to override the server settings and will auto-create

          producer-properties:
            # For additional properties you can check here:
            # https://docs.confluent.io/current/installation/configuration/producer-configs.html

            schema.registry.url: ${schema_registry}

            # Disable for auto schema registration
            auto.register.schemas: false

            # Use only the latest schema version
            use.latest.version: true

            # This will use reflection to generate schemas from classes - used to validate the current data set
            # against the schema registry for valid production
            schema.reflection: true

            # To use an avro key enable the following line
            #key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

            # This will use a string-based key - aka not in the registry - don't need a name strategy with the string serializer
            key.serializer: org.apache.kafka.common.serialization.StringSerializer

            # This will control the Serializer Setup
            value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

which corresponds to the properties:

spring.cloud.stream.kafka.binder.producer-properties.value.serializer
spring.cloud.stream.kafka.binder.producer-properties.key.serializer
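
I assume the consumer side mirrors this with consumer-properties and the matching Confluent deserializers (I haven't verified this yet), something like:

spring:
  cloud:
    stream:
      kafka:
        binder:
          consumer-properties:
            schema.registry.url: ${schema_registry}

            # String keys to match the string key serializer above
            key.deserializer: org.apache.kafka.common.serialization.StringDeserializer

            # Deserialize Avro values into the generated specific record classes
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            specific.avro.reader: true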

I figure I should be able to do this on a per-topic basis:


spring:
  cloud:
    stream:
      bindings:
        my-topic:
          destination: a-topic
          xxxxxxxx??

I've come across this setting:

          producer:
            use-native-encoding: false
            keySerde: <CLASS>

But this doesn't seem to be working. Is there an easy property I can set to do this on a per-topic basis? I think keySerde is for the Kafka Streams binder, not the normal Kafka binder.

2 Answers

1 vote

use-native-encoding must be true to use your own serializers.

spring.cloud.stream.kafka.bindings.my-topic.producer.configuration.value.serializer: ...

See the documentation for kafka-specific producer properties.

configuration

Map with a key/value pair containing generic Kafka producer properties.

Default: Empty map.
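
Putting that together with the binding from your question (assuming the my-topic binding name and a-topic destination), something like this should work:

spring:
  cloud:
    stream:
      bindings:
        my-topic:
          destination: a-topic
          producer:
            # Let the Kafka serializers below do the work instead of the binder's message converters
            useNativeEncoding: true
      kafka:
        bindings:
          my-topic:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer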

0 votes


spring:
  cloud:
    stream:
      bindings: # Define output topics here and then again in the kafka.bindings section
        test:
          destination: multi-output
          producer:
            useNativeEncoding: true

      kafka:
        bindings:
          test:
            destination: multi-output
            producer:
              configuration:
                value.serializer: org.apache.kafka.common.serialization.StringSerializer

This seems to work, but it's very annoying that I have to duplicate the binding definition in two places.

It makes me want to shy away from the YAML-style definition.
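
That said, I suspect the destination line in the kafka.bindings section is redundant: the Kafka-specific section should only need the producer configuration, with the destination picked up from the common bindings entry. I haven't confirmed this, but something like:

spring:
  cloud:
    stream:
      bindings:
        test:
          destination: multi-output
          producer:
            useNativeEncoding: true

      kafka:
        bindings:
          test:
            producer:
              configuration:
                value.serializer: org.apache.kafka.common.serialization.StringSerializer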