
I have a Spring Cloud Kafka Streams application that uses a StateStore through the Processor API, in a transformer that performs deduplication.

The state store's keys and values are of the following types: <String, TransferEmitted>.

When running the application, at the moment of putting a value into the state store (dedupStore.put(key, value)), I get this exception:

Caused by: java.lang.ClassCastException: com.codependent.outboxpattern.account.TransferEmitted cannot be cast to java.lang.String

This is due to the fact that the default value serde for the KafkaStreamsStateStore is a StringSerde.

Thus, I have added the valueSerde parameter to the @KafkaStreamsStateStore annotation, specifying a SpecificAvroSerde:

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE,
            valueSerde = "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde")

Now I get a NullPointerException in AbstractKafkaAvroSerializer.serializeImpl, because at id = this.schemaRegistry.getId(subject, schema); the schemaRegistry is null:

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)

Despite having configured the schema registry as a Spring bean...

@Configuration
class SchemaRegistryConfiguration {

    @Bean
    fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
        val client = ConfluentSchemaRegistryClient()
        client.setEndpoint(endpoint)
        return client
    }

}

...when Kafka Streams sets up the SpecificAvroSerde it uses the no-args constructor, so it doesn't initialize the schema registry client:

public class SpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {
    private final Serde<T> inner;

    public SpecificAvroSerde() {
        this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
    }

    public SpecificAvroSerde(SchemaRegistryClient client) {
        if (client == null) {
            throw new IllegalArgumentException("schema registry client must not be null");
        } else {
            this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
        }
    }
    // ...
}
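As far as I can tell, a serde created through that no-args constructor only obtains a registry client when configure() is called on it with schema.registry.url in the config map. A minimal sketch to illustrate the mechanism (not project code):

    // Hypothetical manual configuration of the serde; without schema.registry.url
    // in the map, the inner serializer keeps a null SchemaRegistryClient
    val serde = SpecificAvroSerde<TransferEmitted>()
    serde.configure(mapOf("schema.registry.url" to "http://localhost:8081"), false) // isKey = false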

How can I configure this application so that it is able to serialize a StateStore<String, TransferEmitted>?

EXCERPTS FROM THE PROJECT (source available at https://github.com/codependent/kafka-outbox-pattern)

KStream

const val DEDUP_STORE = "dedup-store"

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input
                .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
                .filter { _, value -> fraudDetectionService.isFraudulent(value) }

    }

}

Transformer

@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {

    private lateinit var dedupStore: KeyValueStore<String, TransferEmitted>
    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
        dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, TransferEmitted>
    }

    override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
        return if (isDuplicate(key)) {
            null
        } else {
            dedupStore.put(key, value)
            KeyValue(key, value)
        }
    }

    private fun isDuplicate(key: String) = dedupStore[key] != null

    override fun close() {
    }
}

application.yml

spring:
  application:
    name: fraud-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: fraud-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              schema:
                registry:
                  url: http://localhost:8081
      bindings:
        input:
          destination: transfer
          contentType: application/*+avro
        output:
          destination: fraudulent-transfer
          contentType: application/*+avro

server:
  port: 8086

logging:
  level:
    org.springframework.cloud.stream: debug
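For reference, a default value serde would sit next to the default key serde above, under the binder configuration (a sketch of that fragment only; note that the state store serde is configured separately, through the @KafkaStreamsStateStore annotation):

              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde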

Comment from sobychacko: Can you debug the application and see if this exception is happening due to a gap in the binder? If so, please raise an issue or contribute a fix. Feel free to ping on Gitter if you want to chat.

1 Answer


I ran into the same issue and had forgotten that schema.registry.url needs to be passed in to make sure that you can store Avro records in your state store.

For example:

    @Bean
    public StoreBuilder<WindowStore<Object, Long>> eventStore(Map<String, String> schemaConfig) {
        final Duration windowSize = Duration.ofMinutes(DUPLICATION_WINDOW_DURATION);

        // retention period must be at least window size -- for this use case, we don't need a longer retention period
        // and thus just use the window size as retention time
        final Duration retentionPeriod = windowSize;

        // We have to specify schema.registry.url here, otherwise schemaRegistry value will end up null
        KafkaAvroSerializer serializer = new KafkaAvroSerializer();
        KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
        serializer.configure(schemaConfig, true);
        deserializer.configure(schemaConfig, true);

        final StoreBuilder<WindowStore<Object, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
                Stores.persistentWindowStore(STORE_NAME,
                        retentionPeriod,
                        windowSize,
                        false
                ),
                Serdes.serdeFrom(serializer, deserializer),
                // timestamp value is long
                Serdes.Long());
        return dedupStoreBuilder;
    }

    @Bean
    public Map<String, String> schemaConfig(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String url) {
        // use the injected endpoint instead of hardcoding the URL
        return Collections.singletonMap("schema.registry.url", url);
    }

Here's the application.yml file:

spring:
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081

After I did this, I was able to get the store properly configured and no longer saw the NullPointerException.
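Applied to the question's <String, TransferEmitted> key-value store, the same idea would look roughly like this in Kotlin (a sketch, reusing the schemaConfig bean above; DEDUP_STORE and TransferEmitted come from the question):

    @Bean
    fun dedupStoreBuilder(schemaConfig: Map<String, String>): StoreBuilder<KeyValueStore<String, TransferEmitted>> {
        // Configure the value serde by hand so its serializer/deserializer get a SchemaRegistryClient
        val valueSerde = SpecificAvroSerde<TransferEmitted>()
        valueSerde.configure(schemaConfig, false) // isKey = false: this serde is used for values
        return Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(DEDUP_STORE),
                Serdes.String(),
                valueSerde)
    }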