1
votes

I'm actually working on topology taking data from kafka and persist them into elasticsearch. Ok first, I used the basic KafkaSpout from storm dependency to listen for data coming from a precise kafka topic and, I re-implemented the Elasticsearch bolt from the elasticsearch-hadoop project: https://github.com/elastic/elasticsearch-hadoop/blob/master/storm/src/main/java/org/elasticsearch/storm/EsBolt.java. The goal was to write on several indices in elasticsearch. So, when I process the messages coming from kafka, I have some exceptions when the number of data grow up in the kafka queue. This is one part of the stack trace in the worker logs:

2016-04-13T22:24:44.641+0000 b.s.m.n.Client [ERROR] failed to send 580 messages to Netty-Client-ip-[internal-ip].ec2.internal/[internal-ip]:6700: 
java.nio.channels.ClosedChannelException
2016-04-13T22:24:44.641+0000 b.s.m.n.Client [ERROR] failed to send 575 messages to Netty-Client-ip-[internal-ip].ec2.internal/[internal-ip]:6700:
java.nio.channels.ClosedChannelException
2016-04-13T22:25:05.970+0000 b.s.m.n.Client [WARN] Re-connection to ip-[internal-ip].ec2.internal/[internal-ip]:6701 was successful but 52890 messages
has been lost so far
2016-04-13T22:36:33.571+0000 b.s.m.n.StormClientHandler [INFO] Connection failed Netty-Client-ip-ip-[internal-ip].ec2.internal/[internal-ip]:6701
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_77]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_77]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_77]
at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.8.0_77]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[na:1.8.0_77]
at org.apache.storm.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [storm-core-0.9.6.jar:0.9.6]
at org.apache.storm.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [storm-core-0.9.6.jar:0.9.6]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]

I'm using a storm cluster of 3 nodes (1 nimbus+UI+Zookeeper and 2 supervisors). Storm version 0.9.6. Each of these machines have 4GB RAM and this is the content my storm.yml config file:

storm.zookeeper.servers:
  - "nimbus-ip"
storm.local.dir: "/mnt/storm"
nimbus.seeds: ["nimbus-ip"]
storm.zookeeper.port: 2181
ui.port: 8080
nimbus.host: "nimbus-ip"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.messaging.netty.max_wait_ms: 10000

Can anyone help me to know why workers can't communicate due to Netty-Client hostname resolution? I already saw one report of this issue in the 0.9.4 version of storm https://issues.apache.org/jira/browse/STORM-908. Is it possible that the 0.9.6 version does not fix this issue?

Many thanks!!

1

1 Answers

0
votes

I got here from google looking for answers to a similar problem. In my case, the error was:

o.a.s.m.n.Client [ERROR] connection attempt 104 to Netty-Client-ip-XXX-XXX-XXX-XXX.ec2.internal/XXX.XXX.XXX.XXX:6703 failed: java.net.ConnectException: Connection refused: ip-XXX-XXX-XXX-XXX.ec2.internal/XXX.XXX.XXX.XXX:6703

This was appearing on a 2-node storm cluster (v1.0.1).

At first, I thought this was a networking issue with AWS (which is where I was deploying the nodes). I started to look at security group rules, /etc/hosts files etc etc, none of which helped.

After some searching I discovered this: https://issues.apache.org/jira/browse/STORM-1382 and figured that maybe the issue wasn't the network at all, but something on the other end wasn't running.

So, I ssh-d into a worker node and took a look at the supervisor log, which showed me something like this lots and lots:

o.a.s.d.supervisor [INFO] 30236e62-d2e1-4d5c-b75c-f54ef07653a4 still hasn't started

When I looked at the worker.log itself, I discovered there was a problem with the default java version. That was my problem, but other people's problems may be related to other reasons that a worker may fail.

Anyway, once I set the correct default java version it all kicked into life.