The Kafka producer is sending a .gz file, but the consumer is not able to decompress and read it. The consumer fails with the error "IOError: Not a gzipped file".
Producer - bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Airport < ~/Downloads/stocks.json.gz
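A possible cause worth checking: as far as I know, kafka-console-producer.sh reads standard input one line at a time and sends each line as a separate message, so a binary .gz file is probably cut at every newline byte (and may also be altered by text decoding). The sketch below does not touch Kafka at all. It only mimics that splitting on synthetic data, using a hypothetical helper split_like_line_reader, to show that the intact stream decompresses while no single fragment does. It is written for Python 3 and uses only the standard library.

```python
import gzip
import io
import zlib


def gzip_bytes(data):
    """Compress data into a complete in-memory gzip stream."""
    buf = io.BytesIO()
    # mtime=0 keeps the gzip header identical between runs
    with gzip.GzipFile(fileobj=buf, mode="wb", mtime=0) as gz:
        gz.write(data)
    return buf.getvalue()


def try_gunzip(payload):
    """Return the decompressed bytes, or None if decompression raises an error."""
    try:
        with gzip.GzipFile(fileobj=io.BytesIO(payload)) as gz:
            return gz.read()
    except (OSError, EOFError, zlib.error):
        return None


def split_like_line_reader(payload):
    """Hypothetical stand-in for a line-oriented reader: cut at every newline byte."""
    return payload.split(b"\n")


# Synthetic stand-in for stocks.json.gz (assumption: the real file is ordinary gzip)
original = "".join(
    '{"id": %d, "price": %d}\n' % (i, (i * 7919) % 100003) for i in range(5000)
).encode("utf8")
compressed = gzip_bytes(original)
fragments = split_like_line_reader(compressed)

# The intact stream round-trips; no individual fragment yields the original data
whole_ok = try_gunzip(compressed) == original
recovered = [f for f in fragments if try_gunzip(f) == original]
print("fragments:", len(fragments), "whole ok:", whole_ok, "recovered:", len(recovered))
```

If the real producer behaves this way, each consumed message would hold only a slice of the compressed stream, which would match the error reported at the consumer.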
Consumer -
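import sys
import gzip
import StringIO
from kafka import KafkaConsumer

# Topic and broker address taken from the producer command above
KAFKA_TOPIC = 'Airport'
KAFKA_BROKERS = 'localhost:9092'

consumer = KafkaConsumer(KAFKA_TOPIC, bootstrap_servers=KAFKA_BROKERS)
try:
    for message in consumer:
        # Wrap the raw message bytes in a file-like object for GzipFile
        f = StringIO.StringIO(message.value)
        gzip_f = gzip.GzipFile(fileobj=f)
        unzipped_content = gzip_f.read()
        content = unzipped_content.decode('utf8')
        print (content)
except KeyboardInterrupt:
    sys.exit()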
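Before handing a message to GzipFile, it may help to look at what actually arrived. A gzip stream starts with the two magic bytes 0x1f 0x8b, and "Not a gzipped file" is what Python's gzip module reports when they are missing. The helper below is only a diagnostic sketch: describe_payload is a hypothetical name, and the sample byte strings are made up. It is written for Python 3.

```python
import binascii

GZIP_MAGIC = b"\x1f\x8b"  # first two bytes of every gzip stream (RFC 1952)


def describe_payload(payload, preview=8):
    """Summarise a message value: length, leading bytes in hex, gzip magic check."""
    head = payload[:preview]
    return {
        "length": len(payload),
        "head_hex": binascii.hexlify(head).decode("ascii"),
        "looks_gzipped": payload.startswith(GZIP_MAGIC),
    }


# Made-up sample values standing in for message.value
print(describe_payload(b"\x1f\x8b\x08\x00rest-of-stream"))
print(describe_payload(b'{"id": 1}'))
```

Calling describe_payload(message.value) for the first few messages in the consumer loop would show whether each message is a whole gzip stream or only a short fragment of one.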
Error at consumer -
Traceback (most recent call last):
  File "consumer.py", line 18, in <module>
    unzipped_content = gzip_f.read()
  File "/usr/lib64/python2.6/gzip.py", line 212, in read
    self._read(readsize)
  File "/usr/lib64/python2.6/gzip.py", line 255, in _read
    self._read_gzip_header()
  File "/usr/lib64/python2.6/gzip.py", line 156, in _read_gzip_header
    raise IOError, 'Not a gzipped file'
IOError: Not a gzipped file
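If the messages turn out not to contain a complete gzip stream each, one option is to send the whole file as the value of a single message instead of piping it through a line-oriented tool. The following is a minimal sketch, assuming kafka-python's KafkaProducer (its send and flush methods) and a file small enough for the broker's message size limit. send_file_as_one_message is a hypothetical helper name, and the producer is passed in as an argument so the function can also be tried with a stand-in object.

```python
def send_file_as_one_message(producer, topic, path):
    """Read the file at path as raw bytes and send it as one message value.

    producer is expected to offer send(topic, value=...) and flush(),
    as kafka-python's KafkaProducer does. Returns the number of bytes sent.
    """
    with open(path, "rb") as fh:  # binary mode: no newline or charset handling
        payload = fh.read()
    producer.send(topic, value=payload)
    producer.flush()  # block until buffered messages have been sent
    return len(payload)


# Possible usage against a real broker (assumption: same topic and address
# as in the console producer command):
#
#   import os
#   from kafka import KafkaProducer
#   producer = KafkaProducer(bootstrap_servers='localhost:9092')
#   path = os.path.expanduser('~/Downloads/stocks.json.gz')
#   send_file_as_one_message(producer, 'Airport', path)
```

With the file sent this way, message.value on the consumer side should be the complete .gz content, which is what gzip.GzipFile expects.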