2
votes

Kafka producer is sending .gz files but not able to decompress and read the files at the consumer end. Getting error as "IOError: Not a gzipped file"

Producer - bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Airport < ~/Downloads/stocks.json.gz

Consumer -

import sys 
import gzip
import StringIO
from kafka import KafkaConsumer

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)

try:
    for message in consumer:
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print (content)
except KeyboardInterrupt:
    sys.exit()

Error at consumer -

Traceback (most recent call last):
  File "consumer.py", line 18, in <module>
    unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
    self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
    self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
    raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file
1
Doesn't the console-producer create a message for every line in the input file? Are you sure this works with your gzipped file? Can you check the topic how many messages it produced with that?Thilo
If we put print (message) instead of gzip.GzipFile(fileobj=f), we get the output -- ConsumerRecord(topic=u'Airport', partition=0, offset=961956, timestamp=1525798407789, timestamp_type=0, key=None, value='\x1f\xef\xbf\xbd\x08\x08)~\xef\xbf\xbdZ\x00\x03text.txt\x00\xef\xbf\xbdH\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd/WH,JU\xef\xbf\xbd\xef\xbf\xbd/\xef\xbf\xbd\xef\xbf\xbd\xef\xbf\xbd\x02\x00A\xef\xbf\xbdf\xef\xbf\xbd\x14\x00\x00\x00', checksum=2006205804, serialized_key_size=-1, serialized_value_size=81)Inder
So that is only 81 bytes. Your file was bigger, wasn't it?Thilo
There are message values for each input line with different size. Need to decompress the value.Thank you for your help.Inder
What is the fetch size in bytes in your configuration?wandermonk

1 Answers

0
votes

Kafka is not meant for sending huge payloads/messages. You should see it as a Distributed Message BUS which gives you all the privileges of a distributed system.

Kafka limits the size of the messages which could be sent across because of the following reasons

  • Huge messages increases the memory pressure in the broker.
  • Large messages slows down the broker and handling them is very expensive.

Solution:

  • You could very well use a Reference Based Messaging where you send the location of the huge message to the consumer rather than sending huge data as is. This will allow you to use the capabilities of an external datastore and also reduces the pressure on the Kafka Brokers.
  • You could also chunk the data and send it inline and re-assemble at the sink.

Playing with Batch Size:

batch.size measures batch size in total bytes instead of the number of messages. It controls how many bytes of data to collect before sending messages to the Kafka broker. Set this as high as possible, without exceeding available memory. The default value is 16384.

If you increase the size of your buffer, it might never get full. The Producer sends the information eventually, based on other triggers, such as linger time in milliseconds. Although you can impair memory usage by setting the buffer batch size too high, this does not impact latency.

If your producer is sending all the time, you are probably getting the best throughput possible. If the producer is often idle, you might not be writing enough data to warrant the current allocation of resources.

Since, your data is a gzip you can use the Reference Based Messaging.

Instead of playing with the fetch size and message max byte size which cannot cover all the file size store your files on a distributed file system like NFS/HDFS/S3 and send the reference to the consumer. The consumer can pick the location and unzip the data.