I have an Apache Beam (v2.15.0) streaming job running on Google Cloud Dataflow. It reads messages from Google Cloud Pub/Sub, groups the data, and writes the groups to InfluxDB.
The main problem is that too many write requests are sent to InfluxDB at the same moment, and I repeatedly get the following error:
{
  insertId: "750692785846999589:29668:0:536239"
  jsonPayload: {
    job: "2019-09-09_07_41_32-8614892132258718725"
    logger: "functions.DashboardFunctions$SendPostLiteFn2"
    message: "The writing to InfluxDB has failed. LOG: engine: error syncing wal"
    stage: "P2"
    step: "L-I-SendPost"
    thread: "339"
    work: "5a8|130|1|091443-992359958146824871"
    worker: "dataflow-dashboard-test-09090741-ixek-harness-r7cz"
  }
  labels: {
    compute.googleapis.com/resource_id: "750692785846999589"
    compute.googleapis.com/resource_name: "dataflow-dashboard-test-09090741-ixek-harness-r7cz"
    compute.googleapis.com/resource_type: "instance"
    dataflow.googleapis.com/job_id: "2019-09-09_07_41_32-8614892132258718725"
    dataflow.googleapis.com/job_name: "dataflow-dashboard-test"
    dataflow.googleapis.com/region: "europe-west1"
  }
  logName: "projects/XXXX/logs/dataflow.googleapis.com%2Fworker"
  receiveTimestamp: "2019-09-09T14:46:50.419635294Z"
  resource: {
    labels: {
      job_id: "2019-09-09_07_41_32-8614892132258718725"
      job_name: "dataflow-dashboard-test"
      project_id: "XXXX"
      region: "europe-west1"
      step_id: "L-I-SendPost"
    }
    type: "dataflow_step"
  }
  severity: "ERROR"
  timestamp: "2019-09-09T14:46:35.640Z"
}
I don't understand this behaviour, since I think I'm already batching the writes...
Below is the code I wrote to send the data to InfluxDB in batches:
import java.util.List;
import java.util.concurrent.TimeUnit;

import okhttp3.OkHttpClient;
import org.apache.beam.sdk.transforms.DoFn;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A DoFn that sends POSTs to InfluxDB.
 */
public static class WriteToInfluxDB extends DoFn<List<String>, Void> {

    private static final Logger LOG = LoggerFactory.getLogger(WriteToInfluxDB.class);

    private InfluxDB influxClient = null;
    private BatchPoints batchPoints = null;

    @Setup
    public void initInfluxClient() {
        OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient.Builder()
                .connectTimeout(40, TimeUnit.SECONDS)
                .readTimeout(60, TimeUnit.SECONDS)
                .writeTimeout(60, TimeUnit.SECONDS);
        this.influxClient =
                InfluxDBFactory
                        .connect(
                                Settings.getInfluxUrl(),
                                Settings.getInfluxUsername(),
                                Settings.getInfluxPassword(),
                                okHttpClientBuilder)
                        .enableGzip();
    }

    @StartBundle
    public void startBundle(StartBundleContext context) {
        Util.MyOptions myOptions = context.getPipelineOptions().as(Util.MyOptions.class);
        this.batchPoints =
                BatchPoints
                        .database(Settings.getDbNameProvider(Settings.StatsType.DASHBOARD, myOptions.getTest()))
                        .consistency(InfluxDB.ConsistencyLevel.ONE)
                        .build();
    }

    @ProcessElement
    public void processElement(final ProcessContext c) {
        // Code that builds the point
        Point.Builder point = Point
                ...
        // Inserts the point into the batch object
        this.batchPoints.point(point.build());
    }

    @FinishBundle
    public void finishBundle(FinishBundleContext context) {
        // Sends a single POST to InfluxDB with all the points accumulated in batchPoints
        try {
            this.influxClient.write(this.batchPoints);
            this.influxClient.flush();
        } catch (Throwable t) {
            LOG.error("The writing to InfluxDB has failed. LOG: " + t.getMessage());
        }
    }
}
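For reference, I'm aware that influxdb-java also offers its own client-side batching via enableBatch; I'm not using it in this job, but a minimal sketch (the flush thresholds are arbitrary values, not something from my code) would look like this:

// Sketch only (not part of my job): influxdb-java's built-in batching.
// Points written via write(Point) get buffered and flushed every 2000
// points or every 1000 ms, whichever happens first (arbitrary values).
InfluxDB influxDB = InfluxDBFactory
        .connect(Settings.getInfluxUrl(), Settings.getInfluxUsername(), Settings.getInfluxPassword())
        .enableBatch(2000, 1000, TimeUnit.MILLISECONDS);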
Edit: the job uses a 1-minute fixed window.
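For context, the pipeline is wired roughly like this (a sketch: the transform names, the constant keying, and the conversion to List<String> are illustrative stand-ins, only the 1-minute window and the final ParDo reflect the real job):

// Illustrative wiring only; `subscription` is a placeholder.
pipeline
    .apply("ReadFromPubSub", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("FixedWindow", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
    .apply("KeyAll", WithKeys.of("all"))                 // constant key, for illustration
    .apply("GroupByKey", GroupByKey.<String, String>create())
    .apply("ToList", MapElements
        .into(TypeDescriptors.lists(TypeDescriptors.strings()))
        .via(kv -> Lists.newArrayList(kv.getValue())))   // Guava Lists, Iterable -> List
    .apply("L-I-SendPost", ParDo.of(new WriteToInfluxDB()));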