
I've been getting the error below in my Dataflow pipeline after it runs for a few hours, and I think it caused some data loss. I am missing data for a few months that used to be there, yet the pipeline finished successfully; it also took 4 hours longer than usual to finish (12 hours this time).

(c1e6e4a686086ce4): java.io.IOException: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 Service Unavailable
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:289)
    at com.google.cloud.dataflow.sdk.runners.worker.TextSink$TextFileWriter.close(TextSink.java:243)
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:254)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:191)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:144)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:180)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:161)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:148)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 Service Unavailable
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:145)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:113)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:40)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:432)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    ... 4 more


(327e81fa21383d97): java.io.IOException: java.io.IOException: Pipe broken
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:431)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:289)
    at com.google.cloud.dataflow.sdk.runners.worker.TextSink$TextFileWriter.close(TextSink.java:243)
    at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100)
    at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:254)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:191)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:144)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:180)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:161)
    at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:148)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Pipe broken
    at java.io.PipedInputStream.read(PipedInputStream.java:321)
    at java.io.PipedInputStream.read(PipedInputStream.java:377)
    at com.google.api.client.util.ByteStreams.read(ByteStreams.java:181)
    at com.google.api.client.googleapis.media.MediaHttpUploader.setContentAndHeadersOnCurrentRequest(MediaHttpUploader.java:629)
    at com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:409)
    at com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:336)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:427)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:357)
    ... 4 more

This looks like a transient error. Dataflow retries such errors transparently (though it notifies you of them, as it did here) and without data loss. Are you confident your pipeline produced an incorrect result? If so, please provide more info: e.g. the job ID and how you concluded which data was lost. - jkff
@jkff What I know is that we haven't changed any code, and the pipeline's source still has all the data. jobId: 2016-07-29_17_27_22-714252920423430559 - Lahiru
I looked at this job. It seems to match what I said in my previous comment: the transient errors were successfully retried, and the result should be correct. Please share more detail if you have reason to believe that some data was lost. - jkff
@jkff Thanks for the response. At the end of the job we store the output in BigQuery, and I clearly see missing data for a few months. I then re-ran the pipeline with a time range covering the missing data, regenerated it, and I am unblocked. But I am still curious why the job took 4 hours extra even though it processed less data. I usually don't see so many errors; I believe the extra 4 hours came from the retry logic. - Lahiru
I'd like to investigate this more. Could you please contact dataflow-feedback@ with more details about what kind of data the job was expected to produce and what it actually produced? I can see that the job retried a very small amount of work compared to the total (4 shards out of many thousands), so this alone seems unlikely to explain the loss of several months of data - something else must be going on. It would also help to have the ID of a previous similar job that ran 4 hours faster, to compare exactly which parts got slower. - jkff

1 Answer


The issue here turned out to be some old records in Storage with an outdated schemaVersion. These weren't being cleaned up, so the pipeline failed whenever it ran over a time range that included records with the old schema version.
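
If the stale records can't be cleaned up at the source, one defensive option is to filter them out inside the pipeline rather than letting a bad range break the job. Below is a minimal sketch against the Dataflow 1.x SDK (the one in the stack traces above); the TableRow element type, the schemaVersion field name, and the minimum supported version are assumptions for illustration, not details from the original job.

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.cloud.dataflow.sdk.transforms.Aggregator;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.Sum;

    // Hypothetical filter: drops records whose schemaVersion predates the oldest
    // version the pipeline can still parse, instead of failing on them.
    public class DropOldSchemaVersionsFn extends DoFn<TableRow, TableRow> {
      // Assumed cutoff; replace with the oldest version your code supports.
      private static final long MIN_SUPPORTED_SCHEMA_VERSION = 2;

      // Makes the number of dropped records visible in the monitoring UI.
      private final Aggregator<Long, Long> droppedRecords =
          createAggregator("droppedOldSchemaRecords", new Sum.SumLongFn());

      @Override
      public void processElement(ProcessContext c) {
        TableRow row = c.element();
        Object version = row.get("schemaVersion"); // assumed field name
        if (!(version instanceof Number)
            || ((Number) version).longValue() < MIN_SUPPORTED_SCHEMA_VERSION) {
          droppedRecords.addValue(1L); // count the skip instead of crashing
          return;
        }
        c.output(row);
      }
    }

Applied with ParDo.of(new DropOldSchemaVersionsFn()) before the BigQuery write, this keeps the job running over mixed-version ranges, and the aggregator makes any skipped records show up as a number in the UI rather than a silent gap.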