
I've exported a Cloud Dataflow template from Dataprep as outlined here:

https://cloud.google.com/dataprep/docs/html/Export-Basics_57344556

In Dataprep, the flow pulls in text files via wildcard from Google Cloud Storage, transforms the data, and appends it to an existing BigQuery table. All works as intended.

However, when trying to start a Dataflow job from the exported template, I can't seem to get the startup parameters right. The error messages aren't very specific, but it's clear that, for one thing, I'm not getting the input and output locations right.

The only Google-provided template for this use case (found at https://cloud.google.com/dataflow/docs/guides/templates/provided-templates#cloud-storage-text-to-bigquery) doesn't apply, as it uses a UDF and also runs in batch mode, overwriting any existing BigQuery table rather than appending to it.

Inspecting the original Dataflow job details from Dataprep shows a number of parameters (found in the metadata file), but I haven't been able to get those to work within my code. Here's an example of one such failed configuration:

import time
from google.cloud import storage
from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

def dummy(event, context):
    pass

def process_data(event, context):
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)

    data = event
    gsclient = storage.Client()
    file_name = data['name']

    time_stamp = time.time()

    PROJECT = "[project]"
    GCSPATH = "gs://[path to template]"
    BODY = {
        "jobName": "GCS2BigQuery_{tstamp}".format(tstamp=time_stamp),
        "parameters": {
            "inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),
            "outputLocations": '{{\"location1\":\"[project]:[dataset].[table]\", [... other locations]"}}',
            "customGcsTempLocation": "gs://[my bucket]/dataflow"
         },
         "environment": {
            "zone": "us-east1-b"
         }
    }

    print(BODY["parameters"])

    request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
    response = request.execute()

    print(response)

The above example results in an "invalid field" error for "location1", a field name I pulled from a completed Dataflow job. I know I need to specify the GCS location, the template location, and the BigQuery table, but I haven't found the correct syntax anywhere. As mentioned above, I found the field names and sample values in the job's generated metadata file.

I realize that this specific use case may not ring any bells, but in general, if anyone has had success determining and using the correct startup parameters for a Dataflow job exported from Dataprep, I'd be most grateful to learn more about that. Thx.

The problem is most likely how you set up your template (you need to use ValueProvider, ...) for your arguments. Show the code that manages arguments before your pipeline is created. Also review: cloud.google.com/dataflow/docs/guides/templates/… - John Hanley
@JohnHanley thanks for the response. In this case, I'm not using the Beam SDK, just generic Python in a Cloud Function (via Google API Client module). It seems like the errors might be as dumb as just not escaping / quoting values in the BODY json, as I found some examples elsewhere that have gotten me 80% of the way. The Dataflow job is now kicking off (and seemingly running) correctly up to the output step where it fails... Error in Stackdriver is 'Cannot get value for location1' which makes me think that the output parameter is wrong.... I updated my code above to show most recent version. - scb
Digging further into the errors in Stackdriver, I see this as being the proximate cause of the errors: 'Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting double-quote to start field name' So it's clear that the approach I found elsewhere is not quite working. The question remains as to how to put these strings together correctly so that they are parsed by the job launcher.... - scb
Why do you have this: "inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name),? Dataflow args are just like command line args (--input=example.csv). Nothing complicated. Show your code that processes the command line so that we are not guessing. - John Hanley
There is no command line -- this is the syntax (albeit broken at the moment) used by the REST API template launch method: cloud.google.com/dataflow/docs/reference/rest/v1b3/… The code which invokes this is in the request and response bit at the end. - scb

1 Answer


I think you need to review this document [1]; it explains exactly the syntax required for passing the various pipeline options, including the location parameters you need.

Specifically, this part of your code snippet does not follow the correct syntax:

"inputLocations" : '{{\"location1\":\"[my bucket]/{filename}\"}}'.format(filename=file_name)

In addition to document [1], you should also review the available pipeline options and their correct syntax [2].

Please use the links: they are the official documentation links from Google. These links will never go stale or be removed, as they are actively monitored and maintained by a dedicated team.
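As a rough sketch only (the parameter names and the location1 key below are just the placeholders copied from the question; the authoritative names are in the metadata file generated alongside the exported template), one way to avoid the JsonParseException mentioned in the comments is to build the inputLocations / outputLocations values with json.dumps instead of hand-escaped strings:

import json
import time

from googleapiclient.discovery import build
from oauth2client.client import GoogleCredentials

PROJECT = "[project]"                # placeholder, as in the question
GCSPATH = "gs://[path to template]"  # placeholder, as in the question

def process_data(event, context):
    credentials = GoogleCredentials.get_application_default()
    service = build('dataflow', 'v1b3', credentials=credentials)

    file_name = event['name']

    # Let json.dumps produce the JSON-encoded strings the launcher expects,
    # rather than hand-escaping quotes inside a format() template.
    input_locations = json.dumps({"location1": "[my bucket]/{}".format(file_name)})
    output_locations = json.dumps({"location1": "[project]:[dataset].[table]"})

    body = {
        # Dataflow job names only allow lowercase letters, digits and hyphens.
        "jobName": "gcs2bigquery-{}".format(int(time.time())),
        "parameters": {
            "inputLocations": input_locations,
            "outputLocations": output_locations,
            "customGcsTempLocation": "gs://[my bucket]/dataflow"
        },
        "environment": {
            "zone": "us-east1-b"
        }
    }

    request = service.projects().templates().launch(
        projectId=PROJECT, gcsPath=GCSPATH, body=body)
    response = request.execute()
    print(response)

If the launcher still reports something like "Cannot get value for location1", it may be worth comparing the key names against those listed in the template's metadata file.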