3 votes

I have Kafka producer code in Java that watches a directory for new files using the java.nio WatchService API and pushes each new file to a Kafka topic. A Spark Streaming consumer reads from that topic. I get the following error after the producer job has been running for about a day. The producer pushes roughly 500 files every 2 minutes. My Kafka topic has 1 partition and a replication factor of 2. Can someone please help?
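For context, the producer side looks roughly like this (a simplified sketch, not the exact job; the directory, topic name, and bootstrap servers are placeholders):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.nio.file.*;
    import java.util.Properties;

    public class DirectoryWatchProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            Path dir = Paths.get("/data/incoming");             // placeholder
            try (Producer<String, byte[]> producer = new KafkaProducer<>(props);
                 WatchService watcher = FileSystems.getDefault().newWatchService()) {
                dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
                while (true) {
                    WatchKey key = watcher.take();              // block until new files arrive
                    for (WatchEvent<?> event : key.pollEvents()) {
                        Path file = dir.resolve((Path) event.context());
                        byte[] payload = Files.readAllBytes(file);
                        producer.send(new ProducerRecord<>("my-topic", file.toString(), payload));
                    }
                    if (!key.reset()) break;                    // directory no longer watchable
                }
            }
        }
    }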

org.apache.kafka.common.KafkaException: Failed to construct kafka producer         
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:342) 
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:166) 
    at com.hp.hawkeye.HawkeyeKafkaProducer.Sender.createProducer(Sender.java:60) 
    at com.hp.hawkeye.HawkeyeKafkaProducer.Sender.<init>(Sender.java:38)   
    at com.hp.hawkeye.HawkeyeKafkaProducer.HawkeyeKafkaProducer.<init>(HawkeyeKafkaProducer.java:54) 
    at com.hp.hawkeye.HawkeyeKafkaProducer.myKafkaTestJob.main(myKafkaTestJob.java:81)

Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:125)
    at org.apache.kafka.common.network.Selector.<init>(Selector.java:147)  
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:306)
    ... 7 more
Caused by: java.io.IOException: Too many open files         
     at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)         
     at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)        
     at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)      
     at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) 
     at java.nio.channels.Selector.open(Selector.java:227)         
     at org.apache.kafka.common.network.Selector.<init>(Selector.java:123)     
 ... 9 more
I'm also using Spark Streaming and producing to Kafka. Can you paste your code here? – Arnon Rodman
@Steven I see this old question was never answered. Were you able to resolve the issue? I am facing the same problem; could you please give us a pointer? – AmitK
@AmitK Yes, we solved it by switching to a multi-threaded concurrency model in which each thread pushes messages to Kafka independently. It worked fine after that. We also did what voldy's answer below suggests and increased the open-files limit. – Steven Park
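Roughly what that fix looks like: one thread-safe KafkaProducer shared by a pool of workers, rather than a producer per thread or per file (a sketch; the pool size, topic, and paths are illustrative):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ParallelSender {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // placeholder
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            // KafkaProducer is thread-safe: create it once, share it everywhere.
            Producer<String, byte[]> producer = new KafkaProducer<>(props);
            ExecutorService pool = Executors.newFixedThreadPool(8);

            List<Path> batch = Arrays.asList(Paths.get("/data/incoming/a.dat")); // illustrative
            for (Path file : batch) {
                pool.submit(() -> {
                    byte[] payload = Files.readAllBytes(file);
                    producer.send(new ProducerRecord<>("my-topic", file.toString(), payload));
                    return null;                                // Callable, so IOException may propagate
                });
            }

            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.MINUTES);
            producer.close();                                   // close once, when the job ends
        }
    }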

1 Answer

1 vote

First, check ulimit -aH to see your hard limits.

Then check with your admin and increase the open-files limit, e.g.:

open files                      (-n) 655536
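
To confirm whether the JVM is actually approaching that limit, you can log the process's descriptor count from inside the producer job. A minimal sketch using the JDK's UnixOperatingSystemMXBean (available on HotSpot on Linux/macOS):

    import java.lang.management.ManagementFactory;
    import java.lang.management.OperatingSystemMXBean;

    public class FdMonitor {
        public static void main(String[] args) {
            OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
            if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
                com.sun.management.UnixOperatingSystemMXBean unix =
                        (com.sun.management.UnixOperatingSystemMXBean) os;
                // A steadily climbing count under constant load indicates a leak.
                System.out.printf("open FDs: %d / max %d%n",
                        unix.getOpenFileDescriptorCount(),
                        unix.getMaxFileDescriptorCount());
            }
        }
    }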

Otherwise, I suspect there may be a file-descriptor leak in your code; see:

http://mail-archives.apache.org/mod_mbox/spark-user/201504.mbox/%3CCAKWX9VVJZObU9omOVCfPaJ_bPAJWiHcxeE7RyeqxUHPWvfj7WA@mail.gmail.com%3E
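
If it is a leak, a common culprit with this exact stack trace is constructing a new KafkaProducer per file or per send and never closing it: each instance opens its own epoll Selector (visible in the trace above), so descriptors accumulate until construction itself fails. A sketch of the anti-pattern next to the fix (topic and properties are illustrative):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class LeakExample {
        // Leaky: a producer (and its Selector's file descriptors) is created
        // on every call and never released.
        static void sendLeaky(Properties props, String msg) {
            Producer<String, String> p = new KafkaProducer<>(props);
            p.send(new ProducerRecord<>("my-topic", msg));
            // p.close() is missing
        }

        // Safe: try-with-resources guarantees close(). Better still, hoist one
        // producer out of the loop and reuse it for the life of the job.
        static void sendSafely(Properties props, String msg) {
            try (Producer<String, String> p = new KafkaProducer<>(props)) {
                p.send(new ProducerRecord<>("my-topic", msg));
            }
        }
    }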