Our SDK version is Apache Beam Python 3.7 SDK 2.25.0
There is a pipeline which reads data from Pub/Sub, transforms it and saves results to GCS. Usually it works fine for 1-2 weeks. After that it stucks.
"Operation ongoing in step s01 for at least 05m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1400)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:156)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1101)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Step 01 is just a "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=subscription)
After this dataflow increases the number of workers and stops processing any new data. Job is still in RUNNNING
state.
We just need to restart the job to solve it. But it happens every ~2 weeks.
How can we fix it?