23 votes

I currently automate my Apache Spark PySpark scripts using clusters of EC2 instances via Spark's preconfigured ./ec2 directory. For automation and scheduling purposes, I would like to use the Boto EMR module to send scripts up to the cluster.

I was able to bootstrap and install Spark on an EMR cluster. I am also able to launch a script on EMR by using my local machine's version of pyspark and setting the master like so:

$: MASTER=spark://<insert EMR master node of cluster here> ./bin/pyspark <myscriptname.py>

However, this requires me to run that script locally, and thus I am not able to fully leverage Boto's ability to 1) start the cluster, 2) add the script steps, and 3) stop the cluster. I've found examples using script-runner.sh and EMR "step" commands for spark-shell (Scala), but I assume there is an easier way to do this with the Python module (pyspark). Thanks so much in advance!


3 Answers

17 votes

Here is a great example of how it needs to be configured. Browse to "A quick example" for Python code.

However, in order to make things work on emr-4.7.2, a few tweaks had to be made, so here is an AWS CLI command that worked for me:

aws emr add-steps --cluster-id <Your EMR cluster id> --steps Type=spark,Name=TestJob,Args=[--deploy-mode,cluster,--master,yarn,--conf,spark.yarn.submit.waitAppCompletion=true,s3a://your-source-bucket/code/pythonjob.py,s3a://your-source-bucket/data/data.csv,s3a://your-destination-bucket/test-output/],ActionOnFailure=CONTINUE
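Since the original question asked about Boto, the same step can also be submitted from Python. Here is a rough, untested sketch using boto3 (the current AWS SDK for Python); the region, cluster id, and S3 paths are placeholders, and it assumes that on emr-4.x a Spark step is executed through command-runner.jar:

import boto3

# Sketch only: submit the same Spark step via boto3 (all ids/paths are placeholders)
emr = boto3.client("emr", region_name="us-east-1")

response = emr.add_job_flow_steps(
    JobFlowId="<Your EMR cluster id>",
    Steps=[{
        "Name": "TestJob",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",  # how Type=Spark steps are run on emr-4.x
            "Args": [
                "spark-submit", "--deploy-mode", "cluster", "--master", "yarn",
                "--conf", "spark.yarn.submit.waitAppCompletion=true",
                "s3a://your-source-bucket/code/pythonjob.py",
                "s3a://your-source-bucket/data/data.csv",
                "s3a://your-destination-bucket/test-output/",
            ],
        },
    }],
)
print(response["StepIds"])  # keep the id if you want to wait on the step later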

And here are the contents of the pythonjob.py file:

from __future__ import print_function
from pyspark import SparkContext
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: testjob <input path> <output path>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="MyTestJob")
    dataTextAll = sc.textFile(sys.argv[1])
    # Split each CSV row, build (string key, float value) pairs, and sum values per key
    dataRDD = dataTextAll.map(lambda x: x.split(",")) \
        .map(lambda y: (str(y[0]), float(y[1]))) \
        .reduceByKey(lambda a, b: a + b)
    dataRDD.saveAsTextFile(sys.argv[2])
    sc.stop()

It reads the data.csv file from S3, splits every row, converts the first value to a string and the second to a float, groups by the first value, sums the values in the second column, and writes the result back to S3.

A few comments:

  • I've decided to leave spark.yarn.submit.waitAppCompletion=true so that I can monitor job execution in the console (if you prefer to wait from Python instead, see the sketch after this list).
  • Input and output paths (sys.argv[1] and sys.argv[2] respectively) are passed to the script as part of the job submission (the Args section of the add-steps command).
  • Be aware that you must use the s3a:// URI instead of s3n:// or s3:// for Hadoop 2.7+ when configuring your job.
  • If your cluster is in a VPC, you need to create a VPC Endpoint for Amazon S3 if you intend to read/write from there in your EMR jobs.
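If you would rather wait for the step from Python instead of watching the console, a boto3 waiter should do it. This is a sketch with placeholder ids; it assumes the step_complete waiter of boto3's EMR client:

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Sketch only: block until the submitted step finishes (placeholder ids)
waiter = emr.get_waiter("step_complete")
waiter.wait(
    ClusterId="<Your EMR cluster id>",
    StepId="<Step id returned when the step was added>",
    WaiterConfig={"Delay": 30, "MaxAttempts": 120},  # poll every 30 s for up to 1 hour
)
print("Step finished")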
5 votes

This might be helpful, though it does not use boto.

Use the AWS CLI to create the cluster and add steps (Spark jobs) to it.

1) Create the cluster:

aws emr create-cluster --name "Spark cluster" --ami-version 3.8 --applications Name=Spark --ec2-attributes KeyName=ir --log-uri s3://Path/logs --instance-type m3.xlarge  --instance-count 1 --use-default-roles 

2) Add a step (Spark job). Note that your Python script should be stored on the master node (in this case it is in /home/hadoop/spark).

aws emr add-steps --cluster-id j-xxxxxxx --steps Name=Spark,Jar=s3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/home/hadoop/spark/bin/spark-submit,--deploy-mode,client,/home/hadoop/spark/myscript.py],ActionOnFailure=CONTINUE

You can also combine the two steps into one: create the cluster, run the job, and terminate the cluster in a single call, as sketched below.
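If you want to drive that combined flow from Python (as the question intended with Boto), a boto3 run_job_flow call can create the cluster, run the step, and let the cluster terminate itself. This is an untested sketch: it uses an emr-4.x release label instead of --ami-version 3.8, reads the script from S3 with deploy-mode cluster (per the last answer below) rather than from the master node, and the key pair, roles, and S3 paths are placeholders:

import boto3

emr = boto3.client("emr", region_name="eu-west-1")

# Sketch only: create cluster, run one Spark step, auto-terminate when done
response = emr.run_job_flow(
    Name="Spark cluster",
    ReleaseLabel="emr-4.7.2",          # assumption: emr-4.x instead of --ami-version 3.8
    Applications=[{"Name": "Spark"}],
    LogUri="s3://Path/logs",
    Instances={
        "MasterInstanceType": "m3.xlarge",
        "InstanceCount": 1,
        "Ec2KeyName": "ir",
        "KeepJobFlowAliveWhenNoSteps": False,   # terminate once all steps are done
    },
    JobFlowRole="EMR_EC2_DefaultRole",          # equivalent of --use-default-roles
    ServiceRole="EMR_DefaultRole",
    Steps=[{
        "Name": "Spark",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["spark-submit", "--deploy-mode", "cluster",
                     "s3://your-bucket/myscript.py"],
        },
    }],
)
print(response["JobFlowId"])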

A few notes: 1) I have tried multiple ways to read the script directly from S3, but no luck :( so I ended up copying it to the node using either boto or the AWS CLI. 2) Since I was testing this on a single node in EMR, the deploy mode in the step is client; for a real cluster you should change that to cluster.

3 votes

You need to change the deploy-mode to cluster (instead of client) to access the script from S3.