My Streams application simply materializes a KTable from a topic of records. With 100K records in the topic there are no problems; with 15M records, however, instances crash once a few million records have been read, with an exception like:
Exception in thread "Companies-de1f21f9-b445-449e-a59b-5e0cecfa54d1-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (timestamp 1601327726515) to topic Companies-companies.read-changelog due to org.apache.kafka.common.errors.TimeoutException: Expiring 40 record(s) for Companies-companies.read-changelog-0:120001 ms has passed since batch creation]
Here is a Gist with a detailed example of the service we're running.
What's puzzling to me is that the error crashing my Streams app (above) points to an overloaded producer, yet this service is merely materializing a KTable. The topic named in the stack trace is the table's internal changelog, so apparently it is Streams' own changelog producer that is falling behind.
// Materialize the input topic into a KTable backed by a persistent store
// (and, implicitly, the -changelog topic named in the exception above).
streamsBuilder
        .stream(egressTopic, Consumed.with(Serdes.String(), companySerde))
        .toTable(Materialized.<String, Company, KeyValueStore<Bytes, byte[]>>as(companyKTableName)
                .withKeySerde(Serdes.String())
                .withValueSerde(companySerde));
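For context, the topology is wired into the service roughly like this; the application id matches the thread name in the stack trace, the broker address is a placeholder, and the full setup is in the Gist:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Companies");      // from the thread name above
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder

StreamsBuilder streamsBuilder = new StreamsBuilder();
// ... topology from the snippet above ...
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
streams.start();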
Properties I have already tuned in an attempt to get this operating nominally (applied roughly as in the sketch after the list):
batch.size = 10000
linger.ms = 1000
request.timeout.ms = 300000
max.block.ms = 300000
retry.backoff.ms = 1000
replication.factor = 3
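For reference, here is roughly how I'm applying them (a sketch; the exact code is in the Gist). The producer-level settings go through StreamsConfig.producerPrefix so they reach Streams' embedded producer, which is the client timing out on the changelog writes:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// The "producer." prefix routes these to Streams' internal producer.
props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 10000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 1000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 300000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 300000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 1000);
// replication.factor is a Streams-level config covering internal topics such as the changelog.
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);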