0
votes

I've deployed a Flink cluster on Kubernetes, composed of 1 jobmanager and 6 taskmanagers. I tried to run a Flink job that consumes a high amount of data on that cluster, but it doesn't seem to be resilient: whenever a taskmanager pod restarts, the whole job fails. So I was wondering whether a Flink cluster deployed on top of Kubernetes is resilient to failure, because it happens very often that a taskmanager pod restarts.

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Pipeline execution failed
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Pipeline execution failed
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:100)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:322)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:308)
        at eu.engielab.csai.SemanticApp.runSemantic(SemanticApp.java:48)
        at eu.engielab.csai.SemanticApp.main(SemanticApp.java:76)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 8 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3d1314d6ead3f577523e28c71a347e59)
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.client.program.ContextEnvironment.getJobExecutionResult(ContextEnvironment.java:117)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:74)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:153)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:97)
        ... 17 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 3d1314d6ead3f577523e28c71a347e59)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:125)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$24(RestClusterClient.java:670)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$9(FutureUtils.java:394)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
        at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
        ... 24 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
        at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
        at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
        at sun.reflect.GeneratedMethodAccessor34.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: Exception in chained task 'FlatMap (FlatMap at TextIO.Read4/Read/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous).output)'
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:124)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:60)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException: Exception in chained task 'FlatMap (FlatMap at TextIO.Read4/Read/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous).output)'
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:81)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:230)
        at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272)
        at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:418)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: javax.net.ssl.SSLException: Connection reset
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn.process(SplittableParDoNaiveBounded.java:309)
        at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:124)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:60)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:60)
        at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:35)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:230)
        at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272)
        at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:418)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:124)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:60)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: javax.net.ssl.SSLException: Connection reset
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:374)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:317)
        at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:303)
Caused by: javax.net.ssl.SSLException: Connection reset
        at sun.security.ssl.Alert.createSSLException(Alert.java:127)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:324)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:267)
        at sun.security.ssl.TransportContext.fatal(TransportContext.java:262)
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:138)
        at sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1386)
        at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1354)
        at sun.security.ssl.SSLSocketImpl.access$300(SSLSocketImpl.java:73)
        at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:948)
        at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
        at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:197)
        at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
        at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
        at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
        at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
        at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
        at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
        at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
        at java.security.DigestInputStream.read(DigestInputStream.java:161)
        at com.amazonaws.services.s3.internal.DigestValidationInputStream.read(DigestValidationInputStream.java:59)
        at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
        at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
        at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
        at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
        at org.apache.beam.sdk.io.aws.s3.S3ReadableSeekableByteChannel.read(S3ReadableSeekableByteChannel.java:106)
        at org.apache.beam.sdk.io.TextSource$TextBasedReader.tryToEnsureNumberOfBytesInBuffer(TextSource.java:273)
        at org.apache.beam.sdk.io.TextSource$TextBasedReader.findDelimiterBounds(TextSource.java:182)
        at org.apache.beam.sdk.io.TextSource$TextBasedReader.readNextRecord(TextSource.java:237)
        at org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:487)
        at org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:258)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:352)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:317)
        at org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:303)
        at org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn.process(SplittableParDoNaiveBounded.java:309)
        at org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:124)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:60)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:60)
        at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:35)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$DoFnOutputManager.output(FlinkDoFnFunction.java:230)
        at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:272)
        at org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:84)
        at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:418)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
        at org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
        at org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
        at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:191)
        at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:124)
        at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.flatMap(FlinkDoFnFunction.java:60)
        at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
        at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
        at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:514)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:357)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:776)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.lang.Thread.run(Thread.java:748)
        Suppressed: java.net.SocketException: Broken pipe (Write failed)
                at java.net.SocketOutputStream.socketWrite0(Native Method)
                at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
                at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
                at sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
                at sun.security.ssl.TransportContext.fatal(TransportContext.java:355)
                ... 70 more
Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:210)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:457)
        at sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:237)
        at sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:190)
        at sun.security.ssl.SSLTransport.decode(SSLTransport.java:109)
        ... 67 more

flink-taskmanager-54c69c65d8-2vsjs                    1/1     Running            0          16h
flink-taskmanager-54c69c65d8-wj5sf                    1/1     Running            0          16h
flink-jobmanager-7f4c5c56b8-pt8jk                     1/1     Running            0          16h
flink-taskmanager-54c69c65d8-npmhg                    1/1     Running            1          16h
flink-taskmanager-54c69c65d8-dkxrn                    1/1     Running            1          16h
Even without k8s, Flink is resilient to failures. Why exactly is your Flink job failing on top of k8s? Does the job at least run for some period and then fail? Did you check that you have enough pods to deploy all TMs, slots, and Flink subtasks? Are you using stateful addresses for the pods + TMs? - Felipe
I added the full log. In fact, every time a taskmanager restarts, the job fails automatically, and I can't explain why. Yes, normally I have enough pods to deploy all TMs; I checked the resources of my AWS instance. I don't think I'm using stateful addresses; they are all ReplicaSets. I installed my k8s Flink following this tutorial: ci.apache.org/projects/flink/flink-docs-master/docs/deployment/… - coolguy2021

2 Answers

0
votes

Unless a job is able to take advantage of fine-grained recovery, any task manager failure will cause all jobs running on that TM to fail and restart. This is normal. What you should be trying to figure out is why the pod is restarting. One common cause of this in containerized environments is not having the memory properly configured, in which case out-of-memory exceptions become a frequent occurrence.
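
In this particular trace, the restart also isn't happening at all because of this line:

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

i.e. the job has no restart strategy configured, so the first task failure terminates it outright. As a rough sketch (assuming Flink 1.x and that you set this in flink-conf.yaml rather than in code; tune the numbers to your workload), something like the following enables automatic restarts and region-based (fine-grained) failover:

# retry the job instead of failing on the first task failure
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
# restart only the affected region where the job topology allows it
jobmanager.execution.failover-strategy: region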

0
votes

I added more memory to each taskmanager and increased the JVM memory, and it works now. Below is what I added in the Flink conf file:

taskmanager.memory.process.size: 7g
taskmanager.memory.jvm-metaspace.size: 1g
taskmanager.memory.jvm-overhead.max: 2048m
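
One more thing worth checking (this is an assumption about how the taskmanager Deployment is usually written; I can't see your manifest): taskmanager.memory.process.size describes the total memory of the taskmanager process, so the pod's memory request/limit should be at least that large, otherwise Kubernetes can still OOM-kill the container even though Flink's own memory accounting is fine. Roughly, in the taskmanager Deployment spec:

resources:
  requests:
    memory: "8Gi"   # >= taskmanager.memory.process.size (7g here), with some headroom
  limits:
    memory: "8Gi"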