I have a cloudera cluster with 3 Brokers on 3 different machines. I am developing from a fourth one inside the cluster.

I have created my topic as follows:

/usr/bin/kafka-topics --zookeeper host:2181,host2:2181,hosts3:2181/kafka --create --partitions 10 --replication-factor 2 --topic topicname

My root directory in ZooKeeper is not /, it is /kafka.

Here is the code of my producer:

from kafka import KafkaProducer
from kafka.errors import KafkaError
# (imports of my own helper classes Imageworker and Logger are omitted here)


class Kafkaproducer(object):
    def __init__(self, **kwargs):
        if kwargs:
            try:
                self.producer = KafkaProducer(**kwargs)
            except Exception as ex:
                print "unable to create Producer Object " + str(ex)
            self.iw = Imageworker()
            log = Logger()
            self.logs = log.logger('Producer')


    def set_topic(self, topic):
        """
        Set Topic for Producer
        :param self:
        :param topic: Topic String for Kafka
        :return: no value
        """
        self.topic = topic
        print self.producer.partitions_for(topic)


    def send_message(self, file):
        """
        send a single message to kafka broker
        :param self:
        :param file: absolute filepath from file to send to broker
        :return: no value
        """
        print self.topic
        try:
            print "create json message .. "
            message = self.iw.read_image_file(file)
        except Exception as ex:
            print "unable to read file " + str(ex)
        try:
            print "send message " + self.iw.get_imagename(file)
            self.producer.send(self.topic, message)
        except Exception as ex:
            print "unable to send kafka message " + str(ex)

    def _handle_fetch_response(self):
        print "error"

    def send_message_synchron(self, file):
        """
        send a single message to kafka broker and wait for the future result
        :param self:
        :param file: absolute filepath from file to send to broker
        :return: no value
        """
        try:
            print "create json message .. "
            message = self.iw.read_image_file(file)

        except Exception as ex:
            print "unable to read file" + str(ex)
        try:
            #print "send message "+ self.iw.get_imagename(file)
            future = self.producer.send(self.topic, message)
            future.error_on_callbacks=True
            #result = future.get(timeout=1000)
            result = future.succeeded()

            print future.is_done
            if result:
                print future.value
                print result
                print "success!!!"
                meta = future.get(timeout=100)
        except Exception as ex:
            print "unable to send kafka message " + str(ex)
        try:
            if future.is_done:
                print "Message sent successfully"
        except KafkaError:
            self.logs.exception("Error in Kafka")
            print "Error in Kafka"


    def flush_producer(self):
        self.producer.flush()

I am able to send messages asynchronously with the send_message function, and I also get the number of partitions for the topic in use. The problem is that the messages disappear.
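
For completeness, here is a minimal sketch of how I would try to surface delivery failures on asynchronous sends, as I understand the kafka-python Future API (on_send_success and on_send_error are just placeholder names, and producer/topic/message stand in for the objects from the class above):

future = producer.send(topic, message)

def on_send_success(record_metadata):
    # the broker acknowledged the message; the metadata tells us where it landed
    print "delivered to %s [%d] @ offset %d" % (
        record_metadata.topic, record_metadata.partition, record_metadata.offset)

def on_send_error(excp):
    # the send failed, e.g. no broker reachable or the request timed out
    print "delivery failed: " + str(excp)

future.add_callback(on_send_success)
future.add_errback(on_send_error)
producer.flush()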

I have checked it twice with my Python consumer and with the following command:

/opt/cloudera/parcels/KAFKA-2.2.0-1.2.2.0.p0.68/lib/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list myhosts --topic topic_name

Further, I would like to send messages with my synchronous function so that I get the result of the future. Here I am not able to get a future result; the line result = future.get(timeout=1000) fails.
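
For comparison, this is the plain synchronous pattern I am trying to follow, as I understand it from the kafka-python documentation (block on the future and catch KafkaError, imported from kafka.errors as above; the timeout of get() should be in seconds):

try:
    future = producer.send(topic, message)
    # block until the broker acknowledges the record or the timeout expires
    record_metadata = future.get(timeout=10)
    print record_metadata.topic, record_metadata.partition, record_metadata.offset
except KafkaError as ex:
    print "synchronous send failed: " + str(ex)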

I hope that someone has an idea about this. Thanks a lot,

Jörn

Comment from Mariusz: What parameters do you pass to create KafkaProducer? What error message do you get when you try the synchronous send with timeout=1000? Did you try future.get() without a timeout?

1 Answer

I found the problem, but I do not know how to fix it. I read the producer configuration from a properties file:

bootstrap_servers=['h1:9092' ,'h2:9092','h3:9092']
api_version=(0,10)
value_serializer=str.encode
buffer_memory=200000000
retries=5
max_block_ms=10000

producer = Kafkaproducer(**dic)  # does not work
producer = Kafkaproducer(bootstrap_servers=['h1:9092', 'h2:9092', 'h3:9092'], api_version=(0,10), ...)  # works well

On the consumer side I am able to work with consumer = Kafkaconsumer(**dic).

After fixing the producer call, the synchronous error was also gone. But why am I not able to call the producer with a dictionary?

--> {'retries': 5, 'max_block_ms': 10000, 'buffer_memory': 200000000, 'bootstrap_servers': ['h1:9092', 'h2:9092', 'h3:9092'], 'value_serializer': 'str.encode', 'api_version': (0, 10)}
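
Looking at that dictionary, everything already has the right type except value_serializer: it arrives as the string 'str.encode' instead of the callable str.encode, which may well be why the producer misbehaves when it is created via **dic. A minimal sketch of converting that value before unpacking, assuming the rest of the dictionary is parsed exactly as shown above:

dic = {'retries': 5, 'max_block_ms': 10000, 'buffer_memory': 200000000,
       'bootstrap_servers': ['h1:9092', 'h2:9092', 'h3:9092'],
       'value_serializer': 'str.encode', 'api_version': (0, 10)}

# the properties file can only hold strings, so resolve the serializer name
# to the actual callable before handing the dict to KafkaProducer
if dic.get('value_serializer') == 'str.encode':
    dic['value_serializer'] = str.encode

producer = KafkaProducer(**dic)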

Thank you