
I have been running an Apache Beam based data ingestion job that parses an input CSV file and writes the data to a sink. The job works fine when a single job is submitted at a time (i.e., under normal load).
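
Simplified, the pipeline boils down to the following (a sketch only; the real parsing and sink logic are omitted, and the transformation shown is a placeholder):

import com.spotify.scio._

object DelimitedIngest {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)  // standard Scio setup
    sc.textFile(args("input"))                    // read the CSV line by line
      .map(_.split(",").mkString("|"))            // placeholder for the real record parsing
      .saveAsTextFile(args("output"))             // write to the sink
    sc.run()                                      // submit the pipeline (Scio 0.8+)
  }
}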

But recently, when I started load testing, I began scheduling multiple jobs sequentially in a loop and observed exceptions and job failures. Currently, I am using a script that submits 9 jobs for execution through the Flink REST API. Sometimes all 9 jobs run without any issue, but the majority of the time 2 or 3 of them fail with the exceptions below. I have tried the job with multiple input CSV files, and it shows the same behavior.
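
The submission script is essentially the following loop (a minimal sketch; the host, jar id, and program arguments are placeholders for my actual values):

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object SubmitJobs {
  def main(args: Array[String]): Unit = {
    val jarId = "myjob.jar"  // placeholder: the id returned by POST /jars/upload
    val body =
      """{"entryClass": "com.sparkcognition.foundation.ingest.jobs.delimitedingest.DelimitedIngest",
        | "programArgs": "--input=input.csv --output=out"}""".stripMargin

    for (i <- 1 to 9) {
      val conn = new URL(s"http://flink-jobmanager:8081/jars/$jarId/run")  // placeholder host
        .openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("POST")
      conn.setDoOutput(true)
      conn.setRequestProperty("Content-Type", "application/json")
      conn.getOutputStream.write(body.getBytes(StandardCharsets.UTF_8))
      println(s"job $i -> HTTP ${conn.getResponseCode}")  // 200 on success
      conn.disconnect()
    }
  }
}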

Exception 1:

2020-06-02 11:22:45,686 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

System.err: (none)

System.out: (none)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:108)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Exception 2:

2020-06-02 11:22:46,035 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
        at org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:54)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
        at com.spotify.scio.ScioContext.execute(ScioContext.scala:598)
        at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:586)
        at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:574)
        at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:694)
        at com.spotify.scio.ScioContext.run(ScioContext.scala:574)
        at com.sparkcognition.foundation.ingest.jobs.delimitedingest.DelimitedIngest$.main(DelimitedIngest.scala:70)
        at com.sparkcognition.foundation.ingest.jobs.delimitedingest.DelimitedIngest.main(DelimitedIngest.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:557)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:449)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        ... 7 more

The job jar is written in Scala using Spotify's Scio library (0.8.0). The Flink cluster has the following specs:

  • Flink version 1.8.3
  • 1 master and 2 worker nodes
  • 32 task slots with 2 task executors
  • job manager heap size = 48 GB
  • task executor heap size = 24 GB

1 Answer


Submitting a job from the REST API or the web UI fails if your call to env.execute() is wrapped in a catch-all try/catch block, because that swallows the exception Flink throws internally to abort plan extraction. I see your stack trace is similar, but the sometimes-succeeds, sometimes-fails behavior is different; with my code, REST submission used to fail consistently. See the sketch below, and check this Stack Overflow reference to see if it helps: Issue with job submission from Flink Job UI (Exception: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException)
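
In essence, the failure mode and the fix look like this (a sketch with placeholder names; in Flink 1.8, ProgramAbortException extends Error, so catch-alls such as case _: Throwable or scala.util.Try, which matches via NonFatal, swallow it even though a plain case e: Exception would not):

import com.spotify.scio._
import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException

object SafeMain {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.textFile(args("input")).saveAsTextFile(args("output"))  // placeholder pipeline
    try {
      // When the REST handler builds the job graph, it invokes main() and aborts
      // execution here by throwing ProgramAbortException on purpose.
      sc.run()
    } catch {
      case abort: ProgramAbortException => throw abort  // re-throw the planner's abort marker
      case e: Exception => System.err.println(s"job failed: $e")  // handle real failures only
    }
  }
}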