4 votes

Issue background

We are currently using the Kafka Streams API (version 1.1.0) to process messages from a Kafka cluster (3 brokers, 3 partitions per topic, replication factor 2). The installed Kafka broker version is 1.1.1.

End users are reporting a problem with disappearing data. They say that suddenly they can't see any data (e.g. yesterday they could see n records in the UI, and the next morning the table was empty). We checked the changelog topic for these particular users and it looked strange: after a few days of inactivity (a given key-value pair might be unchanged for days), the aggregate value in the changelog topic was missing.

Code

KTable assembly (messages are grouped by the 'username' from the event):


@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
    // persistentKeyValueStore is statically imported from org.apache.kafka.streams.state.Stores
    return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
                         // re-key the stream by the username carried inside the event
                         .groupBy((key, event) -> event.getUsername(),
                                 Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
                         .aggregate(
                                 UserItems::none,                  // initializer: empty aggregate
                                 (key, event, userItems) ->
                                         userItems.after(event),   // fold the event into the aggregate
                                 Materialized
                                         .<UsernameVO, UserItems>as(persistentKeyValueStore("application-user-UserItems"))
                                         .withKeySerde(serdes.forA(UsernameVO.class))
                                         .withValueSerde(serdes.forA(UserItems.class)));
}

Aggregate object (KTable value):


public class UserItems {

    private final Map<String, Item> items;

    public static UserItems none() {
        return new UserItems();
    }

    private UserItems() {
        this(emptyMap());
    }

    @JsonCreator
    private UserItems(Map<String, Item> userItems) {
        this.items = userItems;
    }

    @JsonValue
    @SuppressWarnings("unused")
    Map<String, Item> getUserItems() {
        return Collections.unmodifiableMap(items);
    }

    ...

    public UserItems after(ItemAddedEvent itemEvent) {
        Item item = Item.from(itemEvent);

        // copy-on-write: the previous aggregate state is never mutated
        Map<String, Item> newItems = new HashMap<>(items);
        newItems.put(itemEvent.getItemName(), item);
        return new UserItems(newItems);
    }
}
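
Given the serde-related hypotheses raised in the comments below, a quick JSON round-trip of the aggregate can help rule out (de)serialization returning null. This is only a sketch: it assumes the serdes are Jackson-based (suggested by the annotations, but not confirmed) and that the test class lives in the same package as UserItems:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;

class UserItemsRoundTrip {
    public static void main(String[] args) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        UserItems original = UserItems.none();
        byte[] bytes = mapper.writeValueAsBytes(original);              // serialized via @JsonValue, e.g. "{}"
        UserItems restored = mapper.readValue(bytes, UserItems.class);  // rebuilt via the @JsonCreator constructor
        System.out.println(restored.getUserItems());                    // expected: {}
    }
}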

Kafka topics

application-user-UserItems


There is no problem with this source topic: retention is set to the maximum, and all messages are present at all times.


application-user-UserItems-store-changelog (compacted; otherwise default configuration - no retention or other settings changed)


Here is the strange part. We can observe in the changelog that for some users the values are getting lost:

Offset | Partition |   Key   | Value
-------+-----------+---------+-----------------------------------------------------------
...
320    | 0         | "User1" | {"ItemName1":{"param":"foo"}}
325    | 0         | "User1" | {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1056   | 0         | "User1" | {"ItemName3":{"param":"zyx"}}
...

We can see above that at first the messages are aggregated correctly: ItemName1 was processed, then ItemName2 was applied to the aggregate. But after some period of time - it might be a few days - when another event is processed, the previous value under the "User1" key seems to be missing, and only ItemName3 is present.

In the application a user has no way to remove all items and add another in a single action - a user can only add or remove items one by one. So if the user removed ItemName1 and ItemName2 and then added ItemName3, we would expect something like this in the changelog (a sketch of the corresponding removal handler follows the table):

Offset | Partition |   Key   | Value
-------+-----------+---------+-----------------------------------------------------------
...
320    | 0         | "User1" | {"ItemName1":{"param":"foo"}}
325    | 0         | "User1" | {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1054   | 0         | "User1" | {"ItemName2":{"param":"bar"}}
1055   | 0         | "User1" | {}
1056   | 0         | "User1" | {"ItemName3":{"param":"zyx"}}
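
The removal path is elided in the code above; for illustration only, a hypothetical handler mirroring the add path (ItemRemovedEvent and its accessor are assumed names, not confirmed by the question) would produce the two intermediate entries:

public UserItems after(ItemRemovedEvent itemEvent) {
    // copy-on-write, mirroring after(ItemAddedEvent): drop only the removed item
    Map<String, Item> newItems = new HashMap<>(items);
    newItems.remove(itemEvent.getItemName());
    return new UserItems(newItems);
}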

Conclusion

At first we thought it was related to the changelog topic's retention, but we checked it and only compaction is enabled:

application-user-UserItems-store-changelog  PartitionCount:3    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600   
    Topic: application-user-UserItems-store-changelog   Partition: 0    Leader: 0   Replicas: 0 Isr: 0   
    Topic: application-user-UserItems-store-changelog   Partition: 1    Leader: 2   Replicas: 2 Isr: 2   
    Topic: application-user-UserItems-store-changelog   Partition: 2    Leader: 1   Replicas: 1 Isr: 

Any ideas or hints would be appreciated. Cheers

Hard to say. Could it be an issue with a Serde? A put(key, null) is interpreted as a delete - thus, if, for any reason, serialization returns null, the record would get deleted. - Matthias J. Sax
This could also happen the other way around: if deserialization returns null on read, we would re-initialize the aggregation via the provided Initializer. - Matthias J. Sax
Also note that, by default, caching might "swallow" some writes to the changelog topic, so even the uncompacted part might not reveal all write operations. - Matthias J. Sax
Thanks for the hints. We extended the serializer to log every null that could delete a value, but that is not it - according to the log it is not happening (a sketch of such a wrapper follows these comments). We are investigating further. - AndreyB
@MatthiasJ.Sax FYI - AndreyB
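
For reference, a minimal sketch of the kind of null-logging wrapper described in the last comment (NullLoggingSerializer is a hypothetical name; the actual implementation is not shown in the question):

import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Wraps an existing Serializer and logs whenever serialization yields null,
// since writing a null value is interpreted as a delete of the key.
public class NullLoggingSerializer<T> implements Serializer<T> {

    private static final Logger log = LoggerFactory.getLogger(NullLoggingSerializer.class);
    private final Serializer<T> delegate;

    public NullLoggingSerializer(Serializer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        delegate.configure(configs, isKey);
    }

    @Override
    public byte[] serialize(String topic, T data) {
        byte[] bytes = delegate.serialize(topic, data);
        if (bytes == null) {
            log.warn("Serializer returned null for topic {} and value {}", topic, data);
        }
        return bytes;
    }

    @Override
    public void close() {
        delegate.close();
    }
}

While debugging, setting StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 also disables the record cache, so every update is written through to the changelog and the caching effect mentioned above can be ruled out.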

1 Answer

0 votes

I have experienced the same problem as you describe, and it seems the problem is related to your Kafka Streams configuration. You have mentioned the following configuration for your "source" topic:

3 brokers, 3 partitions per topic, with replication factor 2

Make sure you set the following Kafka Streams configuration property (replication.factor) to at least 2 (it defaults to 1):

StreamsConfig.REPLICATION_FACTOR_CONFIG [replication.factor]
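
For example (the application id and broker addresses below are placeholders):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "application-user");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
// internal topics (including the store changelog) are created with this replication factor
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);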

That matches what you have posted as well (the replication factor for the changelog topic is 1):

application-user-UserItems-store-changelog PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600

So my assumption is that you are losing data due to a broker outage (the data, though, should still be preserved in your source topic thanks to its replication factor of 2, so you could reprocess it and repopulate the changelog topic).