
I am considering using AWS EMR Spark to run a Spark application against very large Parquet files stored on S3. The overall flow here is that a Java process would upload these large files to S3, and I'd like to automatically trigger the running of a Spark job (injected with the S3 keyname(s) of the files uploaded) on those files.

Ideally, there would be some kind of S3-based EMR trigger available to wire up; that is, I configure EMR/Spark to "listen" to an S3 bucket and to kick off a Spark job when an upsert is made to that bucket.

If no such trigger exists, I could probably kludge something together, such as kicking off a Lambda from the S3 event and having the Lambda somehow trigger the EMR Spark job.

However, my understanding (please correct me if I'm wrong) is that the only way to kick off a Spark job is to:

  1. Package the job up as an executable JAR file; and
  2. Submit it to the cluster (EMR or otherwise) via the spark-submit shell script

So if I have to do the Lambda-based kludge, I'm not exactly sure of the best way to trigger the EMR/Spark job, seeing that Lambda runtimes don't natively include spark-submit. And even if I configured my own Lambda runtime (which I believe is now possible), this solution already feels wonky and fault-intolerant.

Anybody ever trigger an EMR/Spark job from an S3 trigger or any AWS trigger before?

When the S3 event happens, receive it in AWS Lambda as a trigger and launch a job on the EMR cluster with an AWS SDK, e.g. Python boto3. - Lamanus

1 Answer


An EMR Spark job can be executed as a step, as described in Adding a Spark Step. Steps are not limited to cluster creation time right after bootstrap; you can also add them to a long-running cluster.

aws emr add-steps --cluster-id j-2AXXXXXXGAPLF --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10]

Since this is an AWS CLI command, you can invoke the equivalent API from Lambda. From there you can also upload the application JAR to S3 or HDFS and point the step at it using an s3:// or hdfs:// path.
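
For example, a Lambda handler that reacts to the S3 event and submits a step might look roughly like the sketch below. This is a minimal sketch in Java, assuming the aws-lambda-java-events library and the AWS SDK for Java v1; the cluster ID, the Spark application's main class, and the JAR locations are placeholders for your own values.

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.s3.event.S3EventNotification.S3EventNotificationRecord;

public class S3ToEmrHandler implements RequestHandler<S3Event, String> {

    // Placeholder: the ID of an already-running EMR cluster
    private static final String CLUSTER_ID = "j-XXXXXXXXXXXXX";

    @Override
    public String handleRequest(S3Event event, Context context) {
        // The client picks up credentials from the Lambda execution role
        AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.defaultClient();

        for (S3EventNotificationRecord record : event.getRecords()) {
            String bucket = record.getS3().getBucket().getName();
            String key = record.getS3().getObject().getKey();

            // Pass the uploaded object's S3 URI to the Spark app as an argument
            HadoopJarStepConfig stepConf = new HadoopJarStepConfig()
                    .withJar("command-runner.jar")
                    .withArgs("spark-submit",
                            "--class", "com.example.MySparkApp",     // placeholder main class
                            "s3://my-code-bucket/my-spark-app.jar",  // placeholder app JAR
                            "s3://" + bucket + "/" + key);

            StepConfig step = new StepConfig()
                    .withName("Process " + key)
                    .withActionOnFailure("CONTINUE")
                    .withHadoopJarStep(stepConf);

            emr.addJobFlowSteps(new AddJobFlowStepsRequest()
                    .withJobFlowId(CLUSTER_ID)
                    .withSteps(step));
        }
        return "ok";
    }
}

The Lambda's execution role would also need permission for the elasticmapreduce:AddJobFlowSteps action.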

For reference, the documentation also includes a standalone Java example of adding a step:

import java.util.ArrayList;
import java.util.List;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

// accessKey and secretKey are assumed to be defined elsewhere
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);

// Target an existing, running cluster by its ID
AddJobFlowStepsRequest req = new AddJobFlowStepsRequest();
req.withJobFlowId("j-1K48XXXXXXHCB");

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

// command-runner.jar lets the step run spark-submit on the master node
HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
        .withJar("command-runner.jar")
        .withArgs("spark-submit", "--executor-memory", "1g",
                "--class", "org.apache.spark.examples.SparkPi",
                "/usr/lib/spark/examples/jars/spark-examples.jar", "10");

StepConfig sparkStep = new StepConfig()
        .withName("Spark Step")
        .withActionOnFailure("CONTINUE")
        .withHadoopJarStep(sparkStepConf);

stepConfigs.add(sparkStep);
req.withSteps(stepConfigs);
AddJobFlowStepsResult result = emr.addJobFlowSteps(req);
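
If you run this from Lambda rather than from a standalone client, you can drop the hard-coded BasicAWSCredentials and let the client builder pick up the Lambda execution role instead, as in the handler sketch above.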