We have a kubernetes cron job on GCP that submits several copies of the same Python dataflow job, each in its own container. Whenever we need a new copy of the job, we just add it to the spec->jobTemplate->spec->template->spec->containers part of the cron job yaml and adjust the dataflow job parameters as needed. This usually works fine, but the latest copy we tried to add does not work. The existing copies are all still working as expected. The job seems to fail on submission to GCP, and the error message is not very helpful:
Traceback (most recent call last):
File "/app/job.py", line 117, in <module>
newness.pipeline.run_dataflow(sys.argv)
File "/app/newness/pipeline.py", line 480, in run_dataflow
result = pipe.run()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 403, in run
self.to_runner_api(), self.runner, self._options).run(False)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.py", line 416, in run
return self.runner.run_pipeline(self)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 389, in run_pipeline
self.dataflow_client.create_job(self.job), self)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 504, in create_job
return self.submit_job_description(job)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/utils/retry.py", line 184, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/apiclient.py", line 551, in submit_job_description
response = self._client.projects_locations_jobs.Create(request)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/clients/dataflow/dataflow_v1b3_client.py", line 578, in Create
config, request, global_params=global_params)
File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python2.7/dist-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpError: <exception str() failed>
The job does not appear in the dataflow console at all.
The previous lines of the container logs look like:
2019-10-13T03:57:47.725542287Z Successfully downloaded apache-beam
2019-10-13T03:58:17.125601280Z INFO:root:Staging SDK sources from PyPI to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar
2019-10-13T03:58:17.324843623Z INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar...
2019-10-13T03:58:24.825657227Z INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/dataflow_python_sdk.tar
2019-10-13T03:58:25.225646529Z INFO:root:Downloading binary distribtution of the SDK from PyPi
2019-10-13T03:58:25.225716554Z INFO:root:Executing command: ['/usr/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpk5TfMS', 'apache-beam==2.8.0', '--no-deps', '--only-binary', ':all:', '--python-version', '27', '--implementation', 'cp', '--abi', 'cp27mu', '--platform', 'manylinux1_x86_64']
2019-10-13T03:59:33.926186906Z Collecting apache-beam==2.8.0
2019-10-13T03:59:52.125678183Z Using cached https://files.pythonhosted.org/packages/0f/63/ea5453ba656d060936acf41d2ec057f23aafd69649e2129ac66fdda67d48/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:00:11.525435891Z Saved /tmp/tmpk5TfMS/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:00:12.025054706Z Successfully downloaded apache-beam
2019-10-13T04:00:26.726190542Z INFO:root:Staging binary distribution of the SDK from PyPI to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:00:26.825618945Z INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl...
2019-10-13T04:00:33.725522899Z INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1570936519.827087/apache_beam-2.8.0-cp27-cp27mu-manylinux1_x86_64.whl
2019-10-13T04:06:14.525017097Z Traceback (most recent call last):
...
Why is this job failing to submit? Are there any other logs we can look at to see the cause of this failure?
(Most of our dataflow jobs are written in Java, where the error messages are usually more helpful.)
UPDATE: Running job locally (Windows) with apache-beam 2.16 has the same issue but more logging detail:
...
INFO:root:Starting GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1571606418.971000/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl...
INFO:root:Completed GCS upload to gs://gcs-bucket-name/staging/newness-boosting-c2898.1571606418.971000/apache_beam-2.16.0-cp27-cp27mu-manylinux1_x86_64.whl in 3 seconds.
WARNING:root:Discarding unparseable args: ['job.py', '--days_history=30']
WARNING:root:Discarding unparseable args: ['job.py', '--days_history=30']
WARNING:root:Retry with exponential backoff: waiting for 2.64795143823 seconds before retrying submit_job_description because we caught exception: error: [Errno 10053] An established connection was aborted by the software in your host machine
Traceback for above exception (most recent call last):
File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
return fun(*args, **kwargs)
File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 593, in submit_job_description
response = self._client.projects_locations_jobs.Create(request)
File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\clients\dataflow\dataflow_v1b3_client.py", line 657, in Create
config, request, global_params=global_params)
File "C:\Python27\lib\site-packages\apitools\base\py\base_api.py", line 729, in _RunMethod
http, http_request, **opts)
File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 346, in MakeRequest
check_response_func=check_response_func)
File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 396, in _MakeRequestNoRetry
redirections=redirections, connection_type=connection_type)
File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
redirections, connection_type)
File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
redirections, connection_type)
File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1694, in request
(response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1434, in _request
(response, content) = self._conn_request(conn, request_uri, method, body, headers)
File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1390, in _conn_request
response = conn.getresponse()
File "C:\Python27\lib\httplib.py", line 1121, in getresponse
response.begin()
File "C:\Python27\lib\httplib.py", line 438, in begin
version, status, reason = self._read_status()
File "C:\Python27\lib\httplib.py", line 394, in _read_status
line = self.fp.readline(_MAXLINE + 1)
File "C:\Python27\lib\socket.py", line 480, in readline
data = self._sock.recv(self._rbufsize)
File "C:\Python27\lib\ssl.py", line 754, in recv
return self.read(buflen)
File "C:\Python27\lib\ssl.py", line 641, in read
v = self._sslobj.read(len)
... retries 4 times total ...
Traceback (most recent call last):
File "job.py", line 117, in <module>
newness.pipeline.run_dataflow(sys.argv)
File "C:\Users\LeeW\Desktop\newness\newness\pipeline.py", line 480, in run_dataflow
result = pipe.run()
File "C:\Python27\lib\site-packages\apache_beam\pipeline.py", line 407, in run
self._options).run(False)
File "C:\Python27\lib\site-packages\apache_beam\pipeline.py", line 420, in run
return self.runner.run_pipeline(self, self._options)
File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 485, in run_pipeline
self.dataflow_client.create_job(self.job), self)
File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
return fun(*args, **kwargs)
File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 546, in create_job
return self.submit_job_description(job)
File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 219, in wrapper
raise_with_traceback(exn, exn_traceback)
File "C:\Python27\lib\site-packages\apache_beam\utils\retry.py", line 206, in wrapper
return fun(*args, **kwargs)
File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\apiclient.py", line 593, in submit_job_description
response = self._client.projects_locations_jobs.Create(request)
File "C:\Python27\lib\site-packages\apache_beam\runners\dataflow\internal\clients\dataflow\dataflow_v1b3_client.py", line 657, in Create
config, request, global_params=global_params)
File "C:\Python27\lib\site-packages\apitools\base\py\base_api.py", line 729, in _RunMethod
http, http_request, **opts)
File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 346, in MakeRequest
check_response_func=check_response_func)
File "C:\Python27\lib\site-packages\apitools\base\py\http_wrapper.py", line 396, in _MakeRequestNoRetry
redirections=redirections, connection_type=connection_type)
File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
redirections, connection_type)
File "C:\Python27\lib\site-packages\oauth2client\transport.py", line 169, in new_request
redirections, connection_type)
File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1694, in request
(response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey)
File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1434, in _request
(response, content) = self._conn_request(conn, request_uri, method, body, headers)
File "C:\Python27\lib\site-packages\httplib2\__init__.py", line 1390, in _conn_request
response = conn.getresponse()
File "C:\Python27\lib\httplib.py", line 1121, in getresponse
response.begin()
File "C:\Python27\lib\httplib.py", line 438, in begin
version, status, reason = self._read_status()
File "C:\Python27\lib\httplib.py", line 394, in _read_status
line = self.fp.readline(_MAXLINE + 1)
File "C:\Python27\lib\socket.py", line 480, in readline
data = self._sock.recv(self._rbufsize)
File "C:\Python27\lib\ssl.py", line 754, in recv
return self.read(buflen)
File "C:\Python27\lib\ssl.py", line 641, in read
v = self._sslobj.read(len)
socket.error: [Errno 10053] An established connection was aborted by the software in your host machine