
I have an Apache Beam (v2.15.0) job running on Google Cloud Dataflow. It is a streaming job that reads messages from Google Cloud PubSub, groups the data, and tries to write it to InfluxDB.

The main problem is that when the data are written to InfluxDB, too many write requests hit the database at the same moment, and I receive many errors like the following:

{
 insertId: "750692785846999589:29668:0:536239"  
 jsonPayload: {
  job: "2019-09-09_07_41_32-8614892132258718725"   
  logger: "functions.DashboardFunctions$SendPostLiteFn2"   
  message: "The writing to InfluxDB has failed. LOG: engine: error syncing wal"   
  stage: "P2"   
  step: "L-I-SendPost"   
  thread: "339"   
  work: "5a8|130|1|091443-992359958146824871"   
  worker: "dataflow-dashboard-test-09090741-ixek-harness-r7cz"   
 }
 labels: {
  compute.googleapis.com/resource_id: "750692785846999589"   
  compute.googleapis.com/resource_name: "dataflow-dashboard-test-09090741-ixek-harness-r7cz"   
  compute.googleapis.com/resource_type: "instance"   
  dataflow.googleapis.com/job_id: "2019-09-09_07_41_32-8614892132258718725"   
  dataflow.googleapis.com/job_name: "dataflow-dashboard-test"   
  dataflow.googleapis.com/region: "europe-west1"   
 }
 logName: "projects/XXXX/logs/dataflow.googleapis.com%2Fworker"  
 receiveTimestamp: "2019-09-09T14:46:50.419635294Z"  
 resource: {
  labels: {
   job_id: "2019-09-09_07_41_32-8614892132258718725"    
   job_name: "dataflow-dashboard-test"    
   project_id: "XXXX"    
   region: "europe-west1"    
   step_id: "L-I-SendPost"    
  }
  type: "dataflow_step"   
 }
 severity: "ERROR"  
 timestamp: "2019-09-09T14:46:35.640Z"  
}

I don't understand this behaviour, since I think I'm already batching the writes...

Below you can find the code I've written to send the data in batches to InfluxDB:

/**
 * A DoFn that sends POSTS to InfluxDB.
 */
public static class WriteToInfluxDB extends DoFn<List<String>, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(WriteToInfluxDB.class);

    private InfluxDB influxClient = null;
    private BatchPoints batchPoints = null;

    @Setup
    public void initInfluxClient() {
        OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder()
                .connectTimeout(40, TimeUnit.SECONDS)
                .readTimeout(60, TimeUnit.SECONDS)
                .writeTimeout(60, TimeUnit.SECONDS);

        this.influxClient =
                InfluxDBFactory
                        .connect(
                                Settings.getInfluxUrl(),
                                Settings.getInfluxUsername(),
                                Settings.getInfluxPassword(),
                                okHttpClientBuilder)
                        .enableGzip();
    }

    @StartBundle
    public void startBundle(StartBundleContext context) {
        Util.MyOptions myOptions = context.getPipelineOptions().as(Util.MyOptions.class);

        this.batchPoints =
                BatchPoints
                        .database(Settings.getDbNameProvider(Settings.StatsType.DASHBOARD, myOptions.getTest()))
                        .consistency(InfluxDB.ConsistencyLevel.ONE)
                        .build();
    }

    @ProcessElement
    public void processElement(final ProcessContext c) {
        // Code that builds the point
        Point.Builder point = Point
            ...

        // Inserts the point into the batch object
        this.batchPoints.point(point.build());
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext context) {
        // Send a single POST to InfluxDB with all the points accumulated in batchPoints
        try {
            this.influxClient.write(this.batchPoints);
            this.influxClient.flush();
        } catch (Throwable t) {
            LOG.error("The writing to InfluxDB has failed. LOG: " + t.getMessage());
        }
    }
}

Edit: The job uses a 1-minute fixed window.
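
For reference, this is roughly what that windowing looks like in Beam; the PCollection name messages is hypothetical, only the transform itself reflects the setup described above:

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// "messages" is a hypothetical PCollection<String> read from PubSub.
PCollection<String> windowed = messages.apply(
        "OneMinuteFixedWindows",
        Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));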


1 Answer


Dataflow streaming attempts to parallelize your work by using small bundle sizes, which means that @FinishBundle may be invoked on very small bundles (even as small as a single element/KV pair). Your batchPoints object is rebuilt at every @StartBundle, so each "batch" may contain only one point.

You should look into GroupIntoBatches to increase the batch size, which should decrease the amount of load you are placing on InfluxDB; a sketch follows below.
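
A minimal sketch of what that could look like, assuming the elements to batch are Strings. The shard-key function, the ten keys, and the batch size of 500 are illustrative assumptions, not values from the question:

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// "points" is a hypothetical PCollection<String> of the data to write,
// already windowed into the 1-minute fixed windows.
PCollection<KV<Integer, Iterable<String>>> batched = points
        // GroupIntoBatches operates on KV pairs, so assign a shard key first.
        // A small, fixed number of keys deliberately caps the parallelism of
        // the write step, which is what throttles the load on InfluxDB.
        .apply("KeyForBatching",
                WithKeys.of((String p) -> Math.floorMod(p.hashCode(), 10))
                        .withKeyType(TypeDescriptors.integers()))
        // Emit batches of up to 500 elements per key and window, instead of
        // relying on Dataflow's (often single-element) streaming bundles.
        .apply("BatchWrites", GroupIntoBatches.<Integer, String>ofSize(500));

Your write DoFn would then take KV<Integer, Iterable<String>> as input and write each Iterable as one BatchPoints, so the batch size is under your control rather than the runner's.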