
I'm writing a Kafka Streams application using the DSL API that will read tuples from a Kafka topic. In the topology I want to batch the tuples, then write a batch to a file on disk when either (1) 30 seconds have passed or (2) the size of the batch exceeds 1 GB.

The topology I have written groups tuples using a TimeWindowedKStream, then calls aggregate and passes a windowed store (a Materialized backed by a WindowStore).

My problem is that when the state store attempts to write to the Kafka changelog topic, I get a

org.apache.kafka.common.errors.RecordTooLargeException

exception.

In particular:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (key \x00\x00\x00\x06\x00\x00\x01h$\xE7\x88 \x00\x00\x00\x00 value [B@419761c timestamp 1546807396524) to topic ibv2-capt-consumer-group-3-record-store-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept..

I've tried setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 1 MB, but as the documentation states, this config is for the entire topology.
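For reference, here is roughly how that was set; a minimal sketch, assuming standard StreamsConfig properties (the application id and bootstrap servers below are placeholders, not the real values):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ibv2-capt-consumer-group-3")       // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")                // placeholder
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (1024 * 1024).toString)  // 1 MB, shared by the whole topology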

Here is my topology, written in Scala. Note I am using kafka-streams-scala here.

val builder = new StreamsBuilderS()

import com.lightbend.kafka.scala.streams.DefaultSerdes._

implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]

val inputStream: KStreamS[String, Record] = builder.stream[String,Record](topic)

val random = new scala.util.Random  // source of the random keys used to spread records across 10 keys
val keyed: KStreamS[Int, Record] = inputStream.selectKey[Int]((k, r) => random.nextInt(10))

val grouped: TimeWindowedKStreamS[Int, Record] = keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))

import org.apache.kafka.common.utils.Bytes

val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
  .as[Int,RecordSeq,WindowStore[Bytes, Array[Byte]]]("record-store")
  .withKeySerde(integerSerde)
  .withValueSerde(recordSeqSerde)
  .withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava)  // increased max.request.size to 10 x default

val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
  () => RecordSeq(Seq()),
  (key: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
  windowedStore
)

val recordSeqStream: KStreamS[String, RecordSeq] = records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")

recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))

Note: case class RecordSeq(records: Seq[Record])
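ChangeLogConfig.getChangeLogConfig is not shown above. Purely as an assumption about its shape, it returns the per-topic overrides that withLoggingEnabled applies to the record-store changelog topic; the keys and values below are illustrative, not the real helper:

// Hypothetical sketch only: withLoggingEnabled applies this map as *topic-level*
// configs on the changelog topic, e.g. "max.message.bytes" (the per-topic
// counterpart of the broker's "message.max.bytes"). Producer settings such as
// max.request.size are configured separately through StreamsConfig.
object ChangeLogConfig {
  def getChangeLogConfig: Map[String, String] = Map(
    "max.message.bytes" -> (10 * 1024 * 1024).toString  // assumed 10 MB cap for the changelog topic
  )
}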


1 Answer


A topic can only accept records up to the maximum size defined by the message.max.bytes property. This is the largest message a broker will accept and append to the topic. Your record size most likely exceeds that limit, so you need to raise this property to allow larger records.

It can be set at the broker level as well as the topic level (a sketch for the Streams changelog case follows the links). You can find more details here:

http://kafka.apache.org/documentation/#brokerconfigs

http://kafka.apache.org/documentation/#topicconfigs
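For the Kafka Streams case in your question, here is a hedged sketch of both halves, reusing the store name, types and serdes from your code (the 10 MB figure is just an example): the changelog topic gets a topic-level max.message.bytes override through withLoggingEnabled, and the embedded producer gets a matching max.request.size so it is allowed to send records of that size.

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.WindowStore
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.utils.Bytes
import scala.collection.JavaConverters._

// Topic-level override for the changelog topic backing "record-store"
// ("max.message.bytes" is the per-topic counterpart of "message.max.bytes").
val changelogConfig = Map("max.message.bytes" -> (10 * 1024 * 1024).toString)

val windowedStore = Materialized
  .as[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]]("record-store")
  .withKeySerde(integerSerde)
  .withValueSerde(recordSeqSerde)
  .withLoggingEnabled(changelogConfig.asJava)

// The producer embedded in Kafka Streams must also be allowed to send requests that big.
val props = new Properties()
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
          (10 * 1024 * 1024).toString)

At the broker level, the equivalent setting is message.max.bytes in the broker configuration.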