
I have a single-node Kafka broker and a simple Kafka Streams application. I created 2 topics (topic1 and topic2).

Messages are produced to topic1, processed by the streams app, and written to topic2.

Note: for each message produced, only one message should be written to the destination topic.

I produced a single message. After it was written to topic2, I stopped the Kafka broker. After some time I restarted the broker and produced another message on topic1. This time the streams app processed that message 3 times. Then, without stopping the broker, I produced messages to topic1 and waited for the streams app to write to topic2 before producing again.

The streams app is behaving strangely. Sometimes one produced message results in 2 messages on the destination topic, and sometimes 3. I don't understand why this is happening; even the messages produced after the broker restart are being duplicated.

Update 1:

I am using Kafka version 1.0.0 and Kafka-Streams version 1.1.0

Below is the code.

Main.java

String credentials = env.get("CREDENTIALS");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "activity-collection");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG, 100000);
props.put(StreamsConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, 200000);
props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 60000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG), true);
props.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

final StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> activityStream = builder.stream("activity_contenturl");
KStream<String, String> activityResultStream = AppUtil.hitContentUrls(credentials , activityStream);
activityResultStream.to("o365_user_activity");

AppUtil.java

public static KStream<String, String> hitContentUrls(String credentials, KStream<String, String> activityStream) {

        KStream<String, String> activityResultStream = activityStream
                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> apply(String value) {

                        ArrayList<String> log = new ArrayList<String>();
                        JSONObject received = new JSONObject(value);
                        String url = received.get("url").toString();

                        String accessToken = ServiceUtil.getAccessToken(credentials);
                        JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);

                        log.add(activityLog.toString());
                        return log;
                    }
                });

        return activityResultStream;
}

Update 2:

In a single-broker, single-partition environment with the above config, I started the Kafka broker and the streams app. I produced 6 messages on the source topic, and when I started a consumer on the destination topic there were already 36 messages, and they keep coming.

So I ran this to list the consumer groups:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

Output:

streams-collection-app-0

Next I ran this:

kafka_2.11-1.1.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group streams-collection-app-0

Output:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                                                HOST            CLIENT-ID
o365_activity_contenturl 0          1               1               0               streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer-3a2940c2-47ab-49a0-ba72-4e49d341daee /127.0.0.1      streams-collection-app-0-244b6f55-b6be-40c4-9160-00ea45bba645-StreamThread-1-consumer

After a while the output showed this:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               6               5               -               -               -

And then:

TOPIC                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
o365_activity_contenturl 0          1               7               6               -               -               -
What version do you use? Can you show the actual program? Did you check the committed offsets before/after you stop/restart the broker? – Matthias J. Sax
@MatthiasJ.Sax Kindly see the update. – el323
@MatthiasJ.Sax Is there a way to consume from the __consumer_offsets topic using the console consumer? – el323
The code looks OK from a first glance over it. Just wondering why you are using flatMapValues() instead of mapValues() if you only want to emit a single output record per input record? The __consumer_offsets topic is a special internal topic that cannot be accessed like other topics; you can use the command line tool bin/kafka-consumer-groups.sh to get the offset information. – Matthias J. Sax
Thanks, I have changed flatMapValues() to mapValues(). I will try that consumer-groups command line tool and update you. Thanks again for all the help. – el323
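
For reference, the mapValues() change mentioned in the last comment would look roughly like this inside hitContentUrls() (a sketch only, reusing the ServiceUtil helpers from the question; it emits exactly one output record per input record but does not by itself explain the duplicates):

    // Sketch: mapValues() produces exactly one output value per input value,
    // so the ArrayList wrapper from the flatMapValues() version is no longer needed.
    KStream<String, String> activityResultStream = activityStream
            .mapValues(new ValueMapper<String, String>() {
                @Override
                public String apply(String value) {
                    JSONObject received = new JSONObject(value);
                    String url = received.get("url").toString();

                    String accessToken = ServiceUtil.getAccessToken(credentials);
                    JSONObject activityLog = ServiceUtil.getActivityLogs(url, accessToken);

                    return activityLog.toString();
                }
            });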

1 Answer


It seems you are facing a known limitation. A Kafka topic by default retains messages for at least 7 days, but committed offsets are stored for only 1 day (default config value offsets.retention.minutes = 1440). So if no messages were produced to your source topic for more than 1 day, after an app restart all messages from the topic will be reprocessed again (actually multiple times, depending on the number of restarts, at most once per day per such topic with rare incoming messages).

You can find a description of committed-offset expiration in How does an offset expire for consumer group.

In Kafka version 2.0 the retention period for committed offsets was increased; see KIP-186: Increase offsets retention default to 7 days.
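
On a 1.x broker you can also raise the retention for committed offsets yourself; offsets.retention.minutes is a broker-side setting in server.properties (the value below is just an illustrative 7-day choice):

    # server.properties (broker): keep committed offsets for 7 days instead of the 1-day default
    offsets.retention.minutes=10080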

To prevent reprocessing, you could add the consumer property auto.offset.reset: latest (the default value in Kafka Streams is earliest). There is a small risk with latest: if nobody produces messages to the source topic for longer than a day and you then restart the app, you could lose some messages (only messages that arrive exactly during the restart).
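
In the streams app from the question, that consumer property can be set next to the existing producer overrides; a small sketch using the standard constants:

    // Sketch: override auto.offset.reset for the embedded consumer of the streams app.
    props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");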