3
votes

Using Apache Beam (Python 2.7 SDK), I am trying to write JSON files as entities into Google Cloud Datastore.

Sample JSON:

{
"CustId": "005056B81111",
"Name": "John Smith", 
"Phone": "827188111",
"Email": "[email protected]", 
"addresses": [
    {"type": "Billing", "streetAddress": "Street 7", "city": "Malmo", "postalCode": "CR0 4UZ"},
    {"type": "Shipping", "streetAddress": "Street 6", "city": "Stockholm", "postalCode": "YYT IKO"}
]
}

I have written an Apache Beam pipeline with three main steps:

  1. beam.io.ReadFromText(input_file_path)

  2. beam.ParDo(CreateEntities())

  3. WriteToDatastore(PROJECT)

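In step 2, I am converting each JSON object (dict) into an entity:

import json
import apache_beam as beam
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

class CreateEntities(beam.DoFn):
  def process(self, element):
    # Each element is one line of text holding a JSON object.
    element = element.encode('ascii','ignore')
    element = json.loads(element)
    # CustId becomes the key name; the remaining fields become properties.
    Id = element.pop('CustId')
    entity = entity_pb2.Entity()
    datastore_helper.add_key_path(entity.key, 'CustomerDF', Id)
    datastore_helper.add_properties(entity, element)
    return [entity]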

This works fine for basic properties. However, since addresses is a list of dict objects, it fails. I have read a similar post.

However, I did not find the exact code to convert dict -> entity.

I tried the following to set the addresses element as an entity, but it does not work:

element['addresses'] = entity_pb2.Entity()


2 Answers

2
votes

Are you trying to store this as a repeated structured property?

ndb.StructuredProperty values appear in Dataflow with their keys flattened, and for repeated structured properties, each individual property within the structured property object becomes an array. So I think you would need to write it like this:

datastore_helper.add_properties(entity, {
    ...
    "addresses.type": ["Billing", "Shipping"],
    "addresses.streetAddress": ["Street 7", "Street 6"],
    "addresses.city": ["Malmo", "Stockholm"],
    "addresses.postalCode": ["CR0 4UZ", "YYT IKO"],
})
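
If the input varies, the flattened columns could be built from the list of dicts rather than typed by hand. The following is only a minimal sketch: flatten_repeated is a hypothetical helper name, not part of any Datastore library, and padding missing keys with None is an assumption made here to keep the lists index-aligned.

```python
def flatten_repeated(name, items):
    """Turn a list of dicts into {"name.key": [values...]} columns."""
    # Collect every key in first-seen order so missing keys can be padded.
    keys = []
    for item in items:
        for key in item:
            if key not in keys:
                keys.append(key)
    # One list per sub-property. None pads records that lack a key
    # (an assumption of this sketch) so the lists stay index-aligned.
    return {
        "{}.{}".format(name, key): [item.get(key) for item in items]
        for key in keys
    }


addresses = [
    {"type": "Billing", "streetAddress": "Street 7", "city": "Malmo", "postalCode": "CR0 4UZ"},
    {"type": "Shipping", "streetAddress": "Street 6", "city": "Stockholm", "postalCode": "YYT IKO"},
]
flat = flatten_repeated("addresses", addresses)
```

The resulting dict has the same shape as the hand-written one above and could be merged into the properties passed to add_properties.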

Alternatively, if you're trying to save this as an ndb.JsonProperty, you can do this:

datastore_helper.add_properties(entity, {
    ...
    "addresses": json.dumps(element['addresses']),
})
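
To make the trade-off of the JSON approach concrete, here is a small standard-library sketch of the round trip. The variable names properties and restored are illustrative only. The nested list is stored as a single string, so the individual address fields are no longer separately queryable and have to be decoded again on read.

```python
import json

element = {
    "Name": "John Smith",
    "addresses": [
        {"type": "Billing", "city": "Malmo"},
        {"type": "Shipping", "city": "Stockholm"},
    ],
}

# Copy the top-level dict so the original element is left untouched,
# then replace the nested list with its JSON string form.
properties = dict(element)
properties["addresses"] = json.dumps(element["addresses"])

# When the entity is read back, the string has to be decoded again.
restored = json.loads(properties["addresses"])
```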
0
votes

I know this is an old question, but I had a similar issue (although with Python 3.6 and NDB) and wrote a function to convert all dicts inside a dict into Entity objects. It uses recursion to walk through all nodes, converting as necessary:

from google.cloud import datastore  # assumption: datastore.Entity comes from this client library


def dict_to_entity(data):

    # the data can be a dict or a list, and they are iterated over differently
    # also create a new object to store the child objects
    if type(data) == dict:
        childiterator = data.items()
        new_data = {}
    elif type(data) == list:
        childiterator = enumerate(data)
        new_data = []
    else:
        return

    for i, child in childiterator:

        # if the child is a dict or a list, continue drilling...
        if type(child) in [dict, list]:
            new_child = dict_to_entity(child)
        else:
            new_child = child

        # add the child data to the new object
        if type(data) == dict:
            new_data[i] = new_child
        else:
            new_data.append(new_child)

    # convert the new object to Entity if needed
    if type(data) == dict:
        child_entity = datastore.Entity()
        child_entity.update(new_data)
        return child_entity
    else:
        return new_data
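
For trying the recursion without the Datastore client installed, the same idea can be sketched with the entity class passed in as a parameter. This is a variant, not the function above: to_nested and FakeEntity are hypothetical names, FakeEntity is only a dict subclass standing in for datastore.Entity, and unlike the original this version returns scalars unchanged instead of None.

```python
def to_nested(data, entity_factory=dict):
    """Recursively rebuild data, wrapping every dict with entity_factory."""
    if isinstance(data, dict):
        entity = entity_factory()
        # Convert each value first, then store it under the same key.
        entity.update(
            (key, to_nested(value, entity_factory)) for key, value in data.items()
        )
        return entity
    if isinstance(data, list):
        # Lists stay lists; only their members are converted.
        return [to_nested(item, entity_factory) for item in data]
    # Scalars (str, int, None, ...) pass through unchanged.
    return data


class FakeEntity(dict):
    """Hypothetical stand-in for datastore.Entity, for local testing only."""


customer = {
    "Name": "John Smith",
    "addresses": [{"type": "Billing", "city": "Malmo"}],
}
converted = to_nested(customer, FakeEntity)
```

With the real client, passing datastore.Entity as entity_factory should behave the same way, assuming it can be constructed without arguments as in the function above.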