I want to execute Spark jobs on demand: only when I receive a trigger event do I want to run a Spark job, using the inputs that arrive with that event. Since trigger events are infrequent, I do not want to use Spark Streaming. My goal is to deploy the tool on an AWS EMR cluster. I want to be able to create an EMR cluster on demand (by trigger), execute the Spark job there, and shut the cluster down. Is there a good example of how to handle these operations from Scala?
3 Answers
AWS Data Pipeline seems to be the right solution for the problem you describe. It lets you connect a range of services within your AWS infrastructure, such as storage and processing.
You can create an EMR job using an EmrActivity in AWS Data Pipeline. The pipeline triggers when a precondition is met or at a scheduled interval.
It sets up an EMR cluster with the specification you provide and runs the Spark step you define.
The cluster can be terminated automatically when the job is completed.
This question on SO will get you started.
- You can also spin up an AWS Data Pipeline from this definition by using the Choose a Template option when creating the pipeline. For this option, you can use the template shared above.
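As a rough illustration of the Data Pipeline approach, here is a minimal sketch that builds an on-demand pipeline with an EmrCluster resource and an EmrActivity, using the boto3 `datapipeline` client. The object ids (`SparkCluster`, `SparkActivity`), the instance types, the release label, and the role names are placeholders I am assuming, not values from the answer; check them against what your account supports.

```python
def field(key, value, ref=False):
    """One pipeline field; references to other objects use refValue."""
    if ref:
        return {"key": key, "refValue": value}
    return {"key": key, "stringValue": value}


def build_pipeline_objects(spark_step):
    """Pipeline objects: defaults, an EMR cluster, and one Spark activity."""
    return [
        {"id": "Default", "name": "Default", "fields": [
            field("scheduleType", "ondemand"),  # run only when activated
            field("failureAndRerunMode", "CASCADE"),
            field("role", "DataPipelineDefaultRole"),                  # assumed role name
            field("resourceRole", "DataPipelineDefaultResourceRole"),  # assumed role name
        ]},
        {"id": "SparkCluster", "name": "SparkCluster", "fields": [
            field("type", "EmrCluster"),
            field("releaseLabel", "emr-5.36.0"),       # placeholder release
            field("applications", "spark"),
            field("masterInstanceType", "m5.xlarge"),  # placeholder sizing
            field("coreInstanceType", "m5.xlarge"),
            field("coreInstanceCount", "2"),
            field("terminateAfter", "2 Hours"),        # safety timeout
        ]},
        {"id": "SparkActivity", "name": "SparkActivity", "fields": [
            field("type", "EmrActivity"),
            field("runsOn", "SparkCluster", ref=True),
            field("step", spark_step),  # comma-separated EMR step string
        ]},
    ]


def create_and_activate(pipeline_name, spark_step):
    """Create the pipeline, upload its definition, and start one run."""
    import boto3  # imported lazily so the builder above works offline

    dp = boto3.client("datapipeline")
    pipeline_id = dp.create_pipeline(
        name=pipeline_name, uniqueId=pipeline_name)["pipelineId"]
    dp.put_pipeline_definition(
        pipelineId=pipeline_id,
        pipelineObjects=build_pipeline_objects(spark_step))
    dp.activate_pipeline(pipelineId=pipeline_id)
    return pipeline_id
```

A call such as `create_and_activate("spark-on-demand", "command-runner.jar,spark-submit,s3://my-bucket/job.py")` would start one run; the bucket and script path there are hypothetical.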
A Lambda function could be a good solution, as long as it is permitted to start EMR clusters. A Lambda function can be invoked on demand or by a number of different triggers.
This could be a good template to start setting up your Lambda function:
```python
import sys
import time

import boto3


def lambda_handler(event, context):
    conn = boto3.client("emr")

    # Choose the first cluster that is RUNNING or WAITING.
    # You could also choose by name, or already have the cluster id.
    clusters = conn.list_clusters()
    clusters = [c["Id"] for c in clusters["Clusters"]
                if c["Status"]["State"] in ["RUNNING", "WAITING"]]
    if not clusters:
        sys.stderr.write("No valid clusters\n")
        sys.exit(1)

    # take the first relevant cluster
    cluster_id = clusters[0]

    # code location on your EMR master node
    CODE_DIR = "/home/hadoop/code/"

    # spark configuration example
    step_args = ["/usr/bin/spark-submit", "--conf", "your-configuration",
                 CODE_DIR + "your_file.py", "--your-parameters", "parameters"]

    step = {
        "Name": "what_you_do-" + time.strftime("%Y%m%d-%H:%M"),
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "s3n://elasticmapreduce/libs/script-runner/script-runner.jar",
            "Args": step_args,
        },
    }

    # submit the step to the chosen cluster
    action = conn.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return "Added step: %s" % (action)
```
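The template above adds a step to a cluster that is already running. Since the question asks for a cluster that is created per trigger and shut down afterwards, here is a minimal sketch of that variant using `run_job_flow` with `KeepJobFlowAliveWhenNoSteps` set to `False`, so the cluster terminates once its steps finish. The event key `input_path`, the S3 job location, the release label, the instance sizing, and the default role names are all assumptions for illustration.

```python
import time


def build_job_flow_request(input_path, name_prefix="on-demand-spark"):
    """Build keyword arguments for EMR run_job_flow (a transient cluster)."""
    spark_step = {
        "Name": "spark-job",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            # command-runner.jar runs the given command on the master node
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster",
                "s3://my-bucket/jobs/my-job.jar",  # hypothetical job location
                input_path,                        # input passed by the trigger
            ],
        },
    }
    return {
        "Name": name_prefix + "-" + time.strftime("%Y%m%d-%H%M%S"),
        "ReleaseLabel": "emr-6.15.0",          # placeholder release
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "MasterInstanceType": "m5.xlarge",  # placeholder sizing
            "SlaveInstanceType": "m5.xlarge",
            "InstanceCount": 3,
            # False: terminate the cluster once all steps are done
            "KeepJobFlowAliveWhenNoSteps": False,
            "TerminationProtected": False,
        },
        "Steps": [spark_step],
        "JobFlowRole": "EMR_EC2_DefaultRole",  # assumed default roles
        "ServiceRole": "EMR_DefaultRole",
        "VisibleToAllUsers": True,
    }


def lambda_handler(event, context):
    import boto3  # imported lazily so the request builder can be tested offline

    emr = boto3.client("emr")
    # "input_path" is a hypothetical key carried by the trigger event
    request = build_job_flow_request(event["input_path"])
    response = emr.run_job_flow(**request)
    return {"cluster_id": response["JobFlowId"]}
```

Keeping the request builder separate from the handler makes it possible to inspect or unit-test the cluster specification without touching AWS.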
- Instead of spinning up the EMR cluster via the API, it is better to define EMR in a CloudFormation template (CFT) and launch that template. With a CFT you can also handle IAM roles, creation of S3 buckets, etc.
- Spin up the EMR cluster via Lambda, and have another Lambda monitor the cluster for any Spark job in progress or waiting. If there is none for an idle period (say 15 min), issue a teardown of the CFT stack, which in turn terminates the EMR cluster.
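The monitoring Lambda described above could look roughly like the sketch below. It treats the cluster as idle when no step is pending or running and the last activity is at least 15 minutes old, then deletes the CloudFormation stack. The event keys `cluster_id` and `stack_name` are hypothetical, and only the first page of `list_steps` results is inspected.

```python
from datetime import datetime, timedelta, timezone

ACTIVE_STATES = ["PENDING", "RUNNING"]
FINISHED_STATES = ["COMPLETED", "FAILED", "CANCELLED"]


def is_idle(active_steps, last_activity, now, idle_minutes=15):
    """True when nothing is queued or running and the idle window has passed."""
    if active_steps:
        return False
    return now - last_activity >= timedelta(minutes=idle_minutes)


def last_activity_time(cluster, finished_steps):
    """Latest step end time, falling back to when the cluster became ready."""
    ends = [
        s["Status"]["Timeline"]["EndDateTime"]
        for s in finished_steps
        if "EndDateTime" in s["Status"].get("Timeline", {})
    ]
    if ends:
        return max(ends)
    timeline = cluster["Status"]["Timeline"]
    return timeline.get("ReadyDateTime", timeline["CreationDateTime"])


def lambda_handler(event, context):
    import boto3  # imported lazily so the helpers above can be tested offline

    emr = boto3.client("emr")
    cfn = boto3.client("cloudformation")
    cluster_id = event["cluster_id"]  # hypothetical event keys
    stack_name = event["stack_name"]

    active = emr.list_steps(
        ClusterId=cluster_id, StepStates=ACTIVE_STATES)["Steps"]
    finished = emr.list_steps(
        ClusterId=cluster_id, StepStates=FINISHED_STATES)["Steps"]
    cluster = emr.describe_cluster(ClusterId=cluster_id)["Cluster"]

    now = datetime.now(timezone.utc)
    if is_idle(active, last_activity_time(cluster, finished), now):
        # deleting the stack tears down the EMR cluster it created
        cfn.delete_stack(StackName=stack_name)
        return "stack deletion requested"
    return "cluster still in use"
```

Such a function would typically be invoked on a schedule, for example every few minutes, while the stack exists.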
`aws emr` command from Scala in order to create a cluster and run Spark job? – Markus
`AmazonElasticMapReduceClient`: docs.aws.amazon.com/emr/latest/ReleaseGuide/… – Markus