I have a Spring Cloud Kafka Streams application that uses a StateStore in the Processor API, when using a transformer to perform a deduplication.
The state store key-value are of the following types: <String, TransferEmitted>
.
When running the application, at the moment of putting a value in the state store (dedupStore.put(key, value)
), I get this exception:
Caused by: java.lang.ClassCastException: com.codependent.outboxpattern.account.TransferEmitted cannot be cast to java.lang.String
This is due to the fact that the default value serde for the KafkaStreamsStateStore
is a StringSerde
.
Thus, I have added the valueSerde parameter in the KafkaStreamsStateStore
annotation, indicating the one for a SpecificAvroSerde
:
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE,
valueSerde = "io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde")
Now I get a NullPointerException in AbstractKafkaAvroSerializer.serializeImpl
because at id = this.schemaRegistry.getId(subject, schema);
schemaRegistry is null:
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message Caused by: java.lang.NullPointerException at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82) at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65) at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
Despite having configured the schema registry as a Spring bean...
@Configuration
class SchemaRegistryConfiguration {
@Bean
fun schemaRegistryClient(@Value("\${spring.cloud.stream.schema-registry-client.endpoint}") endpoint: String): SchemaRegistryClient {
val client = ConfluentSchemaRegistryClient()
client.setEndpoint(endpoint)
return client
}
}
...when Kafka sets up the SpecificAvroSerde
it uses the no-params constructor so it doesn't initialize the schema registry client:
public class SpecificAvroSerde<T extends SpecificRecord> implements Serde<T> {
private final Serde<T> inner;
public SpecificAvroSerde() {
this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
}
public SpecificAvroSerde(SchemaRegistryClient client) {
if (client == null) {
throw new IllegalArgumentException("schema registry client must not be null");
} else {
this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
}
}
How can I configure this application so that it allows to serialize a StateStore<String, TransferEmitted>
?
EXCERPTS FROM THE PROJECT (source available at https://github.com/codependent/kafka-outbox-pattern)
KStream
const val DEDUP_STORE = "dedup-store"
@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {
@KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
return input
.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
.filter { _, value -> fraudDetectionService.isFraudulent(value) }
}
}
Transformer
@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {
private lateinit var dedupStore: KeyValueStore<String, TransferEmitted>
private lateinit var context: ProcessorContext
override fun init(context: ProcessorContext) {
this.context = context
dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, TransferEmitted>
}
override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
return if (isDuplicate(key)) {
null
} else {
dedupStore.put(key, value)
KeyValue(key, value)
}
}
private fun isDuplicate(key: String) = dedupStore[key] != null
override fun close() {
}
}
application.yml
spring:
application:
name: fraud-service
cloud:
stream:
schema-registry-client:
endpoint: http://localhost:8081
kafka:
streams:
binder:
configuration:
application:
id: fraud-service
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
schema:
registry:
url: http://localhost:8081
bindings:
input:
destination: transfer
contentType: application/*+avro
output:
destination: fraudulent-transfer
contentType: application/*+avro
server:
port: 8086
logging:
level:
org.springframework.cloud.stream: debug