4
votes

I'm building an app using Kafka and Spark Streaming. Input data comes from a third-party stream and is published to a Kafka topic. The code below is from the Stream Proxy module: it shows how I receive results from the stream and pass them to KafkaPublisher (this is just a sketch):

def on_result_response(self,*args):
    self.kafkaPublisher.pushMessage(str(args[0]))

The KafkaPublisher is implemented with these two methods:

class KafkaPublisher:

    def __init__(self, address, port, topic):
        self.kafka = KafkaClient(str(address) + ":" + str(port))
        self.producer = SimpleProducer(self.kafka)
        self.topic = topic

    def pushMessage(self, message):
        self.producer.send_messages(self.topic, message)
        self.producer = SimpleProducer(self.kafka, async=True)

And the app is launched by this main:

from StreamProxy import StreamProxy


streamProxy=StreamProxy("localhost",9092,"task1")
streamProxy.getStreaming(20)  #seconds of streaming

After some batch processing (roughly 10 seconds), the following exceptions are raised:

Exception in thread Thread-2354:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754, in run
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236, in connect
error: [Errno 24] Too many open files

Exception in thread Thread-2355:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
  File "/usr/lib/python2.7/threading.py", line 754, in run
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 164, in _send_upstream
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 649, in send_produce_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 253, in _send_broker_aware_request
  File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 74, in _get_conn
  File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 236, in connect
error: [Errno 24] Too many open files

Note that many such exceptions are raised, from different threads but all with the same message, so the problem is almost certainly on the publisher side.

1
Is StreamProxy something you wrote? If so, can you link the source code for getStreaming? - William Hammond

1 Answer

1
votes

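pushMessage creates a new asynchronous SimpleProducer on every call. Judging by the tracebacks (Thread-2354, Thread-2355, ... all inside _send_upstream), each of those producers appears to start its own background thread and open its own connection, so the process eventually runs out of file descriptors. Try deleting this line from pushMessage: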

self.producer = SimpleProducer(self.kafka, async=True)
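
With that line gone, the producer is created once in __init__ and reused for every message. Here is a minimal sketch of that shape, runnable without a broker. RecordingProducer is a hypothetical stand-in I made up for illustration (it is not part of kafka-python); in your code, the object passed in would be the SimpleProducer you already build in __init__, with async=True set there if you want asynchronous sends.

```python
class RecordingProducer:
    """Hypothetical stand-in for a Kafka producer, so the sketch runs without a broker."""

    instances = 0  # counts how many producers have been constructed

    def __init__(self):
        RecordingProducer.instances += 1
        self.sent = []

    def send_messages(self, topic, *messages):
        # Same call shape as self.producer.send_messages(self.topic, message)
        # in the question; here the messages are only recorded.
        self.sent.extend((topic, m) for m in messages)


class KafkaPublisher:
    def __init__(self, producer, topic):
        # The producer is built once, outside pushMessage, and reused.
        self.producer = producer
        self.topic = topic

    def pushMessage(self, message):
        # Only send: no new producer (and so no new thread or socket) per message.
        self.producer.send_messages(self.topic, message)


publisher = KafkaPublisher(RecordingProducer(), "task1")
for i in range(1000):
    publisher.pushMessage("result %d" % i)

print(RecordingProducer.instances)   # 1
print(len(publisher.producer.sent))  # 1000
```

However many messages are pushed, only one producer exists, which is the property the original pushMessage was breaking.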