
How do I pass this entire document into Elasticsearch using Python? Is this the right way to put it into Elasticsearch?

In DynamoDB, id is the primary key.

Below is the code I use to insert into DynamoDB:

import boto3
from boto3.dynamodb.conditions import Key, And, Attr

def lambda_handler(event, context):
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('newtable')
    with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
        batch.put_item(
            Item={
                'id': '1',
                'last_name': 'V',
                'age': '2',
            }
        )
        batch.put_item(
            Item={
                'id': '2',
                'last_name': 'JJ',
                'age': '7',
            }
        )
        batch.put_item(
            Item={
                'id': '9',
                'last_name': 'ADD',
                'age': '95',
            }
        )
        batch.put_item(
            Item={
                'id': '10',
                'last_name': 'ADD',
                'age': '95',
            }
        )
  • How to push the expected output into Elasticsearch (see the sketch after this list)

  • How to reflect changes automatically in ES when the DynamoDB content changes
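
For the first point, the kind of call I expect to end up with looks roughly like this; the domain endpoint, index name and document below are placeholders (a sketch, not tested against my cluster):

import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                   session.region_name, 'es',
                   session_token=credentials.token)

# Signed client against the Amazon ES domain (placeholder endpoint)
es = Elasticsearch(['https://my-domain.us-east-1.es.amazonaws.com'],
                   http_auth=awsauth, use_ssl=True, verify_certs=True,
                   connection_class=RequestsHttpConnection)

# Index one of the DynamoDB items as a plain JSON document,
# reusing the DynamoDB 'id' as the Elasticsearch document id
es.index(index='newtable', id='1',
         body={'id': '1', 'last_name': 'V', 'age': '2'})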

I have gone through the link https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/
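
For reference, the wiring that post describes (a DynamoDB Stream triggering a Lambda function) can be set up with boto3 roughly like the sketch below; 'newtable' is my table and 'ddb-to-es' is a placeholder for the indexing function's name:

import boto3

dynamodb = boto3.client('dynamodb')
lambda_client = boto3.client('lambda')

# Enable a stream on the table (skip if it was created with one)
resp = dynamodb.update_table(
    TableName='newtable',
    StreamSpecification={'StreamEnabled': True,
                         'StreamViewType': 'NEW_AND_OLD_IMAGES'})
stream_arn = resp['TableDescription']['LatestStreamArn']

# Attach the stream as an event source of the indexing Lambda
lambda_client.create_event_source_mapping(
    EventSourceArn=stream_arn,
    FunctionName='ddb-to-es',        # placeholder function name
    StartingPosition='TRIM_HORIZON',
    BatchSize=100)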

Below is the code; I am getting the error ERROR: NameError("name 'event' is not defined").

Code (the Lambda function below is triggered from the DynamoDB table's stream):

import boto3
import json
import re
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()
# s3 = session.resource('s3')
awsauth = AWS4Auth(credentials.access_key,
                   credentials.secret_key,
                   session.region_name, 'es',
                   session_token=credentials.token)
es = Elasticsearch(
    ['https://xx-east-1.es.amazonaws.com'],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                   "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:

        try:

            if record['eventName'] == "INSERT":
                insert_document(es, record)
            elif record['eventName'] == "REMOVE":
                remove_document(es, record)
            elif record['eventName'] == "MODIFY":
                modify_document(es, record)

        except Exception as e:
            print("Failed to process:")
            print(json.dumps(record))
            print("ERROR: " + repr(e))
            continue


# Process MODIFY events
def modify_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table,
             body=doc,
             id=docId,
             doc_type=table,
             refresh=True)

    print("Successly modified - Index: " + table + " - Document ID: " + docId)


def remove_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(record)
    print("Deleting document ID: " + docId)

    es.delete(index=table,
              id=docId,
              doc_type=table,
              refresh=True)

    print("Successly removed - Index: " + table + " - Document ID: " + docId)


# Process INSERT events
def insert_document(es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)

        es.indices.create(table,
                          body='{"settings": { "index.mapping.coerce": true } }')

        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document())

    print("New document to Index:")
    print(doc)

    newId = docid(record)
    es.index(index=table,
             body=doc,
             id=newId,
             doc_type=table,
             refresh=True)

    print("Successly inserted - Index: " + table + " - Document ID: " + newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
        for i in result:
            return i


def docid(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId

I am getting the error at document and docid.

Run individually, both give output:

result = []
for r in event['Records']:
    tmp = {}

    for k, v in r['dynamodb']['NewImage'].items():
    #for k, v in r['dynamodb']['Keys'].items():
        if "S" in v.keys() or "BOOL" in v.keys():
            tmp[k] = v.get('S', v.get('BOOL', False))
        elif 'NULL' in v:
            tmp[k] = None

    result.append(tmp)
for i in result:
    print (i)

event = {'Records': [{'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}}, 'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}}, 'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}}, 'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}}, 'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}}, 'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}}, 'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}]}
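
Running that snippet against the event above prints something like:

{'last_name': 'Hus', 'id': '9', 'age': '95'}
{'last_name': 'JJ', 'id': '2', 'age': '5'}
{'last_name': 'Hus', 'id': '10', 'age': '95'}
{'last_name': 'V', 'id': '1', 'age': '2'}
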
Which line does ERROR: NameError("name 'event' is not defined") come from? Also, what is an example of a record in DynamoDB that you are trying to process? – Marcin
@Marcin I added the piece of code that inserts into DynamoDB. – aysh
@SaiSreenivas event is a parameter of the Lambda handler, right? Then how do I pass it locally into another variable? – aysh
@SaiSreenivas, already tried. – aysh
@SaiSreenivas document(event), docId(event) – aysh

2 Answers

2 votes

You can check the following. I tried to replicate the issue and can confirm the error:

ERROR: NameError("name 'event' is not defined")

I used a simulated INSERT event from the DynamoDB stream, based on your example and my own table:

{
  "Records": [
    {
      "eventID": "b8b993cf16d1aacb61b40411b39e0b1f",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "1"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "V"
          },
          "id": {
            "N": "1"
          },
          "age": {
            "S": "2"
          }
        },
        "SequenceNumber": "25200000000020406897812",
        "SizeBytes": 22,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "e5d5bec988945c06ffc879cf16b89bf7",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "9"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "ADD"
          },
          "id": {
            "N": "9"
          },
          "age": {
            "S": "95"
          }
        },
        "SequenceNumber": "25300000000020406897813",
        "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "f1a7c9736253b5ef28ced38ed5ff645b",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "JJ"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "SequenceNumber": "25400000000020406897819",
        "SizeBytes": 23,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    },
    {
      "eventID": "bfcbad9dc19883e4172e6dc25e66637b",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595922821.0,
        "Keys": {
          "id": {
            "N": "10"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "ADD"
          },
          "id": {
            "N": "10"
          },
          "age": {
            "S": "95"
          }
        },
        "SequenceNumber": "25500000000020406897820",
        "SizeBytes": 25,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:11111:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}

Example MODIFY event:

{
  "Records": [
    {
      "eventID": "4e4629c88aa00e366c89a293d9c82d54",
      "eventName": "MODIFY",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1595924589.0,
        "Keys": {
          "id": {
            "N": "2"
          }
        },
        "NewImage": {
          "last_name": {
            "S": "zhgdhfgdh"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "OldImage": {
          "last_name": {
            "S": "JJ"
          },
          "id": {
            "N": "2"
          },
          "age": {
            "S": "7"
          }
        },
        "SequenceNumber": "25600000000020408264140",
        "SizeBytes": 49,
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:34234:table/newtable/stream/2020-07-28T06:59:38.569"
    }
  ]
}

Modified code of the Lambda function, which I can confirm no longer produces the error:

import boto3
import json
import re

from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection

session = boto3.session.Session()
credentials = session.get_credentials()

s3 = session.resource('s3')

awsauth = AWS4Auth(credentials.access_key,
                  credentials.secret_key,
                  session.region_name, 'es',
                  session_token=credentials.token)

    
es = Elasticsearch(
    ['https://vpc-test-dmamain-452frn764ggb4a.us-east-1.es.amazonaws.com'],
    use_ssl=True,
    verify_certs=True,
    http_auth=awsauth,
    connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
                   "_timestamp", "_ttl"]


def lambda_handler(event, context):
    print(event)
    #dynamodb = boto3.resource('dynamodb')

    # Loop over the DynamoDB Stream records
    for record in event['Records']:
            
        if record['eventName'] == "INSERT":
            insert_document(event, es, record)
        elif record['eventName'] == "REMOVE":
            remove_document(event, es, record)
        elif record['eventName'] == "MODIFY":
            modify_document(event, es, record)


# Process MODIFY events
def modify_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    docId = docid(event, record)
    print("KEY")
    print(docId)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("Updated document:")
    print(doc)

    # We reindex the whole document as ES accepts partial docs
    es.index(index=table,
             body=doc,
             id=docId,
             doc_type=table,
             refresh=True)

    print("Successly modified - Index: " , table , " - Document ID: " , docId)


def remove_document(event, es, record):
    
    table = getTable(record)
    
    print("Dynamo Table: " + table)

    docId = docid(event, record)
    print("Deleting document ID: ", docId)

    es.delete(index=table,
              id=docId,
              doc_type=table,
              refresh=True)

    print("Successly removed - Index: ", table, " - Document ID: " , docId)


# Process INSERT events
def insert_document(event, es, record):
    table = getTable(record)
    print("Dynamo Table: " + table)

    # Create index if missing
    if es.indices.exists(table) == False:
        print("Create missing index: " + table)

        es.indices.create(table,
                          body='{"settings": { "index.mapping.coerce": true } }')

        print("Index created: " + table)

    # Unmarshal the DynamoDB JSON to a normal JSON
    doc = json.dumps(document(event))

    print("New document to Index:")
    print(doc)

    newId = docid(event, record)
    
    es.index(index=table,
             body=doc,
             id=newId,
             doc_type=table,
             refresh=True)

    print("Successly inserted - Index: " , table + " - Document ID: " , newId)


def getTable(record):
    p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
    m = p.match(record['eventSourceARN'])
    if m is None:
        raise Exception("Table not found in SourceARN")
    return m.group(1).lower()


def document(event):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['NewImage'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
        for i in result:
            return i


def docid(event, record):
    result = []
    for r in event['Records']:
        tmp = {}
        for k, v in r['dynamodb']['Keys'].items():
            if "S" in v.keys() or "BOOL" in v.keys():
                tmp[k] = v.get('S', v.get('BOOL', False))
            elif 'NULL' in v:
                tmp[k] = None
        result.append(tmp)
    for newId in result:
        return newId

I haven't verified whether the data is correctly written, modified, or inserted into Elasticsearch, but I had an ES domain running and used it in the Lambda to verify that the Lambda can connect to it and run the queries.
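
If you want to check what actually landed in the domain, something like this against the same client should do (the index name is the lowercased table name, newtable, returned by getTable):

# Rough check of what the Lambda wrote (index name is a placeholder)
print(es.count(index='newtable'))
print(es.search(index='newtable', body={'query': {'match_all': {}}, 'size': 10}))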

Example output from the Lambda for an INSERT event:

Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}
Dynamo Table: newtable
New document to Index:
{"last_name": "V", "age": "2"}
Successly inserted - Index:  newtable - Document ID:  {}

Example output from the Lambda for a MODIFY event:

Updated document:

{
    "last_name": "zhgdhfgdh",
    "age": "7"
}
Successly modified - Index:  newtable  - Document ID:  
{}

I think docid requires further investigation into whether it works correctly, as it seems to return an empty dict:

 Document ID:  {}
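
The empty ID (and the fact that the same document body, without an id field, was indexed four times above) seems to come from two things: the id attribute arrives from the stream as a DynamoDB number ({"N": "1"}), which document/docid never unmarshal, and both helpers loop over all of event['Records'] but return after the first one. A per-record sketch that also handles the N type (my assumption, not part of the original code):

def unmarshal(image):
    # Convert a DynamoDB-typed image into a plain dict (S, N, BOOL, NULL only)
    tmp = {}
    for k, v in image.items():
        if 'S' in v or 'BOOL' in v:
            tmp[k] = v.get('S', v.get('BOOL', False))
        elif 'N' in v:
            tmp[k] = v['N']  # kept as a string; cast it if you need a number
        elif 'NULL' in v:
            tmp[k] = None
    return tmp


def document(record):
    return unmarshal(record['dynamodb']['NewImage'])


def docid(record):
    # Single-attribute primary key: use its value as the ES document id
    return list(unmarshal(record['dynamodb']['Keys']).values())[0]

The callers would then pass the individual record, e.g. json.dumps(document(record)) and docid(record).
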
1 vote

Most people use DynamoDB Streams with a Lambda function which pushes the changes into Elasticsearch.

Here is a blog post about it. It is a little old, so you might need to play with things a bit.
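
A bare-bones version of that pattern might look like the sketch below; the domain endpoint and index name are placeholders, and it assumes the same requests_aws4auth / elasticsearch-py setup as the other answer:

import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection, helpers

session = boto3.session.Session()
creds = session.get_credentials()
awsauth = AWS4Auth(creds.access_key, creds.secret_key,
                   session.region_name, 'es', session_token=creds.token)

# Placeholder endpoint for the Amazon ES domain
es = Elasticsearch(['https://my-domain.us-east-1.es.amazonaws.com'],
                   http_auth=awsauth, use_ssl=True, verify_certs=True,
                   connection_class=RequestsHttpConnection)


def unmarshal(image):
    # Handles the attribute types used in this table (S and N); extend as needed
    return {k: v.get('S', v.get('N')) for k, v in image.items()}


def lambda_handler(event, context):
    # Translate each stream record into a bulk action and send them in one call
    actions = []
    for record in event['Records']:
        doc_id = list(unmarshal(record['dynamodb']['Keys']).values())[0]
        if record['eventName'] == 'REMOVE':
            actions.append({'_op_type': 'delete', '_index': 'newtable', '_id': doc_id})
        else:  # INSERT or MODIFY
            actions.append({'_index': 'newtable', '_id': doc_id,
                            '_source': unmarshal(record['dynamodb']['NewImage'])})
    helpers.bulk(es, actions, raise_on_error=False)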