9
votes

I have a KStream x KStream join which is breaking down with the following exception.

Exception in thread "my-clicks-and-recs-join-streams-4c903fb1-5938-4919-9c56-2c8043b86986-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_15, processor=KSTREAM-SOURCE-0000000001, topic=my_outgoing_recs_prod, partition=15, offset=9248896
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:203)
    at org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:679)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:557)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_15] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:136)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:87)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:59)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:105)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:107)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:100)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:64)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:47)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:82)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:189)
    ... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

I am joining a Click topic with a Recommendation topic. The Click objects are really small (less than a KB). Recommendation objects, on the other hand, can be large, occasionally bigger than 1 MB.

I Googled the exception and found (here) that I need to set max.request.size in the producer configs.

What I don't understand is, where does the producer come into the picture in a streams join? The topic in the exception above (topic=my_outgoing_recs_prod) is the recommendations topic, not the final joined topic. Isn't the streaming application supposed to just "consume" from it?

Nevertheless, I tried setting the property as config.put("max.request.size", "31457280") (30 MB). I don't expect a recommendation record to exceed that limit. Still, the code is crashing.

I cannot change the broker configs of the Kafka cluster, but, if needed, I can change the properties of the relevant topics.

Could someone suggest what else I can try?

If nothing works, I am willing to ignore such oversized messages. However, I don't know a way of handling this RecordTooLargeException.

My code to perform the join is as follows.

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, JOINER_ID + "-" + System.getenv("HOSTNAME"));
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapList);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
config.put("max.request.size", "314572800");
config.put("message.max.bytes", "314572800");
config.put("max.message.bytes", "314572800");


KStreamBuilder builder = new KStreamBuilder();

KStream<String, byte[]> clicksStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), clicksTopic);
KStream<String, byte[]> recsStream = builder.stream(TopologyBuilder.AutoOffsetReset.LATEST, Serdes.String(), Serdes.ByteArray(), recsTopic);

KStream<String, ClickRec> join = clicksStream.join(
        recsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3*windowMillis));

join.to(Serdes.String(), JoinSerdes.CLICK_SERDE, jointTopic);

KafkaStreams streams = new KafkaStreams(builder, config);
streams.cleanUp();
streams.start();

ClickRec is the joined object (far smaller than a Recommendation object; I don't expect it to be bigger than a few KB).

Where do I put a try...catch in the code above to recover from such occasionally oversized objects?


1 Answer

17
votes

There are multiple configs at different levels:

  1. There is a broker setting message.max.bytes (default is 1000012) (cf. http://kafka.apache.org/documentation/#brokerconfigs)
  2. There is a topic-level config max.message.bytes (default is 1000012) (cf. http://kafka.apache.org/documentation/#topicconfigs) -- see the sketch after this list
  3. The producer has max.request.size (default is 1048576) (cf. http://kafka.apache.org/documentation/#producerconfigs)
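
For the topic-level setting, here is a minimal sketch of raising max.message.bytes on an existing topic with the Java AdminClient (assuming brokers/clients on 0.11 or newer; older clusters can do the same with the kafka-configs.sh tool). The topic name and size below are placeholders -- as explained at the end of this answer, the topic actually being produced to here is the internal changelog topic, whose name follows the pattern <application.id>-<store-name>-changelog:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder

try (AdminClient admin = AdminClient.create(adminProps)) {
    // Raise the per-topic message limit to ~30 MB (placeholder value).
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my_outgoing_recs_prod");
    Config sizeConfig = new Config(
            Collections.singleton(new ConfigEntry("max.message.bytes", "31457280")));
    // all().get() throws checked exceptions; handle or declare them in real code.
    admin.alterConfigs(Collections.singletonMap(topic, sizeConfig)).all().get();
}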

Your stack trace indicates that you need to change the setting at the broker or topic level:

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

You may also need to increase the producer setting (max.request.size).
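
Within a Kafka Streams application there is no producer you construct yourself, so the setting goes into the Streams config. A minimal sketch, assuming your Streams version provides StreamsConfig.producerPrefix (the "producer." prefix scopes the setting to the producers Streams creates internally, including the one that writes the changelog):

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties config = new Properties();
// ... application.id, bootstrap.servers, and serdes as in the question ...

// Equivalent to config.put("producer.max.request.size", "31457280"),
// applied only to Kafka Streams' internal producers.
config.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), "31457280");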

Why do you need this in the first place:

As you perform a KStream-KStream join, the join operator builds up state: it must buffer records from both streams in order to compute the join. By default, that state is backed by a Kafka topic -- the local state is basically a cache, while the Kafka topic is the source of truth. Thus, all your records, including the large recommendations, are written to this "changelog topic" that Kafka Streams creates automatically -- and that write goes through an internal producer, which is why the producer and its size limits matter even though your application only "consumes" from the recommendations topic.
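
Since the question mentions being willing to simply drop oversized messages: because every record entering the join is buffered in that window store and its changelog, one alternative (a sketch, not part of the config changes above) is to filter large values out before the join, so they never reach the store or the changelog. The threshold is a placeholder:

// Placeholder threshold: stay under the default ~1 MB limits.
final int MAX_REC_BYTES = 900 * 1024;

// Drop oversized recommendation payloads before they are buffered
// in the join's window store / changelog.
KStream<String, byte[]> boundedRecsStream =
        recsStream.filter((key, value) -> value != null && value.length <= MAX_REC_BYTES);

// Join against the filtered stream instead of recsStream:
KStream<String, ClickRec> join = clicksStream.join(
        boundedRecsStream,
        (click, recs) -> new ClickRec(click, recs),
        JoinWindows.of(windowMillis).until(3 * windowMillis));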