I have an EMR cluster that runs a Spark Streaming job successfully for a few days, but then the cluster is terminated with a step failure. I checked the logs, and they say:

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f8cb0854000, 12288, 0) failed; error='Cannot allocate memory' (errno=12)
Command exiting with ret '1'

From what I found, this error means there is not enough memory available for the JRE.
I also found that the cluster writes EMR step logs to /mnt/var/logs/hadoop/steps/step_id/, and because I specified a logUri when creating the cluster, these logs are also copied to an S3 location. My guess is that these logs are causing the step failure.

Can anyone suggest how I can periodically remove these EMR step logs from the cluster so that it does not run out of memory?

1 Answer

You can use the following boto3 code to remove the logs (I am sure this can also be done in Java with the AWS SDK for Java). To run it periodically, you have options like:

  1. Using a workflow scheduler like Airflow (see the example below)
  2. Running it as a Lambda function scheduled to run periodically (much easier)
  3. Running it from a local cron job (not very feasible)

Function to remove the logs (it takes the bucket name, the key prefix, which can be "logs/sparksteps/j-", and the expiry threshold):

import boto3


def clean_s3(buck, match_prefix, exp_threshold):
    """Delete objects under match_prefix last modified before exp_threshold (epoch seconds)."""
    s3_client = boto3.client('s3')
    key_names = []
    file_timestamp = []
    kwargs = {"Bucket": buck, "Prefix": match_prefix}
    while True:
        result = s3_client.list_objects_v2(**kwargs)
        # "Contents" is absent from the response when no keys match the prefix
        for obj in result.get("Contents", []):
            # only keep keys that look like files (contain a dot)
            if "." in obj["Key"]:
                key_names.append(obj["Key"])
                file_timestamp.append(obj["LastModified"].timestamp())
        # keep listing until there are no more pages
        try:
            kwargs["ContinuationToken"] = result["NextContinuationToken"]
        except KeyError:
            break

    for key, ts in zip(key_names, file_timestamp):
        if ts < exp_threshold:  # object is older than the threshold
            print("Deleting %s" % key)
            s3_client.delete_object(Bucket=buck, Key=key)
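If the prefix holds many objects, deleting them one request at a time can be slow. As a rough sketch of a variant (not a drop-in replacement), the same idea can be written with boto3's `list_objects_v2` paginator and the batch `delete_objects` call, which accepts up to 1000 keys per request. The helper names `expired_keys` and `delete_in_batches` are made up for this illustration, the client is passed in rather than created inside, and the `"." in key` filter from the function above is left out for brevity.

```python
def expired_keys(s3_client, bucket, prefix, exp_threshold):
    """Yield keys under `prefix` last modified before exp_threshold (epoch seconds)."""
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # pages without matching keys carry no "Contents" entry
        for obj in page.get("Contents", []):
            if obj["LastModified"].timestamp() < exp_threshold:
                yield obj["Key"]


def delete_in_batches(s3_client, bucket, keys, batch_size=1000):
    """Send keys to delete_objects in batches; return how many keys were submitted."""
    batch, submitted = [], 0
    for key in keys:
        batch.append({"Key": key})
        if len(batch) == batch_size:
            s3_client.delete_objects(
                Bucket=bucket, Delete={"Objects": batch, "Quiet": True}
            )
            submitted += len(batch)
            batch = []
    if batch:  # flush the final partial batch
        s3_client.delete_objects(
            Bucket=bucket, Delete={"Objects": batch, "Quiet": True}
        )
        submitted += len(batch)
    return submitted
```

It would be called roughly as `delete_in_batches(client, buck, expired_keys(client, buck, match_prefix, exp_threshold))`, where `client = boto3.client('s3')`.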

You can calculate the expiry threshold (in epoch seconds) to pass in as follows:

import time

date_now = time.time()
days = 7  # keep the last 7 days of logs
total_time = 86400 * days  # 86400 seconds per day
exp_threshold = date_now - total_time
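The same calculation can be wrapped in a small helper so the cutoff is computed at the moment the cleanup actually runs. This is only a sketch: `expiry_threshold` is a name invented here, and the optional `now` argument exists purely so the result can be checked against a fixed date.

```python
from datetime import datetime, timedelta, timezone


def expiry_threshold(days, now=None):
    """Return the epoch-seconds cutoff that lies `days` days before `now` (UTC)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - timedelta(days=days)).timestamp()
```

Calling `expiry_threshold(7)` gives a value that can be passed as `exp_threshold` to the cleanup function.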

Now, for option 1, you can create an Airflow operator like the one below:

# assumes PythonOperator is imported and `dag` is already defined
s3_cleanup = PythonOperator(
    task_id='s3cleanup',
    python_callable=clean_s3,
    op_kwargs={
        'buck': '<your bucket>',
        'match_prefix': "logs/sparksteps/j-",
        'exp_threshold': exp_threshold,
    },
    dag=dag)

Alternatively, using approach 2, you can schedule it with AWS Lambda. See the guide for scheduling with Lambda here
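For the Lambda route, a handler could look roughly like the sketch below. It is an illustration under assumptions, not a tested deployment: the environment variable names `LOG_BUCKET`, `LOG_PREFIX` and `RETENTION_DAYS` and the `make_handler` factory are invented here, and the cleanup function (for example `clean_s3` from above) is passed in so the handler stays independent of how the deletion is done. The schedule itself would come from something like an EventBridge rule that invokes the function periodically.

```python
import os
import time

SECONDS_PER_DAY = 86400


def make_handler(cleaner):
    """Build a Lambda-style handler that calls cleaner(bucket, prefix, exp_threshold)."""
    def handler(event, context):
        # Hypothetical configuration, read from the function's environment variables
        bucket = os.environ["LOG_BUCKET"]
        prefix = os.environ.get("LOG_PREFIX", "logs/sparksteps/j-")
        days = int(os.environ.get("RETENTION_DAYS", "7"))
        # Compute the cutoff on every invocation, in epoch seconds
        exp_threshold = time.time() - SECONDS_PER_DAY * days
        cleaner(bucket, prefix, exp_threshold)
        return {"bucket": bucket, "prefix": prefix, "exp_threshold": exp_threshold}
    return handler
```

In the module that defines `clean_s3`, the entry point would then be `lambda_handler = make_handler(clean_s3)`, and the function's execution role would need permission to list and delete objects in the bucket.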