0
votes

I wonder if I've hit a bug. I wrote a piece of Node.js code to trigger a "GCS Text to PubSub" Dataflow job. The function is triggered upon file upload into a GCS bucket, but it never executes successfully: "textPayload: "problem running dataflow template, error was: { Error: Invalid JSON payload received. Unknown name "staging_location": Cannot find field." It seems to be an issue with the syntax I use to specify the staging location for the job. I have tried "staginglocation", "stagingLocation", etc.; none of them have worked.

Here's my code. Thanks for your help.

var {google} = require('googleapis');

/**
 * Handler that logs details of the triggering GCS object and then asks the
 * Dataflow API to create a job from the GCS_Text_to_Cloud_PubSub template.
 *
 * @param {object} event - Trigger event; `event.data` is read as the file
 *   object (bucket, name, ...) and `event.context` as the event context.
 * @param {Function} callback - Completion callback supplied by the caller.
 */
exports.moveDataFromGCStoPubSub = (event, callback) => {


const file = event.data;
const context = event.context;

// Log the event and object metadata for debugging.
console.log(`Event ${context.eventId}`);
console.log(`  Event Type: ${context.eventType}`);
console.log(`  Bucket: ${file.bucket}`);
console.log(`  File: ${file.name}`);
console.log(`  Metageneration: ${file.metageneration}`);
console.log(`  Created: ${file.timeCreated}`);
console.log(`  Updated: ${file.updated}`);

  // Resolve the default credentials and project id before calling Dataflow.
  google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       // NOTE(review): thrown from inside this callback, so `callback` is
       // never given the error.
       throw err;
     }

 console.log(projectId);

 const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
 console.log(`gs://${file.bucket}/${file.name}`);
 // NOTE(review): this is the request that is rejected with
 // `Unknown name "staging_location": Cannot find field` -- the API does not
 // accept the `stagingLocation` entry placed in `resource` below.
 dataflow.projects.templates.create({
   projectId: projectId,
   resource: {
    parameters: {
        inputFile: `gs://${file.bucket}/${file.name}`,
        outputTopic: `projects/iot-fitness-198120/topics/MemberFitnessData`,
      },
      jobName: 'CStoPubSub',
      gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub',
      stagingLocation: 'gs://fitnessanalytics-tmp/tmp'    
    }
 }, function(err, response) {
   // NOTE(review): on error this only logs; execution falls through, logs
   // the response as well, and calls callback() without the error.
   if (err) {
     console.error("problem running dataflow template, error was: ", err);
   }
   console.log("Dataflow template response: ", response);
   callback();
 });

   });

 // NOTE(review): this runs as soon as getApplicationDefault has been invoked
 // (presumably before its callback fires), and callback() is called again
 // inside the templates.create handler above.
 callback();
};
2
Additional information: it seems that every time I pass this parameter, no matter whether I write staginglocation, stagingLocation or something else, the system interprets it as staging_location and is not happy with this field name. – hassan hamade

2 Answers

0
votes

I don't think this is actually possible.

Looking at the documentation for the Dataflow API itself, there's nothing like a staging location in the parameter section, and the library you're using is basically a wrapper for this API.

I'm a bit surprised it changes the name of the parameter though.

0
votes

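So I finally got this to work. It was indeed a syntax issue in the parameters section: the temporary location goes under `environment.tempLocation` instead of a `stagingLocation` field, the template parameter is `inputFilePattern` rather than `inputFile`, and `gcsPath` is passed next to `projectId` instead of inside `resource`. The code below works like a charm: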

var {google} = require('googleapis');

exports.moveDataFromGCStoPubSub = (event, callback) => {


const file = event.data;
const context = event.context;

console.log(`Event ${context.eventId}`);
console.log(`  Event Type: ${context.eventType}`);
console.log(`  Bucket: ${file.bucket}`);
console.log(`  File: ${file.name}`);
console.log(`  Metageneration: ${file.metageneration}`);
console.log(`  Created: ${file.timeCreated}`);
console.log(`  Updated: ${file.updated}`);

  google.auth.getApplicationDefault(function (err, authClient, projectId) {
     if (err) {
       throw err;
     }

 console.log(projectId);

 const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
 console.log(`gs://${file.bucket}/${file.name}`);
 dataflow.projects.templates.create({
  gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub', 
  projectId: projectId,
   resource: {
    parameters: {
        inputFilePattern: `gs://${file.bucket}/${file.name}`,
        outputTopic: 'projects/iot-fitness-198120/topics/MemberFitnessData2'
      },
    environment: {
      tempLocation: 'gs://fitnessanalytics-tmp/tmp'
    },
      jobName: 'CStoPubSub',
      //gcsPath: 'gs://dataflow-templates/latest/GCS_Text_to_Cloud_PubSub',    
    }
 }, function(err, response) {
   if (err) {
     console.error("problem running dataflow template, error was: ", err);
   }
   console.log("Dataflow template response: ", response);
   callback();
 });

   });

 callback();
};