I am using a Kafka consumer (kafka-python), and when I run it to fetch 1000 messages I get the following error:
kafka.consumer.fetcher.RecordTooLargeError: RecordTooLargeError: ("There are some messages at [Partition=Offset]: {TopicPartition(topic='stag-client-topic', partition=0): 177} whose size is larger than the fetch size 247483647 and hence cannot be ever returned. Increase the fetch size, or decrease the maximum message size the broker will allow.", {TopicPartition(topic='stag-client-topic', partition=0): 177})
I have already increased the fetch size (both fetch_max_bytes and max_partition_fetch_bytes) in my consumer configuration. Here is the function that builds the consumer:
import json

from kafka import KafkaConsumer


def kafka_decoder(x, context=dict()):
    # Decode the raw message value; return None if it is not valid JSON.
    try:
        return json.loads(x.decode('utf-8'))
    except json.JSONDecodeError:
        return None


def build_consumer(topic, servers, auto_commit, context=dict()):
    try:
        return KafkaConsumer(
            topic,
            bootstrap_servers=servers,
            value_deserializer=lambda value: kafka_decoder(value, context={
                'event_string': value.decode('utf-8')}),
            key_deserializer=lambda key: key.decode('utf-8'),
            group_id='client-',
            api_version=(0, 10, 1),
            enable_auto_commit=auto_commit,
            auto_offset_reset='earliest',
            request_timeout_ms=30000,
            security_protocol='SASL_SSL',
            max_partition_fetch_bytes=247483647,
            max_poll_records=10000,
            fetch_max_wait_ms=4000,
            fetch_max_bytes=247483647,
            sasl_mechanism='PLAIN',
            ssl_check_hostname=False,
            sasl_plain_username='usrname',
            sasl_plain_password='somepsswrd')
    except Exception:
        print('Error in Kafka consumer creation')
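
For context, this is roughly how I consume from it (a simplified sketch; the broker address and handle_event are placeholders, not my real code):

    # Illustrative consume loop; poll() returns {TopicPartition: [ConsumerRecord]}.
    consumer = build_consumer('stag-client-topic', ['broker:9093'], auto_commit=True)

    batch = consumer.poll(timeout_ms=5000, max_records=1000)
    for tp, records in batch.items():
        for record in records:
            if record.value is not None:
                handle_event(record.value)  # placeholder for the actual processing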
Does anyone have any suggestions on how to proceed here?
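
For reference, the broker-side limit the error message mentions (max.message.bytes on the topic) should be inspectable with kafka-python's admin client; a minimal sketch, assuming the same connection settings and topic name as above:

    from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

    admin = KafkaAdminClient(
        bootstrap_servers=['broker:9093'],  # placeholder address
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_username='usrname',
        sasl_plain_password='somepsswrd')

    # describe_configs returns raw DescribeConfigs responses; the topic's
    # max.message.bytes entry is inside the resources field of each response.
    resource = ConfigResource(ConfigResourceType.TOPIC, 'stag-client-topic')
    for response in admin.describe_configs([resource]):
        print(response)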