I'm new to Apache Flink and am running a few experiments. I'm reading a topic from Kafka and printing it to the console. After printing about 100k Kafka messages, the job throws exceptions. The log output is below.

I'm using a custom class which extends AbstractDeserializationSchema to deserialize the Kafka record value. I've even tried adding exception handling inside it, but that never gets triggered.

The code I use to consume from Kafka is pretty straightforward:

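import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {

  private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
  private static final int FLINK_PARALLELISM = 1;

  public static void main(String[] args) {
    LOGGER.info("Starting Flink Kafka Consumer");

    try {
      // Kafka consumer settings: broker list and consumer group
      Properties props = new Properties();
      props.put("bootstrap.servers", Arrays.asList(
          "localhost:9092"
      ));
      props.put("group.id", "test_flink");

      // Local execution environment, processing time, single parallel task
      StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      flinkEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
      flinkEnv.setParallelism(FLINK_PARALLELISM);

      // Read the "test" topic as strings and print every record to stdout
      FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props);
      DataStream<String> kafkaStream = flinkEnv.addSource(kafkaConsumer);

      kafkaStream.print();

      flinkEnv.execute("Flink Test");
    } catch (Exception e) {
      LOGGER.error("Exception thrown: {}", e.getMessage());
    }
  }
}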

Even after the exceptions, output from the topic still gets appended to the file. The Kafka topic is up and running, but whether or not I publish anything to it, the MiniCluster stops. I haven't been able to pinpoint what the issue is.
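
Note that the catch block in main logs only e.getMessage(), so the stack trace and any nested cause of whatever flinkEnv.execute() threw never reach the log. With SLF4J, passing the exception itself as the last argument, as in LOGGER.error("Exception thrown", e), logs the full trace. The difference can be sketched with the standard library alone. The class name and the exception messages below are made up for illustration and do not come from the actual job:

```java
import java.io.PrintWriter;
import java.io.StringWriter;

class StackTraceDemo {

    // Renders the full stack trace, including "Caused by:" sections, as a string.
    static String fullTrace(Throwable t) {
        StringWriter buffer = new StringWriter();
        PrintWriter writer = new PrintWriter(buffer);
        t.printStackTrace(writer);
        writer.flush();
        return buffer.toString();
    }

    // Hypothetical failure: an outer exception wrapping the real root cause.
    static Exception sampleFailure() {
        Exception rootCause = new IllegalStateException("hypothetical root cause");
        return new RuntimeException("job execution failed", rootCause);
    }

    public static void main(String[] args) {
        Exception e = sampleFailure();
        // Only the outer message; the root cause is not visible here.
        System.out.println("message only: " + e.getMessage());
        // The full trace also shows the wrapped IllegalStateException.
        System.out.println(fullTrace(e));
    }
}
```

Whether the full trace reveals the reason for the shutdown here is not known; it only makes more of the failure visible.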

Could someone point me in the right direction? Thanks

INFO [Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.f.s.c.k.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test', partition=0}]
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test_flink
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group test_flink.
INFO [main] o.a.f.r.m.MiniCluster - Shutting down Flink Mini Cluster
INFO [main] o.a.f.r.d.DispatcherRestEndpoint - Shutting down rest endpoint.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (eb1f62611a047c5da09d8fa6f4e49084) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.d.i.IOManager - I/O manager removed spill file directory /tmp/flink-io-763eca47-ab9c-4985-aa30-c7ac21442635
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.n.NetworkEnvironment - Shutting down the network environment and its components.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Try to restart or fail the job Flink Test (670e9073fbab507c41a26b5641a265eb) if no longer possible.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Could not restart the job Flink Test (670e9073fbab507c41a26b5641a265eb) because the restart strategy prevented it.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.CheckpointCoordinator - Stopping checkpoint coordinator for job 670e9073fbab507c41a26b5641a265eb.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.StandaloneCompletedCheckpointStore - Shutting down
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.JobLeaderService - Stop job leader service.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-ui
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Closing the SlotManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Suspending the SlotManager.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Shut down complete.
INFO [flink-akka.actor.default-dispatcher-3] o.a.f.r.j.JobMaster - Close ResourceManager connection 3a6541e469014e5685a7510403385dcb: ResourceManager leader changed to new address null.
INFO [PermanentBlobCache shutdown hook] o.a.f.r.b.PermanentBlobCache - Shutting down BLOB cache
INFO [TransientBlobCache shutdown hook] o.a.f.r.b.TransientBlobCache - Shutting down BLOB cache
INFO [BlobServer shutdown hook] o.a.f.r.b.BlobServer - Stopped BLOB server at 0.0.0.0:46065
It looks as if the MiniCluster is being shut down. As a consequence, the TaskExecutors and the JobMaster are stopped as well. Could you share the complete logs, the complete program code, and how you execute it (I assume from the IDE)? - Till Rohrmann
I've added a simple example that gives me the same problem, and I've updated the logs to match. It doesn't matter whether I publish to the topic or not; the MiniCluster always stops. - AsadSMalik
I changed the dependency to 1.4.1 from 1.7.1 and it works now!
compile group: 'org.apache.flink', name: 'flink-java', version: '1.4.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.4.1'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.11_2.11', version: '1.4.1'
- AsadSMalik
Great to hear :-) - Till Rohrmann
Any indication as to why it would fail in 1.7? - AsadSMalik

1 Answer

Change the JDK version to jdk-8u231-windows-x64. I had the same problem and eventually solved it by changing the version of the JDK.
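
Before swapping JDKs, it may help to confirm which Java runtime the job is actually launched with, since an IDE can run programs on a different JDK than the one on the PATH. A minimal sketch using only standard system properties (the class name is made up for illustration):

```java
class JavaRuntimeCheck {

    // Collects the version, vendor, and install location of the running JVM.
    static String describeRuntime() {
        return "java.version=" + System.getProperty("java.version")
            + ", java.vendor=" + System.getProperty("java.vendor")
            + ", java.home=" + System.getProperty("java.home");
    }

    public static void main(String[] args) {
        System.out.println(describeRuntime());
    }
}
```

Running this with the same run configuration as the Flink job shows which JDK that configuration uses.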