Apache Airflow version: v2.1.1
Kubernetes version (if you are using kubernetes) (use kubectl version):- Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.2", GitCommit:"092fbfbf53427de67cac1e9fa54aaa09a28371d7", GitTreeState:"clean", BuildDate:"2021-06-16T12:52:14Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"darwin/amd64"} Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.8-eks-96780e", GitCommit:"96780e1b30acbf0a52c38b6030d7853e575bcdf3", GitTreeState:"clean", BuildDate:"2021-03-10T21:32:29Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}
Environment: Development
Cloud provider or hardware configuration: AWS EKS
OS (e.g. from /etc/os-release):
Kernel (e.g. uname -a):
Install tools:
Others:
What happened: I am not able to create SparkApplications on the Kubernetes cluster using SparkKubernetesOperator from an Airflow DAG. I have hosted Airflow and the Spark operator on EKS. I have created a connection in Airflow to the Kubernetes cluster using the "in cluster configuration" option. I am running the sample application just to check the execution of Spark on Kubernetes through Airflow.
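For context, a minimal sketch of how a kubernetes_default connection with in-cluster configuration could be created via the Airflow CLI is shown below; the command and the extra__kubernetes__in_cluster extras key are my assumptions (based on how the cncf.kubernetes provider's KubernetesHook reads connection extras), not taken from the report:
$ airflow connections add kubernetes_default \
    --conn-type kubernetes \
    --conn-extra '{"extra__kubernetes__in_cluster": true}'   # assumed extras key for the "in cluster configuration" option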
Application YAML file:-
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-airflow
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
Airflow DAG:-
from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'max_active_runs': 1,
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'spark_pi_airflow',
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="spark-apps",
    application_file="example_spark_kubernetes_spark_pi.yaml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace="spark-apps",
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
)

t1 >> t2
Error Message:-
[2021-07-12 10:18:46,629] {spark_kubernetes.py:67} INFO - Creating sparkApplication
[2021-07-12 10:18:46,662] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 174, in create_custom_object
response = api.create_namespaced_custom_object(
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object
(data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs) # noqa: E501
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 275, in create_namespaced_custom_object_with_http_info
return self.api_client.call_api(
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
return self.__call_api(resource_path, method,
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
response_data = self.request(
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
return self.rest_client.POST(url,
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
return self.request("POST", url,
File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:airflow:airflow-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 69, in execute
response = hook.create_custom_object(
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 180, in create_custom_object
raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
airflow.exceptions.AirflowException: Exception when calling -> create_custom_object: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:***:***-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}
What you expected to happen: Airflow should schedule and run the Spark job on Kubernetes using SparkKubernetesOperator.
How to reproduce it: Deploy the Spark operator using Helm on a Kubernetes cluster. Deploy Airflow using Helm on the same cluster. Deploy the above-mentioned application YAML and Airflow DAG.
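As a sketch of those reproduction steps, assuming the commonly used charts (the GoogleCloudPlatform spark-on-k8s-operator chart and the official Apache Airflow chart); the release names and namespaces are assumptions, not taken from the report:
# Spark operator (assumed chart and repo; adjust to your environment)
$ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator
$ helm install spark-operator spark-operator/spark-operator --namespace spark-operator --create-namespace
# Airflow (official chart; assumed release name and namespace)
$ helm repo add apache-airflow https://airflow.apache.org
$ helm install airflow apache-airflow/airflow --namespace airflow --create-namespace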
Anything else we need to know:-
I have already created a service account:-
$ kubectl create serviceaccount spark
and given the service account the edit role on the cluster:-
$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=airflow:airflow-cluster --namespace=spark-apps
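One way to verify whether that binding actually grants the service account named in the error message the required permission is kubectl auth can-i:
$ kubectl auth can-i create sparkapplications.sparkoperator.k8s.io \
    --as=system:serviceaccount:airflow:airflow-cluster -n spark-apps
# prints "yes" or "no" depending on whether the RBAC rules allow it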
This looks like a problem with the airflow:airflow-cluster service account's permissions. From the two kubectl commands above I see that you created the serviceaccount spark, while the clusterrolebinding is bound to the service account airflow:airflow-cluster. Make sure that the airflow-cluster service account exists in the airflow namespace. Also, you would probably want to create a rolebinding rather than a clusterrolebinding, to grant the service account permissions in only the selected namespace. – moonkotte
The edit clusterrole doesn't include the API group sparkoperator.k8s.io, so you will need to create a role manually. Please find role examples. – moonkotte
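Following the comments above, a minimal sketch of a namespaced Role and RoleBinding that would let the airflow-cluster service account (from the airflow namespace) manage SparkApplications in spark-apps; the role name, binding name, and verb list are assumptions, adjust as needed:
# Role limited to the spark-apps namespace, covering the sparkoperator.k8s.io API group
$ kubectl create role spark-application-editor \
    --verb=get,list,watch,create,update,patch,delete \
    --resource=sparkapplications.sparkoperator.k8s.io \
    --namespace=spark-apps
# Bind the role to the service account Airflow actually runs as (taken from the error message)
$ kubectl create rolebinding airflow-spark-application-editor \
    --role=spark-application-editor \
    --serviceaccount=airflow:airflow-cluster \
    --namespace=spark-apps
With these in place, the kubectl auth can-i check above should return yes, and SparkKubernetesOperator should be able to create the SparkApplication.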