4 votes

I'm trying to build a Dataflow pipeline that triggers on a JSON file upload to Google Cloud Storage and writes it to Cloud Datastore.

According to the Dataflow template, each line of the JSON file must be in the Datastore data object format, defined here.

This is what the JSON file I'm trying to adapt to the Datastore data object looks like:

{
  "userId": "u-skjbdw34jh3gx",
  "rowRanks:": [
    {
      "originalTrigger": "recent",
      "programmedRowPos": "VR1",
      "reoderedRowPos": 0
    },
    {
      "originalTrigger": "discovery",
      "programmedRowPos": "VR1",
      "reoderedRowPos": 1
    }
  ]
}

This is how far I've gotten trying to adapt it to the data object linked above.

{
  "key": {
    "partitionId": {
      "projectId": "gcp-project-id",
      "namespaceId": "spring-demo"
    },
    "path": 
      {
        "kind": "demo",
        "name": "userId"
      }
  },
  "properties": {
    "userId": {
      "stringValue": "01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"
    }
  }
}

This is the error I get in Dataflow when it tries to write to Datastore:

com.google.protobuf.InvalidProtocolBufferException: java.io.EOFException: End of input at line 1 column 2 path $.
    at com.google.protobuf.util.JsonFormat$ParserImpl.merge(JsonFormat.java:1195)
    at com.google.protobuf.util.JsonFormat$Parser.merge(JsonFormat.java:370)
    at com.google.cloud.teleport.templates.common.DatastoreConverters$EntityJsonParser.merge(DatastoreConverters.java:497)
    at com.google.cloud.teleport.templates.common.DatastoreConverters$JsonToEntity.processElement(DatastoreConverters.java:351)
What is the size of your file? And why did you choose Beam/Dataflow for the transformation? – guillaume blaquiere
It's currently around 15 GB, but it will grow very quickly since it's the ML model output for the entire user base. – riser101
Can you provide an example of the CSV file? I want to understand the desired transformation: how the Datastore entity looks in the CSV, and how it should be inserted into Datastore. – Tlaquetzal
I edited the question to answer your query; the first code snippet shared is the object in the JSON file. Each line in the file is one such object. – riser101

2 Answers

4 votes

The JSON file must have each Google Cloud Datastore object on a single line. That is what causes the error quoted in the question: End of input at line 1 column 2 path $.

It should be as follows:

{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}

Obviously, the JSON file will consist of thousands of objects, but each of them must be on a single line:

{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
{"key":{"partitionId":{"projectId":"gcp-project-id","namespaceId":"spring-demo"},"path":[{"kind":"demo","name":"userId"}]},"properties":{"userId":{"stringValue":"01348c2f-9a20-4ad2-b95d-b3e29f6fc2d1"},"rowRanks":{"arrayValue":{"values":[{"entityValue":{"properties":{"originalTrigger":{"stringValue":"recent"},"programmedRowPos":{"stringValue":"VR1"},"reoderedRowPos":{"integerValue":1}}}}]}}}}
1 vote

If I understood your input data format and desired output correctly, this JS code should do the trick:

var data = {
  "userId": "u-skjbdw34jh3gx",
  "rowRanks": [
    {
      "originalTrigger": "recent",
      "programmedRowPos": "VR1",
      "reorderedRowPos": 0
    },
    {
      "originalTrigger": "discovery",
      "programmedRowPos": "VR1",
      "reorderedRowPos": 1
    }
  ]
}

var entity = {};
entity.key = {};
entity.key.partitionId = {};
entity.key.partitionId.projectId = "gcp-project-id";
entity.key.partitionId.namespaceId = "spring-demo";

var path = {}
path.kind = "demo";
path.name = "userId";
entity.key.path = [];
entity.key.path.push(path);

entity.properties = {};
entity.properties.userId = {};
entity.properties.userId.stringValue = data.userId;
entity.properties.rowRanks = {};
entity.properties.rowRanks.arrayValue = {};

var arrayValues = [];
data.rowRanks.forEach(buildArrayValue);

function buildArrayValue(row) {
  var temp = {};
  temp.entityValue = {};
  temp.entityValue.properties = {};
  temp.entityValue.properties.originalTrigger = {};
  temp.entityValue.properties.originalTrigger.stringValue = row.originalTrigger;
  temp.entityValue.properties.programmedRowPos = {};
  temp.entityValue.properties.programmedRowPos.stringValue = row.programmedRowPos;
  temp.entityValue.properties.reorderedRowPos = {};
  temp.entityValue.properties.reorderedRowPos.integerValue = row.reorderedRowPos;
  arrayValues.push(temp);
}

entity.properties.rowRanks.arrayValue.values = arrayValues;

document.write(JSON.stringify(entity));

This builds the rowRanks array with the forEach() loop. Note that path needs to be an array, though (reference).

Now we modify it slightly to run as the template's transform function instead of in a browser, upload the files to GCS, and follow the instructions here to execute it:

gcloud dataflow jobs run test-datastore \
--gcs-location=gs://dataflow-templates/latest/GCS_Text_to_Datastore \
--parameters=javascriptTextTransformGcsPath=gs://$BUCKET/*.js,errorWritePath=gs://$BUCKET/errors.txt,javascriptTextTransformFunctionName=transform,textReadPattern=gs://$BUCKET/*.json,datastoreWriteProjectId=$PROJECT

The full content of the JS file uploaded to GCS is:

function transform(elem) {
    // Each element passed in is one line of the input file, i.e. one JSON object.
    var data = JSON.parse(elem);

    var entity = {};
    entity.key = {};
    entity.key.partitionId = {};
    entity.key.partitionId.projectId = "gcp-project-id";
    entity.key.partitionId.namespaceId = "spring-demo";

    var path = {}
    path.kind = "demo";
    path.name = "userId";
    entity.key.path = [];
    entity.key.path.push(path);

    entity.properties = {};
    entity.properties.userId = {};
    entity.properties.userId.stringValue = data.userId;
    entity.properties.rowRanks = {};
    entity.properties.rowRanks.arrayValue = {};

    var arrayValues = [];
    data.rowRanks.forEach(buildArrayValue);

    function buildArrayValue(row) {
      var temp = {};
      temp.entityValue = {};
      temp.entityValue.properties = {};
      temp.entityValue.properties.originalTrigger = {};
      temp.entityValue.properties.originalTrigger.stringValue = row.originalTrigger;
      temp.entityValue.properties.programmedRowPos = {};
      temp.entityValue.properties.programmedRowPos.stringValue = row.programmedRowPos;
      temp.entityValue.properties.reorderedRowPos = {};
      temp.entityValue.properties.reorderedRowPos.integerValue = row.reorderedRowPos;
      arrayValues.push(temp);
    }

    entity.properties.rowRanks.arrayValue.values = arrayValues;

    return JSON.stringify(entity);
}
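
Before uploading the file you can sanity-check the function locally. A minimal sketch, assuming Node.js and that these lines are pasted after the transform function above (the sample record is illustrative):

// Build one input line the way it would appear in the GCS file and run it through transform().
var sampleLine = JSON.stringify({
  userId: "u-skjbdw34jh3gx",
  rowRanks: [
    { originalTrigger: "recent", programmedRowPos: "VR1", reorderedRowPos: 0 }
  ]
});

console.log(transform(sampleLine));

The printed string should match the single-line entity format shown in the first answer.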

The job runs successfully for me, and the data is written to Datastore.

Let me know if that helps you.