I'm writing a Kafka Streams application using the DSL API that reads tuples from a Kafka topic. In the topology I want to batch the tuples, then write each batch to a file on disk when either (1) 30 seconds have passed or (2) the batch size exceeds 1 GB.
The topology I have written groups the tuples using a TimeWindowedKStream, then calls aggregate and passes a windowed store.
My problem is that when the state store attempts to write to the Kafka changelog topic, I get an
org.apache.kafka.common.errors.RecordTooLargeException
exception.
In particular:
Caused by: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (key \x00\x00\x00\x06\x00\x00\x01h$\xE7\x88 \x00\x00\x00\x00 value [B@419761c timestamp 1546807396524) to topic ibv2-capt-consumer-group-3-record-store-changelog due to org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept..
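From what I can tell, the record that fails is a single changelog entry holding the entire accumulated RecordSeq for a window, so it can exceed the broker's per-message limit long before my 1 GB target. A minimal sketch of how the producer-side limit can be raised through the Streams config (the 10 MB figure is illustrative, not necessarily what I need):

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
// Raise the size limit on requests sent by Streams' internal producers,
// including the changelog producer. The broker/topic-side limit
// (message.max.bytes / max.message.bytes) has to be raised to match,
// or the broker will still reject the record.
props.put(
  StreamsConfig.producerPrefix(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
  (10 * 1024 * 1024).toString // 10 MB, illustrative
)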
I've tried setting CACHE_MAX_BYTES_BUFFERING_CONFIG to 1 MB, but as the documentation states, this config applies to the entire topology.
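For completeness, this is roughly how I am setting it (a sketch; the application id is inferred from the changelog topic name above, and the bootstrap servers value is a placeholder):

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "ibv2-capt-consumer-group-3") // inferred from the changelog topic name
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")          // placeholder
// 1 MB record cache; this bounds buffering across the whole topology,
// not the size of any single aggregated value.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, (1024 * 1024).toString)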
Here is the topology, written in Scala. Note that I am using kafka-streams-scala (the Lightbend wrapper) here.
import java.util.concurrent.TimeUnit
import com.lightbend.kafka.scala.streams._
import com.lightbend.kafka.scala.streams.DefaultSerdes._
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Materialized, TimeWindows, Windowed}
import org.apache.kafka.streams.state.WindowStore
import scala.collection.JavaConverters._
import scala.util.Random

val builder = new StreamsBuilderS()
implicit val recordSerde = (new RecordSerde).asInstanceOf[Serde[Record]]
implicit val recordSeqSerde = (new RecordSeqSerde).asInstanceOf[Serde[RecordSeq]]
val random = new Random

val inputStream: KStreamS[String, Record] = builder.stream[String, Record](topic)

// Re-key each record with a random key in [0, 10), then group into
// 30-second tumbling windows.
val keyed = inputStream.selectKey[Int]((k, r) => random.nextInt(10))
val grouped: TimeWindowedKStreamS[Int, Record] =
  keyed.groupByKey.windowedBy(TimeWindows.of(TimeUnit.SECONDS.toMillis(30L)))

val windowedStore: Materialized[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]] = Materialized
  .as[Int, RecordSeq, WindowStore[Bytes, Array[Byte]]]("record-store")
  .withKeySerde(integerSerde)
  .withValueSerde(recordSeqSerde)
  .withLoggingEnabled(ChangeLogConfig.getChangeLogConfig.asJava) // increased max.request.size to 10x the default

// Accumulate every record seen in a window into a single RecordSeq value.
val records: KTableS[Windowed[Int], RecordSeq] = grouped.aggregate(
  () => RecordSeq(Seq()),
  (key: Int, record: Record, recordSeq: RecordSeq) => RecordSeq(recordSeq.records :+ record),
  windowedStore
)

// One output entry per window, keyed "<key>-<windowStart>-<windowEnd>".
val recordSeqStream: KStreamS[String, RecordSeq] =
  records.toStream((ws, r) => s"${ws.key()}-${ws.window().start()}-${ws.window().end()}")
recordSeqStream.foreach((k: String, rs: RecordSeq) => WrappedRecordFileWriter.write(k, rs))
Note: RecordSeq is defined as case class RecordSeq(records: Seq[Record]).
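For reference, ChangeLogConfig.getChangeLogConfig is my own helper. A rough sketch of its shape, assuming the override is expressed as the topic-level max.message.bytes (the changelog topic's counterpart to the producer's max.request.size):

object ChangeLogConfig {
  // Sketch only: topic-level overrides applied to the changelog topic via
  // withLoggingEnabled. max.message.bytes is the per-message limit the
  // broker enforces for that topic.
  def getChangeLogConfig: Map[String, String] =
    Map("max.message.bytes" -> (10 * 1048576).toString) // ~10x the default, illustrative
}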