
I am reading data from an S3 bucket and inserting it into AWS Elasticsearch using AWS Lambda.

If I use requests.post, it works, but I need to pass an ingest pipeline as a parameter.

  import boto3
  import requests
  from elasticsearch import Elasticsearch, RequestsHttpConnection
  from requests_aws4auth import AWS4Auth

  region = 'us-east-2'  # e.g. us-west-1
  service = 'es'
  credentials = boto3.Session().get_credentials()
  awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                     region, service, session_token=credentials.token)

  host = 'https://search-internship6-aqt7s3tuokjcvx7um3lmm7wvbe.us-east-2.es.amazonaws.com/'  # the Amazon ES domain, including https://
  index = 'lambda-s3-index11'
  type1 = 'lambda-type10'
  url1 = host + '/' + index + '/' + type1

  headers = {"Content-Type": "application/json"}

  s3 = boto3.client('s3')

  es = Elasticsearch(
      'https://search-internship6-aqt7s3tuokjcvx7um3lmm7wvbe.us-east-2.es.amazonaws.com',
      http_auth=awsauth,
      use_ssl=True,
      verify_certs=True,
      connection_class=RequestsHttpConnection
  )

  # Lambda execution starts here
  def handler(event, context):
      print("es :", es.info())

      for record in event['Records']:
          # Get the bucket name and key for the new file
          bucket = record['s3']['bucket']['name']
          key = record['s3']['object']['key']

          # Get, read, and split the file into lines
          obj = s3.get_object(Bucket=bucket, Key=key)
          body = obj['Body'].read()
          lines = body.splitlines()

          # Index each line as a JSON document through the ingest pipeline
          for line in lines:
              print(line)
              # This works, but I can't pass the pipeline this way:
              # r = requests.post(url1, data=line, auth=awsauth, headers=headers)
              es.index(index='internship11', doc_type='packets11', body=line,
                       pipeline='epoch-to-format')

I am getting this error.

ConnectionError(HTTPSConnectionPool(host='https', port=443): Max retries exceeded with url: //search-internship6-aqt7s3tuokjcvx7um3lmm7wvbe.us-east-2.es.amazonaws.com/:443/ (Caused by NewConnectionError (': Failed to establish a new connection: [Errno -2] Name or service not known',))) caused by: ConnectionError(HTTPSConnectionPool (host='https', port=443): Max retries exceeded with url: //search-internship6-aqt7s3tuokjcvx7um3lmm7wvbe.us-east-2.es.amazonaws.com/:443/ (Caused by NewConnectionError (': Failed to establish a new connection: [Errno -2] Name or service not known',))): ConnectionError
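Because requests.post already works, one possible workaround (a sketch, not something proposed in the thread) is to keep using it and send the pipeline the way the index API receives it: as the `pipeline` query parameter. The helper name `pipeline_index_url` is hypothetical; it only builds the URL, so the actual POST, which needs network access and AWS credentials, is left as a comment.

```python
from urllib.parse import urlencode


def pipeline_index_url(endpoint: str, index: str, doc_type: str, pipeline: str) -> str:
    """Build the URL for POSTing one document to index/doc_type through an
    ingest pipeline, passed as the index API's `pipeline` query parameter."""
    base = endpoint.rstrip("/")  # avoid the double slash produced by host + '/' + index
    query = urlencode({"pipeline": pipeline})
    return f"{base}/{index}/{doc_type}?{query}"


# Hypothetical use inside the handler loop from the question:
# url = pipeline_index_url(host, 'internship11', 'packets11', 'epoch-to-format')
# r = requests.post(url, data=line, auth=awsauth, headers=headers)
# r.raise_for_status()
```

Equivalently, requests can add the query string itself via `requests.post(url1, data=line, auth=awsauth, headers=headers, params={"pipeline": "epoch-to-format"})`.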


3 Answers


Try removing "https://" from the host name and use the code below:

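import boto3
from aws_requests_auth.aws_auth import AWSRequestsAuth
from elasticsearch import Elasticsearch, RequestsHttpConnection

region = 'us-east-2'
credentials = boto3.Session().get_credentials()

# Bare host name: no "https://" scheme and no trailing slash
host = 'search-internship6-aqt7s3tuokjcvx7um3lmm7wvbe.us-east-2.es.amazonaws.com'
port = 80

auth = AWSRequestsAuth(
    aws_access_key=credentials.access_key,
    aws_secret_access_key=credentials.secret_key,
    aws_token=credentials.token,  # Lambda credentials are temporary, so the session token is required
    aws_region=region,
    aws_host=host,
    aws_service="es"
)

# Use the requests connection_class and pass in our custom auth class
es = Elasticsearch(
    host=host,
    port=port,
    connection_class=RequestsHttpConnection,
    http_auth=auth
)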
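The `host='https'` in the traceback suggests the scheme was being read as the host name. If you would rather keep the full endpoint URL in your configuration, here is a minimal sketch of reducing it to the bare host name before handing it to the client. `bare_es_host` is a hypothetical helper, and the commented `Elasticsearch(...)` call mirrors the `hosts=[{'host': ..., 'port': 443}]` form used in the IAM answer.

```python
from urllib.parse import urlsplit


def bare_es_host(endpoint: str) -> str:
    """Reduce an Amazon ES endpoint (with or without scheme, port, or
    trailing slash) to the bare host name."""
    # Host names never contain spaces, so this also repairs typos like "us- east-2".
    cleaned = endpoint.strip().replace(" ", "")
    if "://" not in cleaned:
        cleaned = "//" + cleaned  # urlsplit only recognizes a host after "//"
    host = urlsplit(cleaned).hostname
    if not host:
        raise ValueError(f"no host name in {endpoint!r}")
    return host


# Hypothetical use:
# es = Elasticsearch(hosts=[{'host': bare_es_host(endpoint), 'port': 443}],
#                    http_auth=awsauth, use_ssl=True, verify_certs=True,
#                    connection_class=RequestsHttpConnection)
```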

I was receiving this error when my Lambda function was not connected to a VPC (more specifically, the same VPC as my Elasticsearch Service domain). To configure a VPC for your Lambda function, open the function's configuration in the Lambda console.
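If you prefer to script this instead of using the console, the Lambda `UpdateFunctionConfiguration` API (boto3 `update_function_configuration`) takes a `VpcConfig` with subnet and security-group IDs. A minimal sketch: the helper names are hypothetical, any IDs you pass are placeholders, and `lambda_client` is assumed to be `boto3.client('lambda')`.

```python
from typing import Any, Dict, Iterable, List


def vpc_config(subnet_ids: Iterable[str], security_group_ids: Iterable[str]) -> Dict[str, List[str]]:
    """Build the VpcConfig argument for UpdateFunctionConfiguration."""
    config = {"SubnetIds": list(subnet_ids), "SecurityGroupIds": list(security_group_ids)}
    # Empty lists would detach the function from its VPC, so reject them here.
    if not config["SubnetIds"] or not config["SecurityGroupIds"]:
        raise ValueError("at least one subnet and one security group are required")
    return config


def attach_lambda_to_vpc(lambda_client: Any, function_name: str,
                         subnet_ids: Iterable[str], security_group_ids: Iterable[str]) -> dict:
    """Place an existing Lambda function in the given subnets and security groups."""
    return lambda_client.update_function_configuration(
        FunctionName=function_name,
        VpcConfig=vpc_config(subnet_ids, security_group_ids),
    )
```

Pick subnets and security groups that can reach the Elasticsearch domain over HTTPS. The function's execution role also needs permission to manage network interfaces (the AWS managed policy `AWSLambdaVPCAccessExecutionRole` grants this). Inside a VPC the function has no internet access by default, so reading from S3 then needs an S3 VPC endpoint or a NAT gateway.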


Just to complement @Ashraful Islam's answer:

Running on AWS with IAM

If you want to use this client with IAM-based authentication on AWS, you can use the requests-aws4auth package:

from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

host = 'YOURHOST.us-east-1.es.amazonaws.com'
awsauth = AWS4Auth(YOUR_ACCESS_KEY, YOUR_SECRET_KEY, REGION, 'es')

es = Elasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection
)
print(es.info())

Reference: https://elasticsearch-py.readthedocs.io/en/master/
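Inside Lambda the execution role's credentials are temporary, so unlike the fixed-key example above, the signer also needs the session token (the question passes `session_token=credentials.token` for the same reason). A minimal sketch, assuming you read the credentials from the environment variables the Lambda runtime sets; `aws4auth_args` is a hypothetical helper.

```python
import os
from typing import Any, Dict, Mapping, Tuple


def aws4auth_args(service: str = "es",
                  env: Mapping[str, str] = os.environ) -> Tuple[tuple, Dict[str, Any]]:
    """Return (args, kwargs) for requests_aws4auth.AWS4Auth, read from the
    environment variables Lambda sets for the execution role."""
    args = (env["AWS_ACCESS_KEY_ID"], env["AWS_SECRET_ACCESS_KEY"], env["AWS_REGION"], service)
    # Temporary credentials: requests signed without the session token are rejected.
    kwargs = {"session_token": env.get("AWS_SESSION_TOKEN")}
    return args, kwargs


# In the Lambda function (requires the requests-aws4auth package):
# from requests_aws4auth import AWS4Auth
# args, kwargs = aws4auth_args()
# awsauth = AWS4Auth(*args, **kwargs)
```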