Issue background
We are currently using the Kafka Streams API (version 1.1.0) to process messages from a Kafka cluster (3 brokers, 3 partitions per topic, replication factor 2). The installed Kafka version is 1.1.1.
End users report a problem with disappearing data: suddenly they can't see any data (e.g. yesterday they could see n records in the UI, and the next morning the table was empty). We checked the changelog topic for these particular users and it looked strange: after a few days of inactivity (a given key-value pair might stay unchanged for days), the aggregate value in the changelog topic was missing.
Code
KTable assembly (messages are grouped by the event's username):
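```java
@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
    return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
            // re-key every event by its username (groupBy always goes through a repartition topic)
            .groupBy((key, event) -> event.getUsername(),
                    Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
            .aggregate(
                    // initializer: used whenever no aggregate exists yet for the key
                    UserItems::none,
                    // aggregator: builds the new aggregate from the event and the current aggregate
                    (key, event, userItems) ->
                            userItems.after(event),
                    // persistent store, backed by a compacted changelog topic
                    Materialized
                            .<UsernameVO, UserItems> as(persistentKeyValueStore("application-user-UserItems"))
                            .withKeySerde(serdes.forA(UsernameVO.class))
                            .withValueSerde(serdes.forA(UserItems.class)));
}
```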
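To make the per-record behaviour of `aggregate()` concrete, here is a simplified, stdlib-only model with a `HashMap` standing in for the state store. It assumes the documented contract that the `Initializer` (here `UserItems::none`) is only used when no aggregate exists for the key; caching, repartitioning and changelog writes are left out, the key argument of the real `Aggregator` is dropped, and `AggregateModel` and `forget` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical, simplified model of the per-record work of aggregate():
// read the current aggregate, fall back to the initializer if there is none,
// apply the aggregator and write the result back. Not Kafka Streams code.
public class AggregateModel<K, V, A> {
    private final Map<K, A> store = new HashMap<>();   // stands in for the key-value store
    private final Supplier<A> initializer;             // plays the role of UserItems::none
    private final BiFunction<V, A, A> aggregator;      // plays the role of userItems.after(event)

    public AggregateModel(Supplier<A> initializer, BiFunction<V, A, A> aggregator) {
        this.initializer = initializer;
        this.aggregator = aggregator;
    }

    public A process(K key, V value) {
        A current = store.get(key);
        if (current == null) {
            current = initializer.get();   // no stored aggregate: start from scratch
        }
        A updated = aggregator.apply(value, current);
        store.put(key, updated);
        return updated;
    }

    // Simulates the stored aggregate for a key disappearing from the store
    public void forget(K key) {
        store.remove(key);
    }

    public static void main(String[] args) {
        AggregateModel<String, String, List<String>> model = new AggregateModel<>(
                ArrayList::new,
                (item, items) -> {
                    List<String> copy = new ArrayList<>(items);   // copy-on-write, like UserItems.after
                    copy.add(item);
                    return copy;
                });
        System.out.println(model.process("User1", "ItemName1")); // [ItemName1]
        System.out.println(model.process("User1", "ItemName2")); // [ItemName1, ItemName2]
        model.forget("User1");
        System.out.println(model.process("User1", "ItemName3")); // [ItemName3]
    }
}
```

Under this model, if the stored value for a key ever disappears (or reads back as `null`), the next event is aggregated onto a fresh initial value instead of the previous items, which matches the symptom described above.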
Aggregate object (KTable value):
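```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyMap;

public class UserItems {
    private final Map<String, Item> items;

    public static UserItems none() {
        return new UserItems();
    }

    private UserItems() {
        this(emptyMap());
    }

    @JsonCreator
    private UserItems(Map<String, Item> userItems) {
        this.items = userItems;
    }

    // Serialized as the bare map, e.g. {"ItemName1":{"param":"foo"}}
    @JsonValue
    @SuppressWarnings("unused")
    Map<String, Item> getUserItems() {
        return Collections.unmodifiableMap(items);
    }

    // ...

    // Copy-on-write: returns a new aggregate instead of mutating this one
    public UserItems after(ItemAddedEvent itemEvent) {
        Item item = Item.from(itemEvent);
        Map<String, Item> newItems = new HashMap<>(items);
        newItems.put(itemEvent.getItemName(), item);
        return new UserItems(newItems);
    }
}
```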
Kafka topics
application-user-UserItems
There is no problem with this source topic: its retention is set to the maximum, and all messages are present at all times.
application-user-UserItems-store-changelog (compacted; default configuration, no retention or other settings changed)
Here is the strange part. We can observe in the changelog that for some of the users the values are getting lost:
| Offset | Partition | Key | Value |
|---|---|---|---|
| ... | | | |
| 320 | 0 | "User1" | `{"ItemName1":{"param":"foo"}}` |
| 325 | 0 | "User1" | `{"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}` |
| 1056 | 0 | "User1" | `{"ItemName3":{"param":"zyx"}}` |
| ... | | | |
As shown above, the messages are aggregated correctly at first: ItemName1 is processed, then ItemName2 is applied to the aggregate. But after some time (possibly a few days), when another event is processed, the previous value under the "User1" key appears to be missing, and only ItemName3 is present.
In the application, a user cannot remove all items and add another one in a single action; items can only be added or removed one by one. So if the user removed ItemName1 and ItemName2 and then added ItemName3, we would expect something like this in the changelog:
| Offset | Partition | Key | Value |
|---|---|---|---|
| ... | | | |
| 320 | 0 | "User1" | `{"ItemName1":{"param":"foo"}}` |
| 325 | 0 | "User1" | `{"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}` |
| 1054 | 0 | "User1" | `{"ItemName2":{"param":"bar"}}` |
| 1055 | 0 | "User1" | `{}` |
| 1056 | 0 | "User1" | `{"ItemName3":{"param":"zyx"}}` |
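The expected sequence can be replayed with a minimal stdlib-only stand-in for `UserItems`, in which each event produces one new immutable aggregate and therefore one changelog record. This is a sketch under assumptions: the removal operation (`withoutItem`) is hypothetical because the original elides it, items are reduced to their `param` string, and it assumes every update reaches the changelog. With Kafka Streams record caching enabled, several updates to the same key within one commit interval may be collapsed into a single changelog record.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical, stdlib-only stand-in for UserItems: item name -> "param" value.
public final class ExpectedChangelog {

    static final class Items {
        private final Map<String, String> items;

        private Items(Map<String, String> items) {
            this.items = Collections.unmodifiableMap(items);
        }

        static Items none() {
            return new Items(new TreeMap<>());
        }

        // Copy-on-write, like UserItems.after(ItemAddedEvent)
        Items withItem(String name, String param) {
            Map<String, String> copy = new TreeMap<>(items);
            copy.put(name, param);
            return new Items(copy);
        }

        // Hypothetical removal counterpart (not shown in the original code)
        Items withoutItem(String name) {
            Map<String, String> copy = new TreeMap<>(items);
            copy.remove(name);
            return new Items(copy);
        }

        // Renders the same JSON shape as the changelog values in the table
        String toJson() {
            StringJoiner json = new StringJoiner(",", "{", "}");
            for (Map.Entry<String, String> e : items.entrySet()) {
                json.add("\"" + e.getKey() + "\":{\"param\":\"" + e.getValue() + "\"}");
            }
            return json.toString();
        }
    }

    // One changelog value per event, assuming no caching collapses updates
    public static List<String> replay() {
        List<String> changelog = new ArrayList<>();
        Items agg = Items.none();
        agg = agg.withItem("ItemName1", "foo");
        changelog.add(agg.toJson());   // offset 320
        agg = agg.withItem("ItemName2", "bar");
        changelog.add(agg.toJson());   // offset 325
        agg = agg.withoutItem("ItemName1");
        changelog.add(agg.toJson());   // offset 1054
        agg = agg.withoutItem("ItemName2");
        changelog.add(agg.toJson());   // offset 1055
        agg = agg.withItem("ItemName3", "zyx");
        changelog.add(agg.toJson());   // offset 1056
        return changelog;
    }

    public static void main(String[] args) {
        replay().forEach(System.out::println);
    }
}
```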
Conclusion
At first we thought it was related to the changelog topic's retention, but we checked and the topic only has compaction enabled:
```
application-user-UserItems-store-changelog  PartitionCount:3  ReplicationFactor:1  Configs:cleanup.policy=compact,max.message.bytes=104857600
    Topic: application-user-UserItems-store-changelog  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: application-user-UserItems-store-changelog  Partition: 1  Leader: 2  Replicas: 2  Isr: 2
    Topic: application-user-UserItems-store-changelog  Partition: 2  Leader: 1  Replicas: 1  Isr:
```
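Why compaction alone should not make the aggregate vanish can be checked against a simplified model of a compacted changelog partition: only the newest record per key is kept, and a key whose newest record is a tombstone (`null` value) is eventually removed. It is a sketch, not broker code: it ignores the active segment, `segment.ms`, `min.cleanable.dirty.ratio` and the `delete.retention.ms` window during which tombstones are kept, and `User2` is a hypothetical key.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of log compaction on one changelog partition.
public class CompactionModel {

    public static final class Record {
        final long offset;
        final String key;
        final String value;   // null means tombstone

        public Record(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Keeps the latest record per key and drops keys whose latest record is a tombstone.
    public static Map<String, String> compact(List<Record> log) {
        Map<String, Record> latest = new LinkedHashMap<>();
        for (Record r : log) {
            latest.put(r.key, r);   // assumes the log is in offset order, so later records win
        }
        Map<String, String> survivors = new LinkedHashMap<>();
        for (Record r : latest.values()) {
            if (r.value != null) {
                survivors.put(r.key, r.value);
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        List<Record> log = Arrays.asList(
                new Record(320, "User1", "{\"ItemName1\":{\"param\":\"foo\"}}"),
                new Record(325, "User1", "{\"ItemName1\":{\"param\":\"foo\"},\"ItemName2\":{\"param\":\"bar\"}}"),
                new Record(400, "User2", "{}"),    // hypothetical key
                new Record(401, "User2", null));   // tombstone for User2
        System.out.println(compact(log));
        // {User1={"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}}
    }
}
```

Under these assumptions the latest non-null aggregate for `User1` always survives compaction; it would only be removed if a newer `null` had been written for that key.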
Any ideas or hints would be appreciated. Cheers
… Serde? A `put(key, null)` is interpreted as a delete; thus, if for any reason `null` is returned on serialization, the record would get deleted. – Matthias J. Sax

… `null`, we would re-initialize the aggregation via the provided `Initializer`. – Matthias J. Sax
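Following these comments, one way to test the hypothesis is to wrap the value serde so that it fails loudly instead of returning `null`: a `null` from serialization would be stored as a delete (a tombstone in the changelog), while a `null` from deserialization would make `aggregate()` start again from the `Initializer`. The sketch below is hypothetical and uses `java.util.function.Function` as a stand-in so it runs without Kafka on the classpath; a real version would wrap the `Serializer`/`Deserializer` behind `serdes.forA(UserItems.class)`.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical diagnostic wrapper: throw instead of silently producing null.
public final class NullGuard {

    private NullGuard() {
    }

    // A null result for a non-null value would be written as a delete
    public static <T> Function<T, byte[]> guardSerializer(Function<T, byte[]> delegate) {
        return data -> {
            byte[] bytes = delegate.apply(data);
            if (data != null && bytes == null) {
                throw new IllegalStateException("serializer returned null for non-null value: " + data);
            }
            return bytes;
        };
    }

    // A null result for non-null bytes would make the aggregation restart from the initializer
    public static <T> Function<byte[], T> guardDeserializer(Function<byte[], T> delegate) {
        return bytes -> {
            T value = delegate.apply(bytes);
            if (bytes != null && value == null) {
                throw new IllegalStateException("deserializer returned null for " + bytes.length + " bytes");
            }
            return value;
        };
    }

    public static void main(String[] args) {
        Function<String, byte[]> ser = guardSerializer(s -> s.getBytes(StandardCharsets.UTF_8));
        System.out.println(ser.apply("{}").length); // 2

        // Simulates a deserializer that swallows a parsing error and returns null
        Function<byte[], String> broken = guardDeserializer(b -> null);
        try {
            broken.apply("{\"ItemName1\":{}}".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

If such a guard ever fires in production, it would point at the serde rather than at compaction or retention.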