We have a very simple pipeline that reads from GCS, performs a simple ParDo, and then writes the results to BigQuery. It runs on GCP, autoscales up to 50 VMs, and doesn't do anything fancy.

Reading all the data from GCS (~10B records, ~700+ GB) and transforming it happens relatively quickly (within the first 7-10 minutes).

But when it gets to the BigQuery write (using BigQueryIO), it slows right down, even though it only has to write about 1M records (~60 MB). This step alone takes ~20 minutes.
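To put those numbers side by side, here is a back-of-the-envelope comparison of the read and write phases. It is only a sketch: it assumes decimal units (1 GB = 1e9 bytes, 1 MB = 1e6 bytes), takes the upper end of the 7-10 minute window for the read, and the class name is hypothetical.

```java
// Back-of-the-envelope throughput comparison using the figures quoted above.
// Assumption: decimal units and the 10-minute upper bound for the read phase.
public class ThroughputEstimate {

    // Average throughput in bytes per second for a given volume and duration.
    static double bytesPerSecond(double bytes, double seconds) {
        return bytes / seconds;
    }

    public static void main(String[] args) {
        double readBytes = 700e9;       // ~700 GB read from GCS
        double readSeconds = 10 * 60;   // upper end of the 7-10 minute window
        double writeBytes = 60e6;       // ~60 MB written to BigQuery
        double writeSeconds = 20 * 60;  // ~20 minutes for the write step

        double readRate = bytesPerSecond(readBytes, readSeconds);
        double writeRate = bytesPerSecond(writeBytes, writeSeconds);

        System.out.printf("read:  %.2f GB/s%n", readRate / 1e9);
        System.out.printf("write: %.1f KB/s%n", writeRate / 1e3);
        System.out.printf("ratio: ~%.0fx%n", readRate / writeRate);
    }
}
```

With these figures the read phase works out to roughly 1.17 GB/s against about 50 KB/s for the write, which suggests the write step's duration is dominated by something other than data volume.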

Besides being slow, the BigQuery write step is shown as "stopped" in the graph even though it succeeded. The step also looks overly complicated for a simple write to BigQuery (see picture below).

The bottleneck appears to be the BigQueryIO.Write/BatchLoads/WriteRename operation (see logs below): it starts at 21:39:46, and the next log entry is the worker pool stopping at 21:57:43.

Is there something I'm doing wrong in my code?

Code:

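import static org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED;
import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// This class is itself named Pipeline, so Beam's Pipeline is referenced by its
// fully qualified name below to avoid a name clash.
public class Pipeline {
    private static final String BIG_QUERY_TABLE = "<redacted>:<redacted>.melbourne_titles";
    private static final String BUCKET = "gs://<redacted>/*.gz";

    public static void main(String[] args) {
        DataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(DataflowPipelineOptions.class);
        options.setAutoscalingAlgorithm(THROUGHPUT_BASED);
        org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(options);
        // Read the gzipped CSV input, keep titles mentioning "melbourne", write them to BigQuery.
        pipeline.apply(TextIO.read().from(BUCKET).withCompressionType(GZIP))
                .apply(ParDo.of(new DoFn<String, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws Exception {
                        String[] fields = c.element().split(",");
                        if (fields.length <= 5) {
                            return; // skip malformed lines instead of failing the bundle
                        }
                        String title = fields[5];
                        if (title.toLowerCase().contains("melbourne")) {
                            TableRow tableRow = new TableRow();
                            tableRow.set("title", title);
                            c.output(tableRow);
                        }
                    }
                }))
                .apply(BigQueryIO.writeTableRows()
                        .to(BIG_QUERY_TABLE)
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_TRUNCATE)
                        .withSchema(getSchema()));
        pipeline.run();
    }

    // Single-column schema: title STRING.
    private static TableSchema getSchema() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        return new TableSchema().setFields(fields);
    }
}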
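The filtering inside the anonymous DoFn can be pulled out into a plain static method so it can be tested without running a pipeline. This is a sketch, assuming (as the code above does) comma-separated lines with the title in the sixth column (index 5); the class name, method name and sample lines are hypothetical.

```java
import java.util.Optional;

// Hypothetical helper: the same filter as the anonymous DoFn, testable on plain strings.
public class TitleFilter {

    // Assumption taken from the pipeline code: the title is column 5 (zero-based).
    private static final int TITLE_COLUMN = 5;

    // Returns the title if it contains "melbourne" (case-insensitive);
    // returns empty for non-matching or too-short lines.
    static Optional<String> melbourneTitle(String line) {
        String[] fields = line.split(",");
        if (fields.length <= TITLE_COLUMN) {
            return Optional.empty(); // malformed line: skip instead of throwing
        }
        String title = fields[TITLE_COLUMN];
        if (title.toLowerCase().contains("melbourne")) {
            return Optional.of(title);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical sample lines: year,month,day,project,language,title,views
        String[] samples = {
            "2017,08,25,wikipedia,en,Melbourne_Cup,42",
            "2017,08,25,wikipedia,en,Sydney,7",
            "too,short"
        };
        for (String s : samples) {
            System.out.println(s + " -> " + melbourneTitle(s).orElse("(dropped)"));
        }
    }
}
```

Inside the DoFn, processElement could then call melbourneTitle(c.element()) and output a TableRow only when a value is present.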

Log snippet:

  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Create
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
  2017-08-25 (21:30:23) Starting 50 workers in australia-southeast1-a...
  2017-08-25 (21:30:23) Executing operation TextIO.Read/Read+ParDo(Anonymous)+BigQueryIO.Write/PrepareWrite/ParDo(Anonymous)...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/TriggerIdCreation/Read(CreateSource)+BigQueryIO.Writ...
  2017-08-25 (21:30:23) Executing operation BigQueryIO.Write/BatchLoads/Create/Read(CreateSource)+BigQueryIO.Write/BatchLoad...
  2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
  2017-08-25 (21:31:21) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/BatchViewOverrides.GroupByWindowHas...
  2017-08-25 (21:31:22) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
  2017-08-25 (21:31:23) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/BatchViewOverrides.GroupByWindowH...
  2017-08-25 (21:38:10) Executing operation BigQueryIO.Write/BatchLoads/TempFilePrefixView/CreateDataflowView
  2017-08-25 (21:38:13) Executing operation BigQueryIO.Write/BatchLoads/View.AsSingleton/CreateDataflowView
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Close
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/GroupByKey/Read+BigQueryIO.Write/BatchLoads/GroupByK...
  2017-08-25 (21:38:45) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/Distinct Keys/Combine.perKey(Anonym...
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Create
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Create
  2017-08-25 (21:38:49) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForData/BatchViewOverri...
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Close
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Close
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForSize/Read+BigQueryIO...
  2017-08-25 (21:38:56) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/GBKaSVForKeys/Read+BigQueryIO...
  2017-08-25 (21:39:00) Executing operation s35-u80
  2017-08-25 (21:39:01) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/Flatten.PCollections
  2017-08-25 (21:39:03) Executing operation BigQueryIO.Write/BatchLoads/CalculateSchemas/asMap/CreateDataflowView
  2017-08-25 (21:39:06) Executing operation BigQueryIO.Write/BatchLoads/ResultsView/CreateDataflowView
  2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Create
  2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Create
  2017-08-25 (21:39:12) Executing operation BigQueryIO.Write/BatchLoads/Create.Values/Read(CreateSource)+BigQueryIO.Write/Ba...
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Close
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Close
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/SinglePartitionsReshuffle/GroupByKey/Read+BigQueryIO...
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
  2017-08-25 (21:39:33) Executing operation BigQueryIO.Write/BatchLoads/MultiPartitionsReshuffle/GroupByKey/Read+BigQueryIO....
  2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
  2017-08-25 (21:39:35) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/BatchViewOverrides.GroupByWindowHashA...
  2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/TempTablesView/CreateDataflowView
  2017-08-25 (21:39:46) Executing operation BigQueryIO.Write/BatchLoads/WriteRename
  2017-08-25 (21:57:43) Stopping worker pool...

[Image: overly complex looking BigQueryIO.Write step in the job graph]

Job Details:

  • Dataflow Java SDK: 2.0.0
  • Job ID: 2017-08-25_04_29_54-7210937293145071720

UPDATE

I think the problem is the sheer number of files that Dataflow produces and that BigQuery subsequently has to load. It may only be 1M rows, but Dataflow is producing 850+ files to load:

  "configuration" : {
    "load" : {
      "createDisposition" : "CREATE_IF_NEEDED",
      "destinationTable" : {
        "datasetId" : "dataflow_on_a_tram",
        "projectId" : "grey-sort-challenge",
        "tableId" : "melbourne_titles"
      },
      "schema" : {
        "fields" : [ {
          "name" : "year",
          "type" : "STRING"
        }, {
          "name" : "month",
          "type" : "STRING"
        }, {
          "name" : "day",
          "type" : "STRING"
        }, {
          "name" : "wikimedia_project",
          "type" : "STRING"
        }, {
          "name" : "language",
          "type" : "STRING"
        }, {
          "name" : "title",
          "type" : "STRING"
        }, {
          "name" : "views",
          "type" : "INTEGER"
        } ]
      },
      "sourceFormat" : "NEWLINE_DELIMITED_JSON",
      "sourceUris" : [
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/51221a43-8fd8-417d-90ca-2f3c3e5789d2",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/5e1c3cb8-20d1-45ef-b0bb-209645c36093",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0ed8d240-2bc2-4c8b-808d-792540448c73",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/d7a1fefe-6dd8-4f30-bf97-040c3692e448",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/b7c4d9a8-d45d-4cc6-b375-291e6435ed53",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/17a7bbf4-5695-4188-b03a-3ef5cda8607c",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/783af461-c114-4a41-aa5f-ed1c7db86bab",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/dad046fc-eabf-4212-83f1-7d7fa71075c1",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/7b9ffec1-7424-4248-83b4-98a4ef4233b9",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/bb297232-8e84-4a14-9dc6-3efde1b2b586",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/0693972a-1319-4637-af9f-8a4a3d5cb0f7",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/41b1e722-f76c-404d-a71b-bd36c09e8a06",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/19cfd89e-c9ee-4221-aee1-b3503dbcd93b",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/574467f2-5771-479a-b213-2941225a24bd",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/4d872304-0f42-47f2-89cf-b3a3f856ca67",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/1c086246-8eec-4bbe-be98-b01abb181d33",
 "gs://<redacted>/tmp/BigQueryWriteTemp/615faf65cef743718624cbeb8fd96f14/9439f5f4-5020-471d-b631-e1a3fea1584f",

[..] 851 files!
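A quick calculation shows how small those files are on average. This sketch assumes the ~60 MB (decimal) and ~1M rows quoted in the question are spread evenly across the 851 source URIs; the class name is hypothetical.

```java
// Average size of each temporary file passed to the BigQuery load job,
// assuming the data is spread evenly over all files.
public class TempFileEstimate {

    // Total quantity divided evenly across the given number of files.
    static double average(double total, int files) {
        return total / files;
    }

    public static void main(String[] args) {
        int files = 851;               // sourceUris count in the load job
        double totalBytes = 60e6;      // ~60 MB, from the question
        double totalRows = 1_000_000;  // ~1M rows, from the question

        System.out.printf("~%.1f KB per file%n", average(totalBytes, files) / 1e3);
        System.out.printf("~%.0f rows per file%n", average(totalRows, files));
    }
}
```

That comes to roughly 70 KB and about 1,175 rows per file: many small files rather than a few large ones.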

Looking at the worker logs, it seems that it's just waiting for a BQ load job to finish. It may be that the BQ job was slower than usual. Does this happen consistently? - Pablo
I ran it again. Same problem. Job ID: 2017-08-25_14_26_18-5377718284053913263 - Graham Polley
@Pablo - see my update. I think it's so slow because Dataflow is producing so many files for BQ to load. - Graham Polley
Have you tried setting maxfiles per bundle in the BigQuerySink? - Pablo
I haven't. I'll check it out. - Graham Polley

1 Answer


One thing to keep in mind: BigQuery does not guarantee the latency of load jobs. If many other load jobs are issued at the same time, your job may wait in a queue before it is scheduled. If you can run this job again, we should be able to help you inspect the BigQuery load job itself to see what is happening.
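To illustrate why queueing makes the write step look slow, here is a minimal stdlib-only sketch of a caller that blocks on a load job by polling its state. It is not Beam's or the BigQuery client's code: the class name, method name and state sequence are hypothetical, and only the state names PENDING, RUNNING and DONE mirror the states BigQuery reports for jobs.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Sketch: waiting on a job by polling its state. Polls spent queued (PENDING)
// count toward the wait just like polls spent RUNNING.
public class LoadJobWait {

    // Polls until DONE; returns {pendingPolls, runningPolls}.
    static int[] pollUntilDone(Supplier<String> state, int maxPolls) {
        int pending = 0;
        int running = 0;
        for (int i = 0; i < maxPolls; i++) {
            String s = state.get();
            switch (s) {
                case "PENDING": pending++; break;
                case "RUNNING": running++; break;
                case "DONE": return new int[] {pending, running};
                default: throw new IllegalStateException("unknown state: " + s);
            }
            // A real caller would sleep between polls, ideally with backoff.
        }
        throw new IllegalStateException("job not DONE after " + maxPolls + " polls");
    }

    public static void main(String[] args) {
        // Hypothetical sequence: the job is mostly queued and runs only briefly.
        Iterator<String> states = Arrays.asList(
                "PENDING", "PENDING", "PENDING", "PENDING", "RUNNING", "DONE").iterator();
        int[] counts = pollUntilDone(states::next, 100);
        System.out.println("polls pending=" + counts[0] + ", running=" + counts[1]);
    }
}
```

In this made-up sequence most of the wait is queue time, which is the scenario the answer describes: the Dataflow step stays busy even though the load itself is small.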