
I'm using Dataflow to write data into BigQuery using BigQueryIO.Write.to().
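
For reference, the write step looks roughly like this (a sketch only; the table spec, schema, and the upstream PCollection are placeholders, not my real code):

// Sketch of the write step; names and schema are placeholders.
// `rows` is a PCollection<TableRow> produced earlier in the pipeline.
// (imports from com.google.cloud.dataflow.sdk.* omitted for brevity)
rows.apply(BigQueryIO.Write
    .to("my-project:my_dataset.my_table")
    .withSchema(tableSchema)  // a TableSchema built elsewhere
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));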

Sometimes, I get this warning from Dataflow:

{
 metadata: {
  severity: "WARNING"    
  projectId: "[...]"    
  serviceName: "dataflow.googleapis.com"    
  region: "us-east1-d"    
  labels: {
   compute.googleapis.com/resource_type: "instance"     
   compute.googleapis.com/resource_name: "dataflow-[...]-08240401-e41e-harness-7dkd"     
   dataflow.googleapis.com/region: "us-east1-d"     
   dataflow.googleapis.com/job_name: "[...]"     
   compute.googleapis.com/resource_id: "[...]"     
   dataflow.googleapis.com/step_id: ""     
   dataflow.googleapis.com/job_id: "[...]"     
  }
  timestamp: "2016-08-30T11:32:00.591Z"    
  projectNumber: "[...]"    
 }
 insertId: "[...]"   
 log: "dataflow.googleapis.com/worker"   
 structPayload: {
  message: "exception thrown while executing request"    
  work: "[...]"    
  thread: "117"    
  worker: "dataflow-[...]-08240401-e41e-harness-7dkd"    
  exception: "java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
    at sun.security.ssl.InputRecord.read(InputRecord.java:503)
    at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961)
    at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918)
    at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
    at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:704)
    at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:647)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1535)
    at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1440)
    at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
    at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
    at com.google.api.client.http.javanet.NetHttpResponse.<init>(NetHttpResponse.java:37)
    at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:94)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:981)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:419)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:352)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:469)
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:229)
    at com.google.cloud.dataflow.sdk.util.BigQueryTableInserter$1.call(BigQueryTableInserter.java:222)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)"    
  logger: "com.google.api.client.http.HttpTransport"    
  stage: "F5"    
  job: "[...]"    
 }
}

I don't see any "retry" log following this one.

My questions are:

  • Am I losing data? I don't know whether the write operation completed correctly. If I understand the code correctly, the entire write batch is left in an uncertain state.
  • If so, is there a way for me to be certain to write data to BigQuery exactly once?
  • If so, shouldn't severity be ERROR instead of WARNING?

Here's a bit of context of my usage:

  • I'm using Dataflow in streaming mode, reading from Kafka using KafkaIO.java
  • "Sometimes" can be from 0 to 3 times per hour
  • Depending on the job, I'm using 2 to 36 workers of type n1-standard-4
  • Depending on the job, I'm writing from 3k to 10k messages/s to BigQuery
  • Average message size is 3kB
  • Dataflow workers are in the us-east1-d zone, BigQuery dataset location is US

1 Answer


These errors come from transient issues in the BigQuery streaming service. In my experience you may see them scattered over the life of a job. A massive burst of these logs typically means that the BigQuery streaming service is experiencing a failure.

Cloud Dataflow will retry the rows for the request (see the code here: BigQuery... line 290). If you do not see those retry log items, or your records never show up in the table after the warning, something else is wrong.

In streaming mode the service will retry ad infinitum, meaning the job will not fail because of this issue. Since we retry forever, it is a fair question whether this should be an error or a warning. We will debate this internally; you could also post a note to the Apache Beam user group to push the debate :-)

You could create a logs-based metric on that warning message in Cloud Logging and take action on it. We are working on deeper Stackdriver integration, and this is a good use case.

You are not losing data; rather, your data's arrival in BigQuery will be delayed. To monitor this, I have built simple fixed-window counts (say, 1-minute windows) using event processing time, and I watch the counts over time as an indicator of freshness. If a fixed window lags behind the watermark, something is wrong with the inserts.
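
As a sketch of that monitoring idea (my own illustration, not code from your pipeline; it assumes the elements already carry event timestamps and that `rows` is the PCollection<TableRow> you are about to write):

// Rough sketch of a per-minute freshness counter; names are illustrative only.
// (imports from com.google.cloud.dataflow.sdk.* and org.joda.time omitted for brevity)
PCollection<Long> perMinuteCounts = rows
    .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply(Count.<TableRow>globally().withoutDefaults());

// Write the counts somewhere cheap to inspect (a small BigQuery table, logs, etc.)
// and watch them over time; a window lagging the watermark points at slow inserts.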

  • Edited for additional clarification based on comment

In the case of an IOException, which this exception inherits from, the path then calls ApiErrorExtractor() to test whether it is due to a rate-limiting issue.

In this case the SocketTimeoutException is not due to rate limiting, so the exception is thrown to the caller. The caller is BigQuery.IO line 2308, in finishBundle. It calls flushRows(), which catches the IOException and throws a RuntimeException.

In streaming mode, any bundle that fails in this manner is retried ad infinitum. Note: in batch mode the runner will attempt the bundle 4 times and then fail.
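
Paraphrasing that path (my own simplified stand-in for the flow described above, with hypothetical helper names; it is not the actual SDK source):

// Simplified paraphrase of the insert path described above; not the SDK source.
// streamingInsert(), isRateLimited() and retryWithBackoff() are hypothetical stand-ins.
try {
  streamingInsert(rows);              // the streaming insertAll request
} catch (IOException e) {
  if (isRateLimited(e)) {             // the ApiErrorExtractor check
    retryWithBackoff(rows);           // rate-limited rows are retried, producing "retry" logs
  } else {
    throw e;                          // SocketTimeoutException takes this branch
  }
}
// The caller (finishBundle -> flushRows()) catches the IOException and rethrows it as a
// RuntimeException; the runner then retries the whole bundle (forever in streaming,
// 4 attempts in batch).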

In this (non-rate-limiting) case, you will not see "retry" row logs.

Your data is not lost; rather, it will be delayed as the bundles are retried.

The worst-case scenario is that all workers are experiencing this issue, so the pipeline cannot make progress. This could happen if, say, the BigQuery streaming service were down or dropping all connections. Once the BigQuery ingest service stabilizes and bundles get through, you might see the rate-limiting case kick in, but the backoff code will help dampen those errors.

The ultimate worst case is that your incoming data rate constantly hovers close to the maximum write rate (the rate-limiting threshold) enforced by the BigQuery streaming ingest service. In that situation, if you build up a backlog from retries (transient or otherwise), your pipeline may never catch up.

There is a Drain feature in streaming Dataflow which stops processing incoming data and then advances the pipeline to gracefully drain all outstanding windows. However, Drain requires that finishBundle() succeeds, so in this case (SocketTimeout) Drain would be stuck. If you cancelled the pipeline instead of draining it, you would lose data for the unfinished bundles.

If you wanted, you could override the BigQuery.IO logic and pipe the data that is erroring somewhere else. You could do this, but I'd simply rely on the BigQuery streaming service never having a terminal outage. That said, if you are running constantly near the rate-limiting threshold and are sensitive to an unrecoverable backlog, you may want to implement a different reduction or sharding mechanism to avoid rate-limiting issues.
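
For example (purely a sketch of the idea in the Dataflow 1.x SDK style; insertRow() is a hypothetical wrapper around your own streaming insert call, and none of this is built into BigQueryIO.Write):

// Sketch only: hand-rolled inserts with a dead-letter side output.
// insertRow() is a hypothetical helper wrapping your own BigQuery insertAll call.
// (imports from com.google.cloud.dataflow.sdk.* omitted for brevity)
final TupleTag<TableRow> insertedTag = new TupleTag<TableRow>() {};
final TupleTag<TableRow> failedTag = new TupleTag<TableRow>() {};

PCollectionTuple results = rows.apply(
    ParDo.withOutputTags(insertedTag, TupleTagList.of(failedTag))
        .of(new DoFn<TableRow, TableRow>() {
          @Override
          public void processElement(ProcessContext c) {
            try {
              insertRow(c.element());                 // hypothetical insert wrapper
              c.output(c.element());
            } catch (IOException e) {
              c.sideOutput(failedTag, c.element());   // dead-letter path
            }
          }
        }));

// results.get(failedTag) can then be written somewhere else (GCS, Pub/Sub, another table)
// instead of blocking the pipeline.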

One other suggestion for backlog recovery is to stop the event flow into your streaming source. For example, stop writing to the Pub/Sub topic and start writing to another topic with its own subscription; your existing Dataflow pipeline would then drain down the existing topic. You'd still have to decide how to process the new backlog in the new subscription, but at least you are guaranteed not to lose any data within the existing pipeline.

If you are not using event-time processing, this approach might be fairly sound; however, if you are using event-time processing, your windows will have overlapping output from both pipelines marked as ON_TIME even though that would not really be the case.

Lots of assumptions on my part here in relation to your use case, but I wanted to share as your question raises other architectural concepts when thinking about data loss.

Hope this helps.