0
votes

I hava a 8 nodes Flink cluster and a 5 nodes Kafka cluster to run a WordCount job. In the first case, lot of data is generated and pushed to Kafka and then Flink job is launched. Everything works fine in this case.
While in the second case, Flink streaming job is launched first, then data is produced into Kafka topic. In this case, the Flink job is usually switched to failed status. Some times it fails immediately after the job is launched. Sometimes it fails several minutes after the job is launched.

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'worker1/192.168.1.38:35240'.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.io.network.partition.ProducerFailedException
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.writeAndFlushNextMessageIfPossible(PartitionRequestQueue.java:164)
at org.apache.flink.runtime.io.network.netty.PartitionRequestQueue.userEventTriggered(PartitionRequestQueue.java:96)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
at io.netty.channel.AbstractChannelHandlerContext.fireUserEventTriggered(AbstractChannelHandlerContext.java:265)
at io.netty.channel.ChannelInboundHandlerAdapter.userEventTriggered(ChannelInboundHandlerAdapter.java:108)
at io.netty.channel.AbstractChannelHandlerContext.invokeUserEventTriggered(AbstractChannelHandlerContext.java:279)
at io.netty.channel.AbstractChannelHandlerContext.access$500(AbstractChannelHandlerContext.java:32)
at io.netty.channel.AbstractChannelHandlerContext$6.run(AbstractChannelHandlerContext.java:270)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
... 2 more
01/24/2016 22:21:32 Keyed Reduce -> Sink: Unnamed(29/32) switched to FAILED

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Error at remote task manager 'worker1/192.168.1.38:35240'.
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:241)
at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:164)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)

In the log file of worker4, the error is:

23:03:43,786 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Map -> Flat Map -> Map (20/32) switched to FAILED with exception.
java.lang.Exception: Error while fetching from broker:
Exception for partition 19: kafka.common.UnknownException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:406)

    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error while fetching from broker:
Exception for partition 19: kafka.common.UnknownException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:406)

    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:422)

Before this UnknowException, there are some logs related to zookeeper:

08:58:47,720 INFO  org.I0Itec.zkclient.ZkEventThread                                 - Terminate ZkClient event thread.
08:58:47,737 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x15277fbb7c70020 closed
08:58:47,737 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
08:58:47,737 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Map -> Flat Map -> Map (6/32) switched to FAILED with exception.
1
Which version do you use (Flink, Kafka, Flink-Kafka-Connector)? - Matthias J. Sax
flink-0.10.1, kafka-0.8.2.1, flink-connector-kafka-0.10.1 - Jun
The kafka consumer class used in the program is FlinkKafkaConsumer082 - Jun

1 Answers

0
votes

The root cause of the error is

Exception for partition 19: kafka.common.UnknownException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:383)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
    at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:406)
    at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:242)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.run(FlinkKafkaConsumer.java:397)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

The UnknownException is most likely triggered by this error on the Kafka side:

[2016-01-25 12:45:30,195] ERROR [Replica Manager on Broker 2]: Error when processing fetch request for partition [WordCount,4] offset 335517 from consumer with correlation id 0. Possible cause: Attempt to read with a maximum offset (335515) less than the start offset (335517). (kafka.server.ReplicaManager)

I've filed a JIRA in Flink for the problem: https://issues.apache.org/jira/browse/FLINK-3288