
I am trying to activate an AWS Data Pipeline based on the existence of *.tar files in S3. I created a Lambda function and wrote Python Boto3 code to activate the pipeline. I have tested the Lambda function and it works: when a .tar file exists, the data pipeline is activated; when none exists, the pipeline is not activated.

I am trying to understand the cause of these issues:

  1. If no .tar files exist in the S3 location, the print ("datapipeline not activated") statement is never written to the logs.
  2. If I interrupt the data pipeline during a previous run, so that it is marked as finished before it actually completes, and then trigger the Lambda function again, I get the error below.

    ERROR: The field 'maxActiveInstances' can be set only on the Default object for On-demand pipelines

  3. When I tried to set 'maxActiveInstances' under the EMR resource in the data pipeline, I got the error below (a sketch of setting it on the Default object instead follows this list).

    { "errorMessage": "An error occurred (InvalidRequestException) when calling the ActivatePipeline operation: Web service limit exceeded: Exceeded number of concurrent executions. Please set the field 'maxActiveInstances' to a higher value in your pipeline or wait for the currenly running executions to complete before trying again", "errorType": "InvalidRequestException", "stackTrace": [ [ "/var/task/lambda_function.py", 21, "lambda_handler", "activate = client.activate_pipeline(pipelineId=data_pipeline_id,parameterValues=[])" ], [ "/var/runtime/botocore/client.py", 314, "_api_call", "return self._make_api_call(operation_name, kwargs)" ], [ "/var/runtime/botocore/client.py", 612, "_make_api_call", "raise error_class(parsed_response, operation_name)" ] ] }

Here is the Python script. Please provide guidance on resolving these issues.

import boto3
import logging

logger = logging.getLogger()

def lambda_handler(event, context):
    client = boto3.client('datapipeline')
    s3_client = boto3.client('s3')
    data_pipeline_id = "df-xxxxxxxx"
    bucket = 'xxxxx'
    prefix = 'xxxx/xxxxx/'
    # Page through all objects under the prefix
    paginator = s3_client.get_paginator('list_objects_v2')
    response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
    # Describe the pipeline to confirm it exists
    response_pipeline = client.describe_pipelines(pipelineIds=[data_pipeline_id])
    for response in response_iterator:
        for object_data in response['Contents']:
            key = object_data['Key']
            #print (key)
            if key.endswith('.tar'):
                if response_pipeline:
                    activate = client.activate_pipeline(pipelineId=data_pipeline_id, parameterValues=[])
                    print("activated")
                else:
                    print("datapipeline not activated")

1 Answer


I think I've just seen these same symptoms, so hopefully sharing our fix helps.

We had cancelled an instance of the pipeline, and we needed to re-enable (re-activate) the pipeline to get past this error.
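
Something along these lines worked for us. This is only a rough sketch rather than our exact code; the pipeline ID is a placeholder, and deactivate_pipeline with cancelActive=True cancels any executions still marked as running:

    import time
    import boto3

    client = boto3.client('datapipeline')
    pipeline_id = 'df-xxxxxxxx'  # placeholder

    # Cancel anything still marked as running on the pipeline.
    client.deactivate_pipeline(pipelineId=pipeline_id, cancelActive=True)

    # Poll until the pipeline leaves the DEACTIVATING state.
    while True:
        description = client.describe_pipelines(pipelineIds=[pipeline_id])
        fields = description['pipelineDescriptionList'][0]['fields']
        state = next(f['stringValue'] for f in fields if f['key'] == '@pipelineState')
        if state != 'DEACTIVATING':
            break
        time.sleep(5)

    # Re-enable the pipeline.
    client.activate_pipeline(pipelineId=pipeline_id, parameterValues=[])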