0
votes

Running Apache Beam DataFlow, I was able to run a Streaming pipeline successfully for 12 days (November 5-17), then DataFlow job stopped processing data. I see SSL errors when contacting AI Platform prediction and DataFlow shows:

Processing stuck in step <step_id> for at least <time_interval> without outputting or completing in state finish at <stack_trace>

Is it enough by handling SSL exception?, what is the best way to prevent this deadlock in DataFlow.

Related article here

Version

Streaming Job
Python 2.7 Apache Beam GCP: 2.16.0

Flow

Tweet Python listener (Reads Tweets using tweepy) -> PubSub -> DataFlow (Reads data from PubSub, calls AI Platform prediction) -> BigQuery

I tried to access my data today and noticed that Pipeline stopped processing back in November 17th. I see this error:

2019-12-06 21:15:26.960 PSTError message from worker: Processing stuck in step s02 for at least 476h25m00s without outputting or completing in state finish at sun.misc.Unsafe.park(Native Method) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693) at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323) at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) at org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:330) at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1320) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

I also see this error:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -2222113: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 441, in apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "streaming_twitter.py", line 203, in <lambda>
  File "streaming_twitter.py", line 112, in estimate
  File "streaming_twitter.py", line 96, in prediction
  File "/usr/local/lib/python2.7/dist-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 851, in execute
    method=str(self.method), body=self.body, headers=self.headers)
  File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 165, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/google_auth_httplib2.py", line 198, in request
    uri, method, body=body, headers=request_headers, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 2133, in request
    cachekey,
  File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1796, in _request
    conn, request_uri, method, body, headers
  File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1737, in _conn_request
    response = conn.getresponse()
  File "/usr/lib/python2.7/httplib.py", line 1121, in getresponse
    response.begin()
  File "/usr/lib/python2.7/httplib.py", line 438, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python2.7/httplib.py", line 394, in _read_status
    line = self.fp.readline(_MAXLINE + 1)
  File "/usr/lib/python2.7/socket.py", line 480, in readline
    data = self._sock.recv(self._rbufsize)
  File "/usr/lib/python2.7/ssl.py", line 766, in recv
    return self.read(buflen)
  File "/usr/lib/python2.7/ssl.py", line 653, in read
    v = self._sslobj.read(len)
RuntimeError: error: [Errno 0] Error [while running 'generatedPtransform-2222099']

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:330)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1320)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -2222113: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 663, in process_bundle
    data.ptransform_id].process_encoded(data.data)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 255, in apache_beam.runners.worker.operations.Operation.output
    def output(self, windowed_value, output_index=0):
  File "apache_beam/runners/worker/operations.py", line 256, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 834, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 659, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 880, in apache_beam.runners.common._OutputProcessor.process_outputs
    def process_outputs(self, windowed_input_element, results):
  File "apache_beam/runners/common.py", line 919, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 143, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 593, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 594, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_application = self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 776, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 782, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 849, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 780, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 441, in apache_beam.runners.common.SimpleInvoker.invoke_process
    windowed_value, self.process_method(windowed_value.value))
  File "streaming_twitter.py", line 203, in <lambda>
  File "streaming_twitter.py", line 112, in estimate
  File "streaming_twitter.py", line 96, in prediction
  File "/usr/local/lib/python2.7/dist-packages/googleapiclient/_helpers.py", line 130, in positional_wrapper
    return wrapped(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 851, in execute
    method=str(self.method), body=self.body, headers=self.headers)
  File "/usr/local/lib/python2.7/dist-packages/googleapiclient/http.py", line 165, in _retry_request
    resp, content = http.request(uri, method, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/google_auth_httplib2.py", line 198, in request
    uri, method, body=body, headers=request_headers, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 2133, in request
    cachekey,
  File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1796, in _request
    conn, request_uri, method, body, headers
  File "/usr/local/lib/python2.7/dist-packages/httplib2/__init__.py", line 1737, in _conn_request
    response = conn.getresponse()
  File "/usr/lib/python2.7/httplib.py", line 1121, in getresponse
    response.begin()
  File "/usr/lib/python2.7/httplib.py", line 438, in begin
    version, status, reason = self._read_status()
  File "/usr/lib/python2.7/httplib.py", line 394, in _read_status
    line = self.fp.readline(_MAXLINE + 1)
  File "/usr/lib/python2.7/socket.py", line 480, in readline
    data = self._sock.recv(self._rbufsize)
  File "/usr/lib/python2.7/ssl.py", line 766, in recv
    return self.read(buflen)
  File "/usr/lib/python2.7/ssl.py", line 653, in read
    v = self._sslobj.read(len)
RuntimeError: error: [Errno 0] Error [while running 'generatedPtransform-2222099']

        org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
        org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
        org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:249)
        org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:297)
        org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:738)
        org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Processing stuck in step s02 for at least 05h20m00s without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
  at org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
  at org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1320)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:151)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1053)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

AI Platform

1
General heads up, I have definitely had streaming jobs just randomly get stuck for no reason whatsoever. Used to happen more in older versions of dataflow, but could still be happening.CasualT

1 Answers

2
votes

Are you running some extremely slow operation during bundle finalization (DoFn.finish_bundle or DoFn.teardown) (for example, a per-element RPC call) ? If so please try to optimize to do that in batches. As the page you referred to points out, "Processing stuck" warning just means that DoFn did not transition out of a given state for a long period. Could be due to something being very slow or actually being stuck, for example while performing some RPC. If actually stuck, please try introducing a timeout to that operation.