1 vote

I have a small pipeline I'm trying to execute:

  1. File placed into GCS bucket
  2. Cloud Function triggers the Dataflow job when the file is placed in the GCS bucket (not working)
  3. Dataflow job writes to the BigQuery table (this part is working)

I've created the Dataflow job through Dataprep, as it has a nice UI for doing all my transformations before writing to the BigQuery table (writing to BigQuery works fine), and the Cloud Function fires when a file is uploaded to the GCS bucket. However, the Cloud Function doesn't start the Dataflow job (which I created in Dataprep).

Please have a look at the sample code of my Cloud Function below; any pointers as to why the Dataflow job is not triggering would be appreciated.

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} callback The callback function.
 */
exports.processFile = (event, callback) => {
  console.log('Processing file: ' + event.data.name);
  callback();

  const google = require('googleapis');

 exports.CF_GCStoDataFlow_v2 = function(event, callback) {
  const file = event.data;
  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}`
          },
          jobName: 'cloud-dataprep-csvtobq-v2-281345',
          gcsPath: 'gs://mygcstest-pipeline-staging/temp/'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  }
 };
};

[Screenshot: Dataproc job submission UI]

You have attached a Dataproc job submission UI screenshot. Is this a mistake, or do you use Dataproc in your workflow somehow? – Igor Dvorzhak
This was for a previous commenter, who suggested activating Dataproc jobs (see below). – BenAhm
For this line, console.log('Processing file: ' + event.data.name); I got the error "Cannot read property 'name' of undefined". – softdev

3 Answers

2 votes

This snippet may help. It uses a different method of the Dataflow API (launch), and it worked for me. Be aware that you need to specify the template's URL, and also check the metadata file (you can find it in the same directory as the template when it is executed through the Dataprep interface) to make sure you are including the right parameters.

dataflow.projects.templates.launch({
  projectId: projectId,
  location: location,
  gcsPath: jobTemplateUrl,
  resource: {
    parameters: {
      inputLocations: `{"location1": "gs://${file.bucket}/${file.name}"}`,
      outputLocations: `{"location1": "gs://${destination.bucket}/${destination.name}"}`
    },
    environment: {
      tempLocation: `gs://${destination.bucket}/${destination.tempFolder}`,
      zone: "us-central1-f"
    },
    jobName: 'my-job-name'
  }
}, function (err, response) {
  if (err) {
    console.error("problem launching dataflow template, error was: ", err);
  }
  console.log("Dataflow template response: ", response);
  callback();
});
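
The snippet above references a few names that it does not define (location, jobTemplateUrl, destination). A minimal sketch of how they might be filled in, assuming a Dataprep-exported template and a staging bucket; every value below is a placeholder I am introducing for illustration, not something from the original answer:

// Hypothetical values; adjust to your project. jobTemplateUrl should point at the
// template file Dataprep writes out (next to its metadata file) when the flow is run once.
const location = 'us-central1';
const jobTemplateUrl = 'gs://my-staging-bucket/temp/templates/cloud-dataprep-csvtobq-template';
const destination = {
  bucket: 'my-output-bucket',   // where the transformed output should land
  name: 'output/result.csv',    // output object name
  tempFolder: 'temp'            // staging folder Dataflow can use for temp files
};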
1 vote

Have you submitted your Dataproc job? Has it started running? The documentation below can give you some idea of how to get started:

https://cloud.google.com/dataproc/docs/concepts/jobs/life-of-a-job

1 vote

It looks like you are putting CF_GCStoDataFlow_v2 inside processFile, so the Dataflow part of the code is never executed.

Your function should look like this:

/**
 * Triggered from a message on a Cloud Storage bucket.
 *
 * @param {!Object} event The Cloud Functions event.
 * @param {!Function} callback The callback function.
 */
exports.CF_GCStoDataFlow_v2 = (event, callback) => {

  const google = require('googleapis');

  // The Storage object metadata arrives on event.data for background functions with this signature.
  const file = event.data;

  if (file.resourceState === 'exists' && file.name) {
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
      if (err) {
        throw err;
      }

      if (authClient.createScopedRequired && authClient.createScopedRequired()) {
        authClient = authClient.createScoped([
          'https://www.googleapis.com/auth/cloud-platform',
          'https://www.googleapis.com/auth/userinfo.email'
        ]);
      }

      const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });

      dataflow.projects.templates.create({
        projectId: projectId,
        resource: {
          parameters: {
            inputFile: `gs://${file.bucket}/${file.name}`,
            outputFile: `gs://${file.bucket}/${file.name}`
          },
          jobName: '<JOB_NAME>',
          gcsPath: '<BUCKET_NAME>'
        }
      }, function(err, response) {
        if (err) {
          console.error("problem running dataflow template, error was: ", err);
        }
        console.log("Dataflow template response: ", response);
        callback();
      });

    });
  } else {
    // Nothing to launch for this event; signal completion right away.
    callback();
  }
};

Make sure you change the value under “Function to execute” to CF_GCStoDataFlow_v2.
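
Regarding the "Cannot read property 'name' of undefined" comment above: depending on the Node.js runtime, a background function receives the Storage object either wrapped in event.data (the older (event, callback) signature used here) or as the first argument itself. A defensive sketch, assuming you want the same code to cope with both payload shapes:

const file = event.data || event;  // assumption: fall back to the bare payload if event.data is missing
if (file && file.name) {
  console.log('Processing file: ' + file.name);
}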