I have a cloudera cluster with 3 Brokers on 3 different machines. I am developing from a fourth one inside the cluster.
I have created my topic as followed: create topic /usr/bin/kafka-topics --zookeeper host:2181,host2:2181,hosts3:2181/kafka --create --partitions 10 --replication-factor 2 --topic topicname
My root dir in zookeeper is not root, it is /kafka
Here is my code of the producer:
class Kafkaproducer(object):
def __init__(self, **kwargs):
if kwargs:
try:
self.producer = KafkaProducer(**kwargs)
except Exception as ex:
print "unable to create Producer Object " + str(ex)
self.iw = Imageworker()
log = Logger()
self.logs = log.logger('Producer')
def set_topic(self, topic):
"""
Set Topic for Producer
:param self:
:param topic: Topic String for Kafka
:return: no value
"""
self.topic = topic
print self.producer.partitions_for(topic )
def send_message(self, file):
"""
send a single message to kafka broker
:param self:
:param file: absolute filepath from file to send to broker
:return: no value
"""
print self.topic
try:
print "create json message .. "
message = self.iw.read_image_file(file)
except Exception as ex:
print "unable to read file" + str(ex)
try:
print "send message"+ self.iw.get_imagename(file)
self.producer.send(self.topic, message)
except Exception as Ex:
print "unable to send kafka message " + str(ex)
def _handle_fetch_response(self):
print "error"
def send_message_synchron(self, file ):
"""
:param data:
:return:
"""
try:
print "create json message .. "
message = self.iw.read_image_file(file)
except Exception as ex:
print "unable to read file" + str(ex)
try:
#print "send message "+ self.iw.get_imagename(file)
future = self.producer.send(self.topic, message)
future.error_on_callbacks=True
#result = future.get(timeout=1000)
result = future.succeeded()
print future.is_done
if result:
print future.value
print result
print "success!!!"
meta = future.get(timeout=100)
except Exception as ex:
print "unable to send kafka message " + str(ex)
try:
if future.is_done:
print "Message send successful "
except KafkaError:
log.exception()
print "Error in Kafka"
pass
def flush_producer(self):
self.producer.flush()
I am able to send messages asynchron with the send_messages function. Also I get the number of partitions from the used topic. Problem is, that the messages disappear.
I have checked it twice with my python consumer and the following statement:
/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list myhosts --topic topic_name
Further I would like to send messages with my synchronous function for getting the result of the future. Here, I am not able to get a future result. The line result = future.get(timeout=1000) fails.
Hope that someone has an idea in that case. Thanks a lot,
Jörn
future.get()
without a timeout? – Mariusz