1 vote

I am using Cloud Dataflow, Pub/Sub and BigQuery to read JSON Pub/Sub messages, convert the JSON into TableRows using TableRowJsonCoder, and then write them to BigQuery.

My issue is with consistency: the following code only works some of the time. No errors are thrown. I am certain that I'm publishing messages correctly to the Pub/Sub topic, and I am also certain that Dataflow is reading each message. I have verified this using the gcloud command-line tools:

gcloud beta pubsub subscriptions pull --auto-ack SUBSCRIPTION-NAME

I have two subscriptions to the topic: one read by Dataflow and one that I read in the terminal. The code also successfully formats the JSON data into table format and writes it to my designated dataset and table, but only when it feels like it :(
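For context, here is a representative example of the JSON I publish (the values are made up, but the keys match the BigQuery schema defined in the pipeline code below):

{
  "id": "3f2a1d9e-0000-1111-2222-333344445555",
  "ip": "203.0.113.42",
  "installation_id": "install-001",
  "user_id": "user-042",
  "device_type": "android",
  "language": "en",
  "application_id": "com.example.app",
  "timestamp": "2017-01-23 09:48:10 UTC"
}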

My assumption is that I don't really understand what is going on and that I am missing something to do with windowing, where each window should contain one message.

Assuming I send 50 messages, it seems like Dataflow only reads in about half the elements. That is my first issue: is this to do with elements being counted as a certain number of bytes rather than as individual messages? How do I solve this? I am reading in the data with the TableRowJsonCoder.

Then there seems to be a similar issue where, for X elements, only a small portion of them succeed in passing through the GroupByKey. My description of the issue would be more in-depth if I could troubleshoot it further. Please note that the "id" field is always unique, so I believe it is not to do with duplication, but I could be wrong.

Even as I write this message, the number of elements added has risen to 41 and the output to BigQuery has risen to 12. Am I just not waiting long enough? Is my test data too small (always below 100 messages)? Even if it eventually saves all of my rows, taking over an hour to do so seems far too long.

[Screenshot: Dataflow console]

[Screenshot: the successfully inserted data]

/*
 * Copyright (C) 2015 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.example;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;

import java.util.ArrayList;
import java.util.List;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A streaming pipeline that reads JSON messages from Cloud Pub/Sub, decodes
 * them into TableRows with TableRowJsonCoder, applies one-minute fixed
 * windows, and writes the rows to BigQuery.
 *
 * <p>To run this pipeline using managed resources in Google Cloud
 * Platform, specify the following command-line options:
 *   --project=<YOUR_PROJECT_ID>
 *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --runner=BlockingDataflowPipelineRunner
 */
public class StarterPipeline {

  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  static final int WINDOW_SIZE = 1;  // Default window duration in minutes

  private final static String PROJECT_ID = "dataflow-project";
  private final static String PUBSUB_TOPIC = "projects/dataflow-project/topics/pub-sub-topic";
  private final static String DATASET_ID = "test_dataset";
  private final static String TABLE_ID = "test_table_version_one";


  private static TableSchema getSchema() {
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("ip").setType("STRING"));
      fields.add(new TableFieldSchema().setName("installation_id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("user_id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("device_type").setType("STRING"));
      fields.add(new TableFieldSchema().setName("language").setType("STRING"));
      fields.add(new TableFieldSchema().setName("application_id").setType("STRING"));
      fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
      TableSchema schema = new TableSchema().setFields(fields);
      return schema;
    }

  private static TableReference getTableReference() {
      TableReference tableRef = new TableReference();
      tableRef.setProjectId(PROJECT_ID);
      tableRef.setDatasetId(DATASET_ID);
      tableRef.setTableId(TABLE_ID);
      return tableRef;
    }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setStreaming(true);
    Pipeline pipeline = Pipeline.create(dataflowOptions);
    LOG.info("Reading from PubSub.");
    PCollection<TableRow> input = pipeline
        .apply(PubsubIO.Read.topic(PUBSUB_TOPIC).withCoder(TableRowJsonCoder.of()))
            .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));
    input
         .apply(BigQueryIO.Write.to(getTableReference()).withSchema(getSchema()));

    pipeline.run();
  }
}

Also of interest to me would be how to specify Timestamps and Record IDs using the "timestamp" and "id" fields of my messages.
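From what I can tell, PubsubIO.Read in the Dataflow 1.x SDK has timestampLabel and idLabel methods for this, assuming the values are also set as Pub/Sub message attributes rather than only appearing in the JSON body; something like:

PCollection<TableRow> input = pipeline
    .apply(PubsubIO.Read.topic(PUBSUB_TOPIC)
        .withCoder(TableRowJsonCoder.of())
        // Assumes each Pub/Sub message carries "timestamp" and "id" attributes;
        // they are then used as the element timestamp and deduplication record ID.
        .timestampLabel("timestamp")
        .idLabel("id"))
    .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(1))));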

This should indeed be much faster. I've seen this happen before if the network settings in your project are misconfigured. Can you provide the job_id of your job so I can investigate further? For the timestamp/id question, see cloud.google.com/dataflow/model/… – danielm
@danielm 2017-01-23_09_48_10-1670593411236141809; please note that the project-id above is not the correct one. – Owen Monagan
After leaving the pipeline running overnight, 63 elements have been added from the Pub/Sub read and 17 rows have been generated. The bottleneck is again the GroupByKey, along with the long time taken to read from Pub/Sub. – Owen Monagan
I have run two more jobs. 2017-01-24_04_10_55-11493275089556109537: 504 messages were published to Cloud Pub/Sub. 2017-01-24_03_53_31-13159891042380692229: 100 messages were published to Cloud Pub/Sub; as of now the job is draining, but it is also taking a very long time. The system lag for every job I have run so far is quite close to the run time. – Owen Monagan
As a control, I have run the windowed WordCount example (2017-01-24_06_20_37-11246067058927218114). It shows similar behavior to the previous runs: Pub/Sub does not read in the whole set, and a bottleneck forms at the GroupByKey. It has been running for 10 minutes now without any elements exiting the GroupByKey. The file injector appears to have produced 5,525 elements, yet only 1,325 elements are read from Pub/Sub in the windowed WordCount example. In batch mode everything seems to work as normal (2017-01-19_10_20_49-18373131302606291153). – Owen Monagan

1 Answer

0 votes

The problem is misconfigured networking for your GCE VMs. Dataflow requires that the VMs be able to communicate with each other over TCP, and your firewall rules do not allow that. Adding a firewall rule that allows TCP connections between your VMs in general will fix this.
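For example, a rule along these lines should work; the rule name is arbitrary, and I'm assuming your workers run on the default network and carry the usual "dataflow" network tag, so double-check both against your setup:

gcloud compute firewall-rules create allow-dataflow-internal \
    --network default \
    --allow tcp \
    --source-tags dataflow \
    --target-tags dataflow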

The reason some data slowly gets through your pipeline is that sometimes you get lucky and the data only needs to be processed on one machine. Pub/Sub will eventually time out and retry messages, so they will all eventually go through.