2 votes

I found two questions asking why no result record is emitted if no new record is written to a partition:
1. "Kafka Stream Suppress session-windowed-aggregation" and
2. "Kafka Streams (Suppress): Closing a TimeWindow by timeout"

In the answers to both questions, the explanation is that a new record must arrive before a result can be emitted.

I do not understand why emitting a result after the timeout expires, without a new record arriving, would violate the suppress() contract, and I would appreciate an explanation.

The best proposal so far is to use dummy records to trigger the emission.
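To make this concrete, I understand that workaround roughly as in the sketch below (the class name, topic name, JSON shape, and interval are placeholders of mine, not taken from the linked answers):

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DummyRecordPump {

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    KafkaProducer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    // Send a dummy record more often than the window size, so that the record
    // timestamps keep advancing stream time and closed windows get flushed.
    // Caveat: one dummy record per partition is needed, because stream time
    // is tracked per task.
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(
        () -> producer.send(new ProducerRecord<>("events", null,
            System.currentTimeMillis(), "dummy", "{\"category\":\"dummy\"}")),
        0L, 2L, TimeUnit.SECONDS);
  }
}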

I thought that closing and restarting the stream (topology) might be more appropriate than writing dummy records: the new streams instance would pick up the records and emit the result, since the timeout had already expired.

However, I tried it (code below) and it did not work. I would appreciate an explanation if possible.

// imports assume Vert.x 3.x with RxJava 2 and Lombok
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

import io.reactivex.Single;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaStreamVerticle extends AbstractVerticle {

  private KafkaStreams streams;

  @Override
  public void start(Future<Void> startFuture) throws Exception {

    Single.fromCallable(() -> getStreamConfiguration()).subscribe(config -> {

      final StreamsBuilder builder = new StreamsBuilder();

      builder.<String, String>stream(KafkaProducerVerticle.TOPIC)
          // each record maps to exactly one value, so mapValues suffices
          .mapValues((k, v) -> new JsonObject(v).put("origKey", k))
          .selectKey((k, v) -> v.getString(KafkaProducerVerticle.CATEGORY))
          .mapValues(JsonObject::toString)
          .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
          .windowedBy(TimeWindows.of(Duration.ofSeconds(4)).grace(Duration.ZERO)).count()
          // alternative: .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
          .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(4), BufferConfig.unbounded()))
          .toStream().foreach((k, v) -> log.info("********* {}: {} - {}: {}", k.key(),
              k.window().start(), k.window().end(), v));

      streams = buildAndStartNewStreamsInstance(config, builder);
      Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
      restartStreamsPeriodically(config, builder, 30_000L);
      log.info("consumer deployed");
      startFuture.complete();
    });
  }

  private KafkaStreams buildAndStartNewStreamsInstance(Properties config,
      final StreamsBuilder builder) {
    KafkaStreams streams = new KafkaStreams(builder.build(), config);
    // cleanUp() deletes the local state directory; state (including the
    // suppress buffer) is restored from the changelog topics on restart
    streams.cleanUp();
    streams.start();
    return streams;
  }

  private void restartStreamsPeriodically(Properties config, final StreamsBuilder builder,
      @NonNull Long period) {
    vertx.setPeriodic(period, l -> {
      log.info("restarting streams!!");
      streams.close();
      streams = buildAndStartNewStreamsInstance(config, builder);
    });
  }

  private Properties getStreamConfiguration() {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "suppress-client");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
        Serdes.String().getClass().getName());
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
    // small commit interval and no caching, so results are forwarded immediately
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
    return streamsConfiguration;
  }
}

1 Answer

3 votes

Kafka Streams provides event-time semantics. This means its internal time advances based solely on the timestamps of the records (internal time never advances based on wall-clock time). The "time out" you are talking about is also based on event-time, not wall-clock time.

Assume you have a window of size 5 (i.e., [0,5) would be a window), and you see data with ts=1,2,3. This implies that the next record might have timestamp=4 and would have to be included in the window. However, if no new data arrives, the window result cannot be emitted, no matter how long you wait. Only when a record with timestamp=5 (or later) arrives does the internal time advance to the window-end time, and the result for the window is emitted. If suppress() emitted the data after some wall-clock-based timeout and the next record then arrived with timestamp=4, it would have emitted a wrong result.
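You can verify this behavior with TopologyTestDriver; the following is a minimal sketch (it assumes the kafka-streams-test-utils artifact, version 2.4+ for TestInputTopic):

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class EventTimeDemo {

  public static void main(String[] args) {
    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("in")
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.of(Duration.ofMillis(5)).grace(Duration.ZERO))
        .count()
        .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
        .toStream()
        .foreach((k, v) -> System.out.println(k + " -> " + v));

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-time-demo");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
      TestInputTopic<String, String> in =
          driver.createInputTopic("in", new StringSerializer(), new StringSerializer());
      // ts=1,2,3 all fall into window [0,5); internal time is now 3, the
      // window is still open, and suppress() emits nothing -- no matter how
      // long you wait in wall-clock time.
      in.pipeInput("k", "a", Instant.ofEpochMilli(1));
      in.pipeInput("k", "b", Instant.ofEpochMilli(2));
      in.pipeInput("k", "c", Instant.ofEpochMilli(3));
      // Only this record advances internal time to 5; window [0,5) closes
      // and the suppressed result ([k@0/5] -> 3) is emitted.
      in.pipeInput("k", "d", Instant.ofEpochMilli(5));
    }
  }
}

The result for [0,5) is only printed during the last pipeInput call, no matter how much wall-clock time passes before it.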

Additionally, suppress() remembers its internal state and time across restarts. Hence, even if you restart your application, suppress() will still buffer the data and will still wait for a record with timestamp=5 before emitting the result. This is why restarting did not help in your experiment.