Flume agent 1 does not connect to Flume agent 2. What could be the reason ?
I am using Flume to stream log file to HDFS using 2 Agents. The first agent is located at the source machine where the log file exists, while the second agent is located in the machine (IP Address is 10.10.201.40) where Hadoop is installed.
The configuration file of the first agent (flume-src-agent.conf) is as follows:
source_agent.sources = weblogic_server
source_agent.sources.weblogic_server.type = exec
source_agent.sources.weblogic_server.command = tail -f AdminServer.log
source_agent.sources.weblogic_server.batchSize = 1
source_agent.sources.weblogic_server.channels = memoryChannel
source_agent.sources.weblogic_server.interceptors = itime ihost itype
source_agent.sources.weblogic_server.interceptors.itime.type = timestamp
source_agent.sources.weblogic_server.interceptors.ihost.type = host
source_agent.sources.weblogic_server.interceptors.ihost.useIP = false
source_agent.sources.weblogic_server.interceptors.ihost.hostHeader = host
source_agent.sources.weblogic_server.interceptors.itype.type = static
source_agent.sources.weblogic_server.interceptors.itype.key = log_type
source_agent.sources.weblogic_server.interceptors.itype.value = apache_access_combined
source_agent.channels = memoryChannel
source_agent.channels.memoryChannel.type = memory
source_agent.channels.memoryChannel.capacity = 100
source_agent.sinks = avro_sink
source_agent.sinks.avro_sink.type = avro
source_agent.sinks.avro_sink.channel = memoryChannel
source_agent.sinks.avro_sink.hostname = 10.10.201.40
source_agent.sinks.avro_sink.port = 4545
The configuration file of the second agent (flume-trg-agent.conf) is as follows:
collector.sources = AvroIn
collector.sources.AvroIn.type = avro
collector.sources.AvroIn.bind = 0.0.0.0
collector.sources.AvroIn.port = 4545
collector.sources.AvroIn.channels = mc1 mc2
collector.channels = mc1 mc2
collector.channels.mc1.type = memory
collector.channels.mc1.capacity = 100
collector.channels.mc2.type = memory
collector.channels.mc2.capacity = 100
collector.sinks = HadoopOut
collector.sinks.HadoopOut.type = hdfs
collector.sinks.HadoopOut.channel = mc2
collector.sinks.HadoopOut.hdfs.path = hdfs://localhost:54310/user/root
collector.sinks.HadoopOut.hdfs.callTimeout = 150000
collector.sinks.HadoopOut.hdfs.fileType = DataStream
collector.sinks.HadoopOut.hdfs.writeFormat = Text
collector.sinks.HadoopOut.hdfs.rollSize = 0
collector.sinks.HadoopOut.hdfs.rollCount = 10000
collector.sinks.HadoopOut.hdfs.rollInterval = 600
When the 1st agent is run, I get the following error:
2015-04-08 15:14:10,251 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException:Failed to send events
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:382)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:744)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient {host:10.10.201.40, port:4545}: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:161)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:115)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:590)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:209)
at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:269)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:339)
... 3 more
Caused by: java.io.IOException: Error connecting to /10.10.201.40:4545
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at.org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at.org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at.org.apache.avro.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:147)
When the 2nd Agent is run, I get the following error:
2015-04-08 15:53:31,649 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG-org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Pollingsink runner starting
2015-04-08 15:53:31,844 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start EventDrivenSourceRunner: { source:Avro source AvroIn: {bindAddress: 0.0.0.0, port: 4545 } } - Exception follows.
org.jboss.netty.channel.ChannelException: Failed to bind to: /0.0.0.0:4545
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:298)
at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
at org.apache.flume.source.AvroSource.start(AvroSource.java:225)
at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.net.BindException: Address already in use
at sun.nio.ch.Net.bind(Native Method)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:126)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.bind(NioServerSocketPipelineSink.java:138)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleServerSocket(NioServerSocketPipelineSink.java:90)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:64)
at org.jboss.netty.channel.Channels.bind(Channels.java:569)
at org.jboss.netty.channel.AbstractChannel.bind(AbstractChannel.java:187)
at org.jboss.netty.bootstrap.ServerBootstrap$Binder.channelOpen(ServerBootstrap.java:343)
at org.jboss.netty.channel.Channels.fireChannelOpen(Channels.java:170)
at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:158)
at org.jboss.netty.channel.socket.nio.NioServerSocketChannel.<init>(NioServerSocketChannel.java:80)
at org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory.newChannel(NioServerSocketChannelFactory.java:86)
at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:277)
... 13 more