I've recently started building a microservice using Spring Cloud Stream with Kafka. I'm new to both, so please forgive me if this is a naive question.
The flow of this service is to consume from a topic, transform the data and then produce the result to various topics.
The data arriving on the consumer topic are database changes. Basically, we have another service that monitors the database log and produces the changes to the topic that my new service consumes from.
In my new service I have defined consumer and producer bindings. The database data comes in as byte[], and the consumer reads it and decodes the byte[] into a Java object, DBData. Then, based on the table name, a different conversion is performed.
Please see the following code example.
    @StreamListener
    @SendTo("OUTPUT_MOCK")
    public KStream<String, Mock> process(@Input("DB_SOURCE") KStream<String, byte[]> input) {
        return input
            .map(((String key, byte[] bytes) -> {
                try {
                    return new KeyValue<>(key, DBDecoder.decode(bytes)); // decodes byte[] into DBData
                } catch (Exception e) {
                    return new KeyValue<>(key, null); // undecodable records are dropped by the next filter
                }
            }))
            .filter((key, v) -> (v instanceof DBData)) // filter value type (also removes nulls)
            .map((key, v) -> new KeyValue<>(key, (DBData) v))
            .filter((String key, DBData v) -> "MOCK".equals(v.getTableName())) // check the table name
            .flatMap((String key, DBData v) -> extractMockDataChanges(v)); // convert to Mock object
    }
From the code example you can see that the DB data comes in and is decoded into DBData. The result is then filtered by table name, and finally the converted Mock object is produced to the OUTPUT_MOCK topic.
This code works perfectly, but my main issue is the DBData transformation part. The OUTPUT_MOCK topic is only one of many producer topics. I have to do this for many other tables, and each time I would have to repeat the decoding step, which seems unnecessary and redundant.
Is there a better way to handle the database data transformation so the transformed DBData would be available for other stream processors?
PS: I looked into the state store, but it seems to be overkill, as Kafka would serialize the data when adding it to the store and deserialize it again on retrieval. That is extra overhead which I'm trying to avoid.
branch method in KStream API and use a predicate to check the table name. That way, you can have a predicate for each of your tables and then redirect to the output topic accordingly. – sobychacko
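To make the idea in the comment concrete, here is a minimal plain-Java sketch of "decode once, then route with one predicate per table". It deliberately does not use Kafka Streams types, so it can run on its own; in Kafka Streams the analogous call would be KStream#branch(Predicate...), which returns one KStream per predicate (deprecated since Kafka 2.8 in favor of split()). Everything here is an assumption made for illustration: the shape of DBData, the "TABLE:payload" wire format used by decode, the OUTPUT_OTHER name, and the helper names route and table. None of it comes from the question's actual DBDecoder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BranchSketch {

    // Hypothetical stand-in for the DBData class from the question.
    public static final class DBData {
        private final String tableName;
        private final String payload;

        public DBData(String tableName, String payload) {
            this.tableName = tableName;
            this.payload = payload;
        }

        public String getTableName() { return tableName; }
        public String getPayload() { return payload; }
    }

    // Hypothetical decoder: assumes UTF-8 text of the form "TABLE:payload".
    // Returns null for input it cannot decode, like the catch block in the question.
    public static DBData decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        String text = new String(bytes, StandardCharsets.UTF_8);
        int sep = text.indexOf(':');
        if (sep <= 0) {
            return null;
        }
        return new DBData(text.substring(0, sep), text.substring(sep + 1));
    }

    // One predicate per table, as the comment suggests.
    public static Predicate<DBData> table(String name) {
        return d -> name.equals(d.getTableName());
    }

    // Decodes each record exactly once, then hands it to the first branch whose
    // predicate matches. Records that match no branch are dropped.
    public static Map<String, List<DBData>> route(
            List<byte[]> records, Map<String, Predicate<DBData>> branches) {
        Map<String, List<DBData>> out = new LinkedHashMap<>();
        for (String name : branches.keySet()) {
            out.put(name, new ArrayList<>());
        }
        for (byte[] raw : records) {
            DBData data = decode(raw); // the only decode call per record
            if (data == null) {
                continue;
            }
            for (Map.Entry<String, Predicate<DBData>> branch : branches.entrySet()) {
                if (branch.getValue().test(data)) {
                    out.get(branch.getKey()).add(data);
                    break; // first match wins
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Predicate<DBData>> branches = new LinkedHashMap<>();
        branches.put("OUTPUT_MOCK", table("MOCK"));
        branches.put("OUTPUT_OTHER", table("OTHER"));

        List<byte[]> records = Arrays.asList(
                "MOCK:a".getBytes(StandardCharsets.UTF_8),
                "OTHER:b".getBytes(StandardCharsets.UTF_8),
                "garbage".getBytes(StandardCharsets.UTF_8),
                "MOCK:c".getBytes(StandardCharsets.UTF_8));

        Map<String, List<DBData>> routed = route(records, branches);
        System.out.println(routed.get("OUTPUT_MOCK").size());  // prints 2
        System.out.println(routed.get("OUTPUT_OTHER").size()); // prints 1
    }
}
```

The point of the design is that decode sits above the branching step, so adding another table means adding one more predicate and one more per-table conversion, not another decode. In a real topology each branch would then get its own conversion (for example the extractMockDataChanges step from the question) before being sent to its output topic.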