
My streams application is simply materializing a KTable from a topic of records. With 100K records in the topic, no problems. However, with 15M records in the topic, once we get a few million records in, instances will crash with an exception like:

Exception in thread "Companies-de1f21f9-b445-449e-a59b-5e0cecfa54d1-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (timestamp 1601327726515) to topic Companies-companies.read-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for Companies-companies.read-changelog-0:120001 ms has passed since batch creation

Here is a Gist with a detailed example of the service we're running.

What's puzzling to me is that the error crashing my streams app (above) references an overloaded producer; however, this service is merely materializing a KTable:

streamsBuilder
  .stream(egressTopic, Consumed.with(Serdes.String(), companySerde))
  .toTable(Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
    .withKeySerde(Serdes.String())
    .withValueSerde(companySerde));

Properties I have already tuned in an attempt to get this operating nominally:

  • batch.size 10000
  • linger.ms 1000
  • request.timeout.ms 300000
  • max.block.ms 300000
  • retry.backoff.ms 1000
  • replication.factor 3
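For context, the settings above can be supplied through the Streams configuration: Kafka Streams forwards producer-level keys to its internal producers, and the `producer.` prefix scopes a key to producers explicitly. A minimal sketch of how that wiring might look (the broker address is a placeholder; the key strings match the list above):

```java
import java.util.Properties;

public class CompaniesStreamsConfig {
    // Builds Streams Properties carrying the producer-level tuning from the question.
    // Keys with the "producer." prefix are forwarded only to internal producers;
    // replication.factor is a Streams-level setting for internal topics (changelogs).
    static Properties buildStreamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "Companies");         // app id, as seen in the exception
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("producer.batch.size", "10000");
        props.put("producer.linger.ms", "1000");
        props.put("producer.request.timeout.ms", "300000");
        props.put("producer.max.block.ms", "300000");
        props.put("producer.retry.backoff.ms", "1000");
        props.put("replication.factor", "3");
        return props;
    }
}
```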

1 Answer


Each table is backed by a changelog topic for fault-tolerance. Thus, each write into the KTable is also a write into the corresponding changelog topic.
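You can see this in the exception itself: the changelog topic name is derived as `<application.id>-<storeName>-changelog`. A quick sketch of the naming convention:

```java
public class ChangelogNaming {
    // Kafka Streams names a store's changelog topic <application.id>-<storeName>-changelog.
    static String changelogTopicName(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }
}
```

With application id `Companies` and store name `companies.read`, this yields `Companies-companies.read-changelog`, exactly the topic named in the TimeoutException above.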

If your input topic is configured with log compaction, you could rewrite your program to:

streamsBuilder.table(
    egressTopic,
    Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
        .withKeySerde(Serdes.String())
        .withValueSerde(companySerde)
);

In addition, if you enable topology.optimization="all", the input topic will be reused as the changelog to recover state, and no additional changelog topic will be created.
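A sketch of enabling the optimization (using the plain config strings; `StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG` and `StreamsConfig.OPTIMIZE` resolve to the same values). One easy-to-miss detail: the same properties must also be passed to `StreamsBuilder#build(Properties)`, otherwise the optimization is not applied:

```java
import java.util.Properties;

public class OptimizedConfig {
    // Turns on topology optimization so the source topic is reused as the
    // table's changelog instead of creating a separate changelog topic.
    static Properties withOptimization(Properties props) {
        props.put("topology.optimization", "all");
        return props;
    }
    // Important: build the topology with the same props, e.g.
    //   Topology topology = streamsBuilder.build(props);
    // A plain streamsBuilder.build() ignores the optimization flag.
}
```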