I have been running an Apache Beam based data ingestion job that parses an input CSV file and writes the data to a sink. The job works fine when one job is submitted at a time (under normal load).
But recently, when I started load testing, I began scheduling multiple jobs sequentially in a loop, and I observed some exceptions and job failures. Currently, I am using a script to schedule 9 jobs through the Flink REST API for running jobs. When these jobs are scheduled, sometimes all of them execute without any issue, but the majority of the time 2 or 3 out of the 9 jobs fail with the following exceptions. I have tried the job with multiple input CSV files, but it shows similar behavior.
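For reference, the submission loop described above could look roughly like the sketch below. This is not the actual script, only a minimal reconstruction under stated assumptions: the JobManager REST address, the jar id, and the `--input=...` program argument are hypothetical placeholders, and the jar is assumed to have been uploaded already. The only Flink-specific parts are the `POST /jars/:jarid/run` endpoint, its `programArgs` request field, and the `jobid` field in the response.

```python
import json
import urllib.error
import urllib.request

FLINK_URL = "http://localhost:8081"  # assumption: JobManager REST address
JOB_COUNT = 9                        # number of jobs scheduled per test run


def http_post_json(url, payload):
    """POST a JSON body and return the decoded JSON response."""
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


def submit_jobs(jar_id, arg_sets, base_url=FLINK_URL, post=http_post_json):
    """Submit one job per entry in arg_sets, one after another.

    Returns (submitted, failed): the job ids that were accepted, and
    (index, error message) pairs for submissions that raised an error.
    """
    submitted, failed = [], []
    url = f"{base_url}/jars/{jar_id}/run"
    for index, program_args in enumerate(arg_sets):
        try:
            reply = post(url, {"programArgs": program_args})
            submitted.append(reply["jobid"])
        except (urllib.error.URLError, KeyError, ValueError) as error:
            # HTTP error responses surface as HTTPError, a URLError subclass.
            failed.append((index, str(error)))
    return submitted, failed
```

It could be called as `submit_jobs(jar_id, [f"--input=/data/input_{i}.csv" for i in range(JOB_COUNT)])`, where `jar_id` is the id returned when the jar was uploaded. The `post` parameter exists only so the loop can be exercised without a running cluster.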
Exception 1:
2020-06-02 11:22:45,686 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.
System.err: (none)
System.out: (none)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:108)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Exception 2:
2020-06-02 11:22:46,035 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:54)
at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at com.spotify.scio.ScioContext.execute(ScioContext.scala:598)
at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:586)
at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:574)
at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:694)
at com.spotify.scio.ScioContext.run(ScioContext.scala:574)
at com.sparkcognition.foundation.ingest.jobs.delimitedingest.DelimitedIngest$.main(DelimitedIngest.scala:70)
at com.sparkcognition.foundation.ingest.jobs.delimitedingest.DelimitedIngest.main(DelimitedIngest.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:557)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:449)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 7 more
The job jar file is developed in Scala using Spotify's Scio library (0.8.0). The Flink cluster has these specs:
- Flink version 1.8.3
- 1 master and 2 worker nodes
- 32 task slots with 2 task executors
- job manager heap size = 48 GB
- task executor heap size = 24 GB