How to pass this entire document into elastic search using Python? Is this this the right way to put into elastic search?
In dynamodb id
is the primary key
How to insert in to dynamodb Below is the code
import boto3
from boto3.dynamodb.conditions import Key, And, Attr
def lambda_handler(event, context):
dynamodb = boto3.resource ('dynamodb')
table =dynamodb.Table('newtable')
with table.batch_writer(overwrite_by_pkeys=['id']) as batch:
batch.put_item(
Item={
'id': '1',
'last_name': 'V',
'age': '2',
}
)
batch.put_item(
Item={
'id': '2',
'last_name': 'JJ',
'age': '7',
}
)
batch.put_item(
Item={
'id': '9',
'last_name': 'ADD',
'age': '95',
}
)
batch.put_item(
Item={
'id': '10',
'last_name': 'ADD',
'age': '95',
}
)
How to push expected out into Elastic Search
How to reflect automatically in ES if dynamodb content changes
I have gone through the link https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/
Below is code I am getting error ERROR: NameError("name 'event' is not defined")
Code. * Before that trigger the below lambda function from the dynamodb table
import boto3
import json
import re
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
session = boto3.session.Session()
credentials = session.get_credentials()
# s3 = session.resource('s3')
awsauth = AWS4Auth(credentials.access_key,
credentials.secret_key,
session.region_name, 'es',
session_token=credentials.token)
es = Elasticsearch(
['https://xx-east-1.es.amazonaws.com'],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
reserved_fields = ["uid", "_id", "_type", "_source", "_all", "_parent", "_fieldnames", "_routing", "_index", "_size",
"_timestamp", "_ttl"]
def lambda_handler(event, context):
print(event)
dynamodb = boto3.resource('dynamodb')
# Loop over the DynamoDB Stream records
for record in event['Records']:
try:
if record['eventName'] == "INSERT":
insert_document(es, record)
elif record['eventName'] == "REMOVE":
remove_document(es, record)
elif record['eventName'] == "MODIFY":
modify_document(es, record)
except Exception as e:
print("Failed to process:")
print(json.dumps(record))
print("ERROR: " + repr(e))
continue
# Process MODIFY events
def modify_document(es, record):
table = getTable(record)
print("Dynamo Table: " + table)
docId = docid(record)
print("KEY")
print(docId)
# Unmarshal the DynamoDB JSON to a normal JSON
doc = json.dumps(document())
print("Updated document:")
print(doc)
# We reindex the whole document as ES accepts partial docs
es.index(index=table,
body=doc,
id=docId,
doc_type=table,
refresh=True)
print("Successly modified - Index: " + table + " - Document ID: " + docId)
def remove_document(es, record):
table = getTable(record)
print("Dynamo Table: " + table)
docId = docid(record)
print("Deleting document ID: " + docId)
es.delete(index=table,
id=docId,
doc_type=table,
refresh=True)
print("Successly removed - Index: " + table + " - Document ID: " + docId)
# Process INSERT events
def insert_document(es, record):
table = getTable(record)
print("Dynamo Table: " + table)
# Create index if missing
if es.indices.exists(table) == False:
print("Create missing index: " + table)
es.indices.create(table,
body='{"settings": { "index.mapping.coerce": true } }')
print("Index created: " + table)
# Unmarshal the DynamoDB JSON to a normal JSON
doc = json.dumps(document())
print("New document to Index:")
print(doc)
newId = docid(record)
es.index(index=table,
body=doc,
id=newId,
doc_type=table,
refresh=True)
print("Successly inserted - Index: " + table + " - Document ID: " + newId)
def getTable(record):
p = re.compile('arn:aws:dynamodb:.*?:.*?:table/([0-9a-zA-Z_-]+)/.+')
m = p.match(record['eventSourceARN'])
if m is None:
raise Exception("Table not found in SourceARN")
return m.group(1).lower()
def document(event):
result = []
for r in event['Records']:
tmp = {}
for k, v in r['dynamodb']['NewImage'].items():
if "S" in v.keys() or "BOOL" in v.keys():
tmp[k] = v.get('S', v.get('BOOL', False))
elif 'NULL' in v:
tmp[k] = None
result.append(tmp)
for i in result:
return i
def docid(event):
result = []
for r in event['Records']:
tmp = {}
for k, v in r['dynamodb']['Keys'].items():
if "S" in v.keys() or "BOOL" in v.keys():
tmp[k] = v.get('S', v.get('BOOL', False))
elif 'NULL' in v:
tmp[k] = None
result.append(tmp)
for newId in result:
return newId
Getting error at document and docid
Individually both are giving output
result = []
for r in event['Records']:
tmp = {}
for k, v in r['dynamodb']['NewImage'].items():
#for k, v in r['dynamodb']['Keys'].items():
if "S" in v.keys() or "BOOL" in v.keys():
tmp[k] = v.get('S', v.get('BOOL', False))
elif 'NULL' in v:
tmp[k] = None
result.append(tmp)
for i in result:
print (i)
event = {'Records': [{'eventID': '2339bc590c21035b84f8cc602b12c1d2', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '9'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '9'}, 'age': {'S': '95'}}, 'SequenceNumber': '3100000000035684810908', 'SizeBytes': 23, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'xxxx', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '2'}}, 'NewImage': {'last_name': {'S': 'JJ'}, 'id': {'S': '2'}, 'age': {'S': '5'}}, 'SequenceNumber': '3200000000035684810954', 'SizeBytes': 21, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': 'a9c90c0c4a5a4b64d0314c4557e94e28', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '10'}}, 'NewImage': {'last_name': {'S': 'Hus'}, 'id': {'S': '10'}, 'age': {'S': '95'}}, 'SequenceNumber': '3300000000035684810956', 'SizeBytes': 25, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}, {'eventID': '288f4a424992e5917af0350b53f754dc', 'eventName': 'MODIFY', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1595908037.0, 'Keys': {'id': {'S': '1'}}, 'NewImage': {'last_name': {'S': 'V'}, 'id': {'S': '1'}, 'age': {'S': '2'}}, 'SequenceNumber': '3400000000035684810957', 'SizeBytes': 20, 'StreamViewType': 'NEW_IMAGE'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:xxxx:table/glossary/stream/2020-07-28T00:26:55.462'}]}
ERROR: NameError("name 'event' is not defined")
come from? Also what is the example of a record in dynamodb that you are trying to process? – Marcin