I am using a Kafka consumer, but when I run it and try to fetch 1000 messages I get the following error:

kafka.consumer.fetcher.RecordTooLargeError: RecordTooLargeError: ("There are some messages at [Partition=Offset]: {TopicPartition(topic='stag-client-topic', partition=0): 177} whose size is larger than the fetch size 247483647 and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.", {TopicPartition(topic='stag-client-topic', partition=0): 177})

I am already using the maximum fetch size in my consumer configuration. Here is the function that builds the consumer:

import json

from kafka import KafkaConsumer


def kafka_decoder(x, context=None):
    try:
        return json.loads(x.decode('utf-8'))
    except json.JSONDecodeError:
        return None


def build_consumer(topic, servers, auto_commit, context=None):
    try:
        return KafkaConsumer(
            topic,
            bootstrap_servers=servers,
            value_deserializer=lambda value: kafka_decoder(value, context={
                'event_string': value.decode('utf-8')}),
            key_deserializer=lambda key: key.decode('utf-8'),
            group_id='client-',
            api_version=(0, 10, 1),
            enable_auto_commit=auto_commit,
            auto_offset_reset='earliest',
            request_timeout_ms=30000,
            security_protocol='SASL_SSL',
            max_partition_fetch_bytes=247483647,
            max_poll_records=10000,
            fetch_max_wait_ms=4000,
            fetch_max_bytes=247483647,
            sasl_mechanism='PLAIN',
            ssl_check_hostname=False,
            sasl_plain_username='usrname',
            sasl_plain_password='somepsswrd')
    except Exception:
        print('Error in Kafka consumer creation')

Does anyone have any suggestions on how to proceed here?

2 Answers

1
vote

It's failing on one specific offset (177), not on fetching 1000 records; your poll size is 10,000 anyway.

You need to increase max_partition_fetch_bytes and fetch_max_bytes beyond 247483647, since the offending record is larger than that.

And you may also want to adjust the maximum record size the broker will accept (message.max.bytes).

How can I send large messages with Kafka (over 15MB)?

Overall, though, you really should avoid putting over ~236 MB of data in a single record to begin with.
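One way to keep the two fetch settings consistent is to derive both from the largest record you expect, with some headroom. This is a sketch; the helper name and the headroom value are made up for illustration, and kafka-python itself is only referenced in a comment:

```python
def fetch_config_for(max_record_bytes, headroom=1024 * 1024):
    """Return fetch-size kwargs large enough for the biggest expected record.

    Both limits must cover a full record, or the consumer raises
    RecordTooLargeError for any record that exceeds them.
    """
    limit = max_record_bytes + headroom
    return {
        'max_partition_fetch_bytes': limit,
        'fetch_max_bytes': limit,
    }

# e.g. for records up to ~250 MB:
config = fetch_config_for(250 * 1024 * 1024)
# consumer = KafkaConsumer('stag-client-topic', **config, ...)
```

Remember that fetch sizes alone are not enough: the broker's message.max.bytes (or the topic's max.message.bytes) must also allow records that large, or the producer will be rejected before the consumer ever sees them.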

0
votes

As the error says, the record "size is larger than the fetch size 247483647", so start by adding 1024*1024 more bytes, e.g. max_partition_fetch_bytes=248532223, and keep increasing it if that is still not enough.
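The incremental bump described above can be sketched as a small helper (the function name is hypothetical; in kafka-python you would have to rebuild the consumer with the new value, since fetch sizes are fixed at construction):

```python
def grow_fetch_size(current, record_size, step=1024 * 1024):
    """Increase the fetch size in 1 MiB steps until it covers the record."""
    while current < record_size:
        current += step
    return current

# One step from the value in the question:
# grow_fetch_size(247483647, 247483648) -> 248532223
```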