
We have a Kubernetes cron job on GCP that submits several copies of the same Python Dataflow job, each in its own container. Whenever we need a new copy of the job, we add it to the spec->jobTemplate->spec->template->spec->containers section of the CronJob YAML and adjust the Dataflow job parameters as needed. This has always worked before, but the latest copy we tried to add does not work; the existing copies all still run as expected. The new job seems to fail on submission to GCP, and the error message is not very helpful:
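The pattern described above looks roughly like this (the container names, image, and arguments are placeholders, not our real values):

```yaml
apiVersion: batch/v1beta1
kind: CronJob
spec:
  schedule: "0 3 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          # Existing, working copy of the job:
          - name: dataflow-job-a
            image: gcr.io/example-project/newness-job:latest
            args: ["--days_history=30"]
          # New copy that fails on submission:
          - name: dataflow-job-b
            image: gcr.io/example-project/newness-job:latest
            args: ["--days_history=60"]
```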

Traceback (most recent call last):
  File "/app/job.py", line 117, in <module>
    newness.pipeline.run_dataflow(sys.argv)
  File "/app/newness/pipeline.py", line 480, in run_dataflow
    result = pipe.run()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 403, in run
    self.to_runner_api(), self.runner, self._options).run(False)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 416, in run
    return self.runner.run_pipeline(self)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 389, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 504, in create_job
    return self.submit_job_description(job)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
    return fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 551, in submit_job_description
    response = self._client.projects_locations_jobs.Create(request)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py", line 578, in Create
    config, request, global_params=global_params)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
    return self.ProcessHttpResponse(method_config, http_response, request)
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
    self.__ProcessHttpResponse(method_config, http_response, request))
  File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
    http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpError: <exception str() failed>

The job does not appear in the dataflow console at all.
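As an aside on the `<exception str() failed>` message: it generally means the exception's `__str__` raised while Python was formatting the traceback. The exception object's attributes are usually still intact, so catching the error and reading them directly (apitools' `HttpError` keeps the HTTP response body in `.content`) can recover the real error. A minimal sketch, using a hypothetical stand-in exception class:

```python
class FlakyError(Exception):
    """Hypothetical stand-in for an exception whose __str__ raises,
    which is what produces "<exception str() failed>" in a traceback."""

    def __init__(self, content):
        self.content = content  # apitools' HttpError keeps the HTTP body here

    def __str__(self):
        raise ValueError("broken __str__")


try:
    raise FlakyError('{"error": {"message": "Invalid job parameter"}}')
except FlakyError as e:
    # str(e) would fail, but the attributes survive, so the real
    # error body can still be read directly:
    body = e.content
    print(body)
```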

The previous lines of the container logs look like:

2019-10-13T03:57:47.725542287Z Successfully downloaded apache-beam
2019-10-13T03:58:17.125601280Z INFO:root:Staging SDK sources from PyPI to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar
2019-10-13T03:58:17.324843623Z INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar...
2019-10-13T03:58:24.825657227Z INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar
2019-10-13T03:58:25.225646529Z INFO:root:Downloading binary distribtution of the SDK from PyPi
2019-10-13T03:58:25.225716554Z INFO:root:Executing command: ['/usr/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpk5TfMS', 'apache-beam==2.8.0', '--no-deps', '--only-binary', ':all:', '--python-version', '27', '--implementation', 'cp', '--abi', 'cp27mu', '--platform', 'manylinux1_x86_64']
2019-10-13T03:59:33.926186906Z Collecting apache-beam==2.8.0
2019-10-13T03:59:52.125678183Z   Using cached https://files.pythonhosted.org/packages/0f/63/ea5453ba656d060936acf41d2ec057f23aafd69649e2129ac66fdda67d48/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:00:11.525435891Z   Saved /tmp/tmpk5TfMS/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:00:12.025054706Z Successfully downloaded apache-beam
2019-10-13T04:00:26.726190542Z INFO:root:Staging binary distribution of the SDK from PyPI to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:00:26.825618945Z INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl...
2019-10-13T04:00:33.725522899Z INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:06:14.525017097Z Traceback (most recent call last):
...

Why is this job failing to submit? Are there any other logs we can look at to see the cause of this failure?

(Most of our dataflow jobs are written in Java, where the error messages are usually more helpful.)

UPDATE: Running the job locally (on Windows) with apache-beam 2.16 hits the same issue, but with more logging detail:

...
INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1571606418.971000/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1571606418.971000/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl in 3 seconds.
WARNING:root:Discarding unparseable args: ['job.py', '--days_history=30']
WARNING:root:Discarding unparseable args: ['job.py', '--days_history=30']
WARNING:root:Retry with exponential backoff: waiting for 2.64795143823 seconds before retrying submit_job_description because we caught exception: error: [Errno 10053] An established connection was aborted by the software in your host machine
 Traceback for above exception (most recent call last):
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
    return fun(*args, **kwargs)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 593, in submit_job_description
    response = self._client.projects_locations_jobs.Create(request)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\clients\dataflow\dataflow_v1b3_client.py", line 657, in Create
    config, request, global_params=global_params)
  File "C:\Python27\lib\site-packages\apitools\base\py\base_api.py", line 729, in _RunMethod
    http, http_request, **opts)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1694, in request
    (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1434, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1390, in _conn_request
    response = conn.getresponse()
  File "C:\Python27\lib\httplib.py", line 1121, in getresponse
    response.begin()
  File "C:\Python27\lib\httplib.py", line 438, in begin
    version, status, reason = self._read_status()
  File "C:\Python27\lib\httplib.py", line 394, in _read_status
    line = self.fp.readline(_MAXLINE + 1)
  File "C:\Python27\lib\socket.py", line 480, in readline
    data = self._sock.recv(self._rbufsize)
  File "C:\Python27\lib\ssl.py", line 754, in recv
    return self.read(buflen)
  File "C:\Python27\lib\ssl.py", line 641, in read
    v = self._sslobj.read(len)

... retries 4 times total ...
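The retry behavior visible in the log above can be sketched as follows. This is only an illustration of the pattern, with made-up names; it is not Beam's actual `apache_beam.utils.retry` implementation:

```python
import random
import time


def retry_with_exponential_backoff(fun, num_retries=4, initial_delay_secs=1.0):
    """Call fun(), retrying on socket errors with jittered exponential backoff.

    Illustrative sketch of the behavior seen in the log above (a warning,
    a wait, then a retry, giving up after a fixed number of attempts).
    """
    delay = initial_delay_secs
    for attempt in range(num_retries + 1):
        try:
            return fun()
        except OSError:  # socket.error is an alias of OSError on Python 3
            if attempt == num_retries:
                raise  # out of retries: re-raise, as in the final traceback
            # Jitter the delay so concurrent clients don't retry in lockstep.
            wait = delay * (1 + random.random())
            print("Retry with exponential backoff: waiting for %.2f seconds" % wait)
            time.sleep(wait)
            delay *= 2
```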

Traceback (most recent call last):
  File "job.py", line 117, in <module>
    newness.pipeline.run_dataflow(sys.argv)
  File "C:\Users\LeeW\Desktop\newness\newness\pipeline.py", line 480, in run_dataflow
    result = pipe.run()
  File "C:\Python27\lib\site-packages\apache_beam\pipeline.py", line 407, in run
    self._options).run(False)
  File "C:\Python27\lib\site-packages\apache_beam\pipeline.py", line 420, in run
    return self.runner.run_pipeline(self, self._options)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 485, in run_pipeline
    self.dataflow_client.create_job(self.job), self)
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
    return fun(*args, **kwargs)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 546, in create_job
    return self.submit_job_description(job)
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 219, in wrapper
    raise_with_traceback(exn, exn_traceback)
  File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
    return fun(*args, **kwargs)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 593, in submit_job_description
    response = self._client.projects_locations_jobs.Create(request)
  File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\clients\dataflow\dataflow_v1b3_client.py", line 657, in Create
    config, request, global_params=global_params)
  File "C:\Python27\lib\site-packages\apitools\base\py\base_api.py", line 729, in _RunMethod
    http, http_request, **opts)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 346, in MakeRequest
    check_response_func=check_response_func)
  File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 396, in _MakeRequestNoRetry
    redirections=redirections, connection_type=connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
    redirections, connection_type)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1694, in request
    (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1434, in _request
    (response, content) = self._conn_request(conn, request_uri, method, body, headers)
  File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1390, in _conn_request
    response = conn.getresponse()
  File "C:\Python27\lib\httplib.py", line 1121, in getresponse
    response.begin()
  File "C:\Python27\lib\httplib.py", line 438, in begin
    version, status, reason = self._read_status()
  File "C:\Python27\lib\httplib.py", line 394, in _read_status
    line = self.fp.readline(_MAXLINE + 1)
  File "C:\Python27\lib\socket.py", line 480, in readline
    data = self._sock.recv(self._rbufsize)
  File "C:\Python27\lib\ssl.py", line 754, in recv
    return self.read(buflen)
  File "C:\Python27\lib\ssl.py", line 641, in read
    v = self._sslobj.read(len)
socket.error: [Errno 10053] An established connection was aborted by the software in your host machine

1 Answer


Which version of the Beam Python SDK are you using?