0
votes

For my workflow, I need to run a job with spark2. I did not find any example or good documentation for the SparkSubmitOperator but tried to use it anyway with

spark_submit = SparkSubmitOperator(
task_id='task_id',
application=string_with_path_to_jar_file,
conf={
    'spark.sql.warehouse.dir': 'file:/tmp/',
    'spark.hadoop.fs.permissions.umask-mode': '002',
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.network.timeout': '360s',
    'spark.yarn.executor.memoryOverhead': '5g',
    'spark.dynamicAllocation.maxExecutors': '100'
},
env_vars={
    'master': 'yarn',
    'deploy-mode': 'client'
},
java_class=some_java_class,
executor_memory='12G',
driver_memory='3G',
num_executors=50,
application_args={'app.properties'})

When I run my job I get the following warning:

.local/lib/python2.7/site-packages/airflow/models.py:2160: PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:  
[2018-07-09 18:01:53,947] {base_task_runner.py:98} INFO - Subtask: *args: ()  
[2018-07-09 18:01:53,947] {base_task_runner.py:98} INFO - Subtask: **kwargs: {'env_vars': {'deploy-mode': 'client', 'master': 'yarn'}}  
[2018-07-09 18:01:53,947] {base_task_runner.py:98} INFO - Subtask:   category=PendingDeprecationWarning  

Now my question is:

I am probably using the SparkSubmitOperator in a wrong way, is there any good example/documentation for using it or does someone know what I am doing wrong?

1
Is it neccessary to use env_vars? This seems to me to be the whole problem here. - tobi6
That was the problem, I had to put that into the Connections->spark_default configuration of the webUI. - Christopher Beck
but it would be nice to have the option to set deploy mode from airflow. is there a recommended way for setting that directly from the operator? - horatio1701d

1 Answers

0
votes

Here you go, snippet of SparkSubmitOperator:

   SparkSubmitOperator(
   task_id='Extraction',
   application='../scala-2.11/ssot_2.11-0.1.jar',
   conn_id='spark_default',
   driver_class_path='../mysql-connector-java/jars/mysql-connector-java-8.0.17.jar',
   jars='../mysql-connector-java/jars/mysql-connector-java-8.0.17.jar',
   dag=dag)

Also spark_default details on airflow connections.