Running Apache Beam DataFlow, I was able to run a Streaming pipeline successfully for 12 days (November 5-17), then DataFlow job stopped processing data. I see SSL errors when contacting AI Platform prediction and DataFlow shows:
Processing stuck in step <step_id> for at least <time_interval> without outputting or completing in state finish at <stack_trace>
Is it enough by handling SSL exception?, what is the best way to prevent this deadlock in DataFlow.
Related article here
Version
Streaming Job
Python 2.7 Apache Beam GCP: 2.16.0
Flow
Tweet Python listener (Reads Tweets using tweepy) -> PubSub -> DataFlow (Reads data from PubSub, calls AI Platform prediction) -> BigQuery
I tried to access my data today and noticed that Pipeline stopped processing back in November 17th. I see this error:
2019-12-06 21:15:26.960 PSTError message from worker: Processing stuck in step s02 for at least 476h25m00s without outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:330) at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1320) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
I also see this error:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -2222113: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
data.ptransform_id].process_encoded(data.data)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 441, in apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
File "streaming_twitter.py", line 203, in <lambda>
File "streaming_twitter.py", line 112, in estimate
File "streaming_twitter.py", line 96, in prediction
File "/usr/local/lib/python2.7/dist-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
return wrapped(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 851, in execute
method=str(self.method), body=self.body, headers=self.headers)
File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 165, in _retry_request
resp, content = http.request(uri, method, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/google_auth_httplib2.py", line 198, in request
uri, method, body=body, headers=request_headers, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 2133, in request
cachekey,
File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1796, in _request
conn, request_uri, method, body, headers
File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1737, in _conn_request
response = conn.getresponse()
File "/usr/lib/python2.7/httplib.py", line 1121, in getresponse
response.begin()
File "/usr/lib/python2.7/httplib.py", line 438, in begin
version, status, reason = self._read_status()
File "/usr/lib/python2.7/httplib.py", line 394, in _read_status
line = self.fp.readline(_MAXLINE + 1)
File "/usr/lib/python2.7/socket.py", line 480, in readline
data = self._sock.recv(self._rbufsize)
File "/usr/lib/python2.7/ssl.py", line 766, in recv
return self.read(buflen)
File "/usr/lib/python2.7/ssl.py", line 653, in read
v = self._sslobj.read(len)
RuntimeError: error: [Errno 0] Error [while running 'generatedPtransform-2222099']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:330)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1320)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -2222113: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
data.ptransform_id].process_encoded(data.data)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
def output(self, windowed_value, output_index=0):
File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
self._invoke_process_per_window(
File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
output_processor.process_outputs(
File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
def process_outputs(self, windowed_input_element, results):
File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
self.main_receivers.receive(windowed_value)
File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
self.consumer.process(windowed_value)
File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
with self.scoped_process_state:
File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
delayed_application = self.dofn_receiver.receive(o)
File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
self.process(windowed_value)
File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
self._reraise_augmented(exn)
File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
raise_with_traceback(new_exn)
File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
return self.do_fn_invoker.invoke_process(windowed_value)
File "apache_beam/runners/common.py", line 441, in apache_beam.runners.common.SimpleInvoker.invoke_process
windowed_value, self.process_method(windowed_value.value))
File "streaming_twitter.py", line 203, in <lambda>
File "streaming_twitter.py", line 112, in estimate
File "streaming_twitter.py", line 96, in prediction
File "/usr/local/lib/python2.7/dist-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
return wrapped(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 851, in execute
method=str(self.method), body=self.body, headers=self.headers)
File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 165, in _retry_request
resp, content = http.request(uri, method, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/google_auth_httplib2.py", line 198, in request
uri, method, body=body, headers=request_headers, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 2133, in request
cachekey,
File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1796, in _request
conn, request_uri, method, body, headers
File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1737, in _conn_request
response = conn.getresponse()
File "/usr/lib/python2.7/httplib.py", line 1121, in getresponse
response.begin()
File "/usr/lib/python2.7/httplib.py", line 438, in begin
version, status, reason = self._read_status()
File "/usr/lib/python2.7/httplib.py", line 394, in _read_status
line = self.fp.readline(_MAXLINE + 1)
File "/usr/lib/python2.7/socket.py", line 480, in readline
data = self._sock.recv(self._rbufsize)
File "/usr/lib/python2.7/ssl.py", line 766, in recv
return self.read(buflen)
File "/usr/lib/python2.7/ssl.py", line 653, in read
v = self._sslobj.read(len)
RuntimeError: error: [Errno 0] Error [while running 'generatedPtransform-2222099']
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Processing stuck in step s02 for at least 05h20m00s without outputting or completing in state process
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1320)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151)
at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)