You can use the following boto3 code (the same thing can be done in Java with the AWS SDK for Java) to remove the logs. For running it periodically, you have a few options:
- Using a workflow scheduler like Airflow (see the example below)
- Running it as a Lambda function scheduled to execute periodically (much easier)
- With a local cron job (not very feasible)
Function to remove the logs (it takes the expiry threshold, the bucket name, and the prefix, which can be "logs/sparksteps/j-", as input):
import boto3

def clean_s3(buck, match_prefix, exp_threshold):
    s3_client = boto3.client('s3')
    key_names = []
    file_timestamp = []
    file_size = []
    kwargs = {"Bucket": buck, "Prefix": match_prefix}
    # list every matching key, following the continuation token across pages
    while True:
        result = s3_client.list_objects_v2(**kwargs)
        for obj in result.get("Contents", []):
            if "." in obj["Key"]:  # skip "folder" placeholder keys
                key_names.append(obj["Key"])
                file_timestamp.append(obj["LastModified"].timestamp())
                file_size.append(obj["Size"])
        try:
            kwargs["ContinuationToken"] = result["NextContinuationToken"]
        except KeyError:
            break

    key_info = {
        "key_path": key_names,
        "timestamp": file_timestamp,
        "size": file_size
    }

    # delete every object whose last-modified time is older than the threshold
    s3_file = key_info
    for i, fs in enumerate(s3_file["timestamp"]):
        if fs < exp_threshold:  # object is older than the expiry threshold
            print("Deleting %s" % s3_file["key_path"][i])
            s3_client.delete_object(Bucket=buck, Key=s3_file["key_path"][i])
You can calculate the expiry threshold (in epoch seconds) that you need to pass as below:
import time

date_now = time.time()
days = 7  # keep the last 7 days of logs
total_time = 86400 * days
exp_threshold = date_now - total_time
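Putting the two together, a direct one-off call could look like this (the bucket name here is just a placeholder, use your own):

# "my-emr-logs" is a placeholder bucket name
clean_s3("my-emr-logs", "logs/sparksteps/j-", exp_threshold)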
Now, for option 1, you can create an Airflow operator like the one below:
s3_cleanup = PythonOperator(
    task_id='s3cleanup',
    python_callable=clean_s3,
    op_kwargs={
        'buck': '<your bucket>',
        'match_prefix': "logs/sparksteps/j-",
        'exp_threshold': exp_threshold,
    },
    dag=dag)
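The operator assumes a dag object already exists; a minimal sketch of the surrounding DAG (the DAG id and the daily schedule are just assumptions, adjust them to your setup) could look like:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # in Airflow 2.x: from airflow.operators.python import PythonOperator

# "s3_log_cleanup" and the daily schedule are placeholders
dag = DAG(
    dag_id='s3_log_cleanup',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
)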
Alternatively, using approach 2, you can schedule it with AWS Lambda. See the guide for scheduling with Lambda here.
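For completeness, a minimal Lambda handler wrapping the same function might look like the sketch below (the 7-day window and the bucket name are assumptions); you would then attach a scheduled CloudWatch Events/EventBridge rule to trigger it:

import time

def lambda_handler(event, context):
    # keep the last 7 days of logs; adjust as needed
    exp_threshold = time.time() - 7 * 86400
    # "my-emr-logs" is a placeholder bucket name
    clean_s3("my-emr-logs", "logs/sparksteps/j-", exp_threshold)
    return {"status": "done"}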