1
votes

I've created a Google Cloud Platform Function that listens to a Pub/Sub topic and inserts the data into BigQuery. I managed to get some code almost working. Almost: the insert call reports no error, but the row inserted into BigQuery has all columns null.

Here is the code of the Cloud Function, running on Node.js 6 with 128 MB of memory, triggered by Pub/Sub.

I've tried every combination of the following two options; with ignoreUnknownValues set to false I get two different error messages, depending on raw (see the bottom of the post):

'ignoreUnknownValues':true, 'raw':false
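
If I read the client docs correctly, raw:true expects each row to already be in the streaming insert API shape, something like this (my understanding of the docs, not verified against this library version):

// My understanding of the raw:true format (from the @google-cloud/bigquery docs,
// not verified on 1.3.0): each row already carries the insertAll API shape.
const rawRow = {
  insertId: 'some-unique-id',          // optional de-duplication key
  json: { id: 9999, queteur_id: 552 }  // the actual column values
};
// table.insert([rawRow], { raw: true });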

package.json

{
  "name": "sample-pubsub",
  "version": "0.0.1",
  "dependencies": {
    "@google-cloud/bigquery": "^1.3.0"
  }
}

Function Body

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event Event payload and metadata.
 * @param {!Function} callback Callback function to signal completion.
 */
exports.helloPubSub = (event, callback) => {
  const pubsubMessage = event.data;
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());

  const BigQuery = require('@google-cloud/bigquery');
  const bigquery = new BigQuery();


  bigquery
    .dataset("init_data")
    .table  ("tronc_queteur")
    .insert ([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false})
    .then   ((data) => {
      console.log(`Inserted 1 rows`);
      console.log(data);
    })
    .catch(err => {
      if (err && err.name === 'PartialFailureError') {
        if (err.errors && err.errors.length > 0) {
          console.log('Insert errors:');
          err.errors.forEach(err => console.error(err));
        }
      } else {
        console.error('ERROR:', err);
      }
    });



  callback();
};

The data passed to the function is the following (as seen in the first console.log() of the function):

** DATA **

{  
   "id":9999,
   "queteur_id":552,
   "point_quete_id":49,
   "tronc_id":281,
   "depart_theorique":"2018-06-17 08:09:33",
   "depart":"2018-06-17 08:09:33",
   "retour":"2018-06-17 10:26:20",
   "comptage":"2018-11-08 21:23:02",
   "last_update":"2018-11-08 21:23:02",
   "last_update_user_id":1,
   "euro500":0,
   "euro200":0,
   "euro100":0,
   "euro50":0,
   "euro20":1,
   "euro10":3,
   "euro5":1,
   "euro2":0,
   "euro1":37,
   "cents50":12,
   "cents20":0,
   "cents10":0,
   "cents5":0,
   "cents2":0,
   "cent1":93,
   "don_cheque":0,
   "don_creditcard":0,
   "foreign_coins":null,
   "foreign_banknote":null,
   "notes_depart_theorique":null,
   "notes_retour":null,
   "notes_retour_comptage_pieces":null,
   "notes_update":null,
   "deleted":false,
   "coins_money_bag_id":"2018-PIECE-059",
   "bills_money_bag_id":"2018-BILLET-013",
   "don_cb_sans_contact_amount":0,
   "don_cb_sans_contact_number":0,
   "don_cb_total_number":0,
   "don_cheque_number":0
}

And here is the table schema, which I used to create the table and load the data into BigQuery (a sketch of creating the table from this schema follows the listing):

** BigQuery table definition **

[
{"name": "id","type":"INTEGER"},
{"name": "queteur_id","type":"INTEGER"},
{"name": "point_quete_id","type":"INTEGER"},
{"name": "tronc_id","type":"INTEGER"},
{"name": "depart_theorique","type":"STRING"},
{"name": "depart","type":"STRING"},
{"name": "retour","type":"STRING"},
{"name": "comptage","type":"STRING"},
{"name": "last_update","type":"STRING"},
{"name": "last_update_user_id","type":"INTEGER"},
{"name": "euro500","type":"INTEGER"},
{"name": "euro200","type":"INTEGER"},
{"name": "euro100","type":"INTEGER"},
{"name": "euro50","type":"INTEGER"},
{"name": "euro20","type":"INTEGER"},
{"name": "euro10","type":"INTEGER"},
{"name": "euro5","type":"INTEGER"},
{"name": "euro2","type":"INTEGER"},
{"name": "euro1","type":"INTEGER"},
{"name": "cents50","type":"INTEGER"},
{"name": "cents20","type":"INTEGER"},
{"name": "cents10","type":"INTEGER"},
{"name": "cents5","type":"INTEGER"},
{"name": "cents2","type":"INTEGER"},
{"name": "cent1","type":"INTEGER"},
{"name": "foreign_coins","type":"INTEGER"},
{"name": "foreign_banknote","type":"INTEGER"},
{"name": "notes_depart_theorique","type":"STRING"},
{"name": "notes_retour","type":"STRING"},
{"name": "notes_retour_comptage_pieces","type":"STRING"},
{"name": "notes_update","type":"STRING"},
{"name": "deleted","type":"INTEGER"},
{"name": "don_creditcard","type":"FLOAT"},
{"name": "don_cheque","type":"FLOAT"},
{"name": "coins_money_bag_id","type":"STRING"},
{"name": "bills_money_bag_id","type":"STRING"},
{"name": "don_cb_sans_contact_amount","type":"FLOAT"},
{"name": "don_cb_sans_contact_number","type":"INTEGER"},
{"name": "don_cb_total_number","type":"INTEGER"},
{"name": "don_cheque_number","type":"INTEGER"}
]
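
For reference, an equivalent way to create the table from this schema with the Node.js client would look roughly like this (a sketch; not necessarily how the table was originally created, and the schema file name is made up):

// Sketch: create the table from the schema above with the Node.js client.
// The file name 'tronc_queteur_schema.json' is hypothetical.
const BigQuery = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

const fields = require('./tronc_queteur_schema.json'); // the JSON array above

bigquery
  .dataset('init_data')
  .createTable('tronc_queteur', { schema: { fields: fields } })
  .then(([table]) => console.log(`Created table ${table.id}`))
  .catch(err => console.error('Table creation failed:', err));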

with 'ignoreUnknownValues':false, 'raw':false

severity:  "ERROR"  
 textPayload:  "{ errors: [ { message: 'no such field.', reason: 'invalid' } ],
  row: 
   { '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
     attributes: { location: 'Detroit' },
     data: 'eyJpZCI6OT...'

data is the base64 encoding of the following:

{"id":9999,"queteur_id":552,"point_quete_id":49,"tronc_id":281,"depart_theorique":"2018-06-17 08:09:33","depart":"2018-06-17 08:09:33","retour":"2018-06-17 10:26:20","comptage":"2018-11-08 22:18:59","last_update":"2018-11-08 22:18:59","last_update_user_id":1,"euro500":0,"euro200":0,"euro100":0,"euro50":0,"euro20":1,"euro10":3,"euro5":1,"euro2":0,"euro1":37,"cents50":12,"cents20":0,"cents10":0,"cents5":0,"cents2":0,"cent1":93,"don_cheque":0,"don_creditcard":0,"foreign_coins":null,"foreign_banknote":null,"notes_depart_theorique":null,"notes_retour":null,"notes_retour_comptage_pieces":null,"notes_update":null,"deleted":false,"coins_money_bag_id":"2018-PIECE-059","bills_money_bag_id":"2018-BILLET-013","don_cb_sans_contact_amount":0,"don_cb_sans_contact_number":0,"don_cb_total_number":0,"don_cheque_number":0}

with 'ignoreUnknownValues':false, 'raw':true

Message:''

textPayload:  "{ errors: [ { message: '', reason: 'invalid' } ],
  row: 
   { '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
     attributes: { location: 'Detroit' },
     data: 'eyJpZCI6O...'

data is the exact same payload (same base64 string as above)

** On the BigQuery side **

The following query returns a row count that keeps increasing as I test, with only null values in every column.

The query is:

select *
from `init_data.tronc_queteur` as tq
where tq.id is null

The result is as follows:

Row  id    queteur_id  point_quete_id  ...  don_cheque_number
1    null  null        null            ...  null
2    null  null        null            ...  null
3    null  null        null            ...  null

(every column, from id through don_cheque_number, is null in each row; the middle columns are elided here for readability)

2 Answers

1
votes

The issue comes from how you are providing the data to the BigQuery insert function.

In the line .insert([pubsubMessage], {'ignoreUnknownValues':true, 'raw':false}) you are sending the still-encoded Pub/Sub message. The BigQuery library can't find the column values it needs, because it expects a plain JSON object (in your case), so it inserts all null values. You have to decode the message data as a string and parse it into JSON.

The working insert I got looks like this:

  bigquery
    .dataset("init_data")
    .table  ("tronc_queteur")
    .insert (JSON.parse(Buffer.from(pubsubMessage.data, 'base64').toString()), 
       {'ignoreUnknownValues':true, 'raw':false})
    .then   ((data) => {
      console.log(`Inserted 1 rows`);
      console.log(data);
    })
    .catch(err => {
      if (err && err.name === 'PartialFailureError') {
        if (err.errors && err.errors.length > 0) {
          console.log('Insert errors:');
          err.errors.forEach(err => console.error(err));
        }
      } else {
        console.error('ERROR:', err);
      }
    });

This works, but I'm not fully satisfied with the code. I'll look more into how Pub/Sub messages are received by Cloud Functions. If I find anything relevant, I'll edit this answer.
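
In the meantime, judging from the error logs in the question, the event a Pub/Sub-triggered background function receives wraps the payload like this (a sketch reconstructed from those logs, not from the official docs):

// Shape of event.data as seen in the error logs above (values are the
// question's examples, not a general guarantee):
// {
//   '@type': 'type.googleapis.com/google.pubsub.v1.PubsubMessage',
//   attributes: { location: 'Detroit' },
//   data: 'eyJpZCI6OT...'   // base64-encoded JSON payload
// }
//
// So the row to insert is the decoded, parsed inner data field:
const row = JSON.parse(Buffer.from(event.data.data, 'base64').toString());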

1
votes

I believe you need to have the Pub/Sub dependency listed in your package.json file, like so:

{
  "name": "sample-pubsub",
  "version": "0.0.1",
  "dependencies": {
    "@google-cloud/pubsub": "^0.18.0",
    "@google-cloud/bigquery": "^4.1.1"
  }
}

I've missed this a few times too. It's so easy to forget!

Hope this solves your issue! If not, let me know!