
Apache Airflow version: v2.1.1

Kubernetes version (if you are using kubernetes) (use kubectl version):-
Client Version: version.Info{Major:"1", Minor:"21", GitVersion:"v1.21.2", GitCommit:"092fbfbf53427de67cac1e9fa54aaa09a28371d7", GitTreeState:"clean", BuildDate:"2021-06-16T12:52:14Z", GoVersion:"go1.16.5", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"19+", GitVersion:"v1.19.8-eks-96780e", GitCommit:"96780e1b30acbf0a52c38b6030d7853e575bcdf3", GitTreeState:"clean", BuildDate:"2021-03-10T21:32:29Z", GoVersion:"go1.15.8", Compiler:"gc", Platform:"linux/amd64"}

Environment: Development

Cloud provider or hardware configuration: AWS EKS

What happened: I am not able to create SparkApplications on the Kubernetes cluster using SparkKubernetesOperator from an Airflow DAG. I have hosted Airflow and the Spark operator on EKS. I have created a connection in Airflow to the Kubernetes cluster using "in cluster configuration". I am only running the sample application to check that Spark jobs on Kubernetes can be run through Airflow.

Application YAML file:-

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi-airflow
  namespace: spark-apps
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
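SparkKubernetesOperator submits this manifest through the Kubernetes custom-objects API (create_namespaced_custom_object in the traceback below), which addresses the object by API group, version, and plural resource name; RBAC rules match on the same group and plural. A minimal sketch of how apiVersion and kind map onto that triple, assuming the plural is simply the lower-cased kind plus "s" (a simplification for illustration; the authoritative plural is declared in the CRD's spec.names.plural):

```python
from typing import Dict, Tuple


def custom_object_coordinates(manifest: Dict) -> Tuple[str, str, str]:
    """Return (group, version, plural) for a custom-resource manifest.

    Assumption: plural = lower-cased kind + "s". The real plural is
    defined by the CRD (spec.names.plural), not derived like this.
    """
    group, _, version = manifest["apiVersion"].partition("/")
    if not version:
        # Core resources such as "v1" have no group part.
        raise ValueError("expected '<group>/<version>' for a custom resource")
    plural = manifest["kind"].lower() + "s"
    return group, version, plural


# Only the fields the function reads, copied from the application YAML.
spark_pi = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "spark-pi-airflow", "namespace": "spark-apps"},
}

print(custom_object_coordinates(spark_pi))
# ('sparkoperator.k8s.io', 'v1beta2', 'sparkapplications')
```

The group and plural printed here are exactly the "API group" and "resource" that the 403 message says the service account may not create.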

Airflow DAG:-

from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago

# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
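    'max_active_runs': 1,
}
# [END default_args]

# [START instantiate_dag]

dag = DAG(
    'spark_pi',  # hypothetical DAG id
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace='spark-apps',
    application_file='spark-pi-airflow.yaml',  # hypothetical path to the application YAML above
    kubernetes_conn_id='kubernetes_default',  # assumed id of the in-cluster connection
    do_xcom_push=True,
    dag=dag,
)

t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',  # hypothetical task id
    namespace='spark-apps',
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id='kubernetes_default',  # assumed, as above
    dag=dag,
)
t1 >> t2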
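The sensor's application_name is a Jinja template that reads the value the spark_pi_submit task pushed to XCom and indexes ['metadata']['name']. Assuming that value is the created SparkApplication returned by the API server as a plain dict, the lookup reduces to the sketch below; the response object used here is a hypothetical, trimmed example.

```python
from typing import Any, Dict

# Hypothetical, trimmed example of the object the API server returns after
# a successful create; a real response carries many more fields.
created = {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {"name": "spark-pi-airflow", "namespace": "spark-apps"},
    "spec": {"type": "Scala", "mode": "cluster"},
}


def application_name(xcom_value: Dict[str, Any]) -> str:
    """Mirror of the template expression xcom_pull(...)['metadata']['name']."""
    return xcom_value["metadata"]["name"]


print(application_name(created))  # spark-pi-airflow
```

When spark_pi_submit fails, as it does with the 403 below, nothing is pushed, and under the default all_success trigger rule the sensor task does not run at all.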

Error Message:-

[2021-07-12 10:18:46,629] {spark_kubernetes.py:67} INFO - Creating sparkApplication
[2021-07-12 10:18:46,662] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 174, in create_custom_object
    response = api.create_namespaced_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 183, in create_namespaced_custom_object
    (data) = self.create_namespaced_custom_object_with_http_info(group, version, namespace, plural, body, **kwargs)  # noqa: E501
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api/custom_objects_api.py", line 275, in create_namespaced_custom_object_with_http_info
    return self.api_client.call_api(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 340, in call_api
    return self.__call_api(resource_path, method,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 172, in __call_api
    response_data = self.request(
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/api_client.py", line 382, in request
    return self.rest_client.POST(url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 272, in POST
    return self.request("POST", url,
  File "/home/airflow/.local/lib/python3.8/site-packages/kubernetes/client/rest.py", line 231, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:airflow:airflow-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1361, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py", line 69, in execute
    response = hook.create_custom_object(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 180, in create_custom_object
    raise AirflowException(f"Exception when calling -> create_custom_object: {e}\n")
airflow.exceptions.AirflowException: Exception when calling -> create_custom_object: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': '45712aa7-85e3-4beb-85f7-b94a77cda196', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 12 Jul 2021 10:18:46 GMT', 'Content-Length': '406'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"sparkapplications.sparkoperator.k8s.io is forbidden: User \"system:serviceaccount:***:***-cluster\" cannot create resource \"sparkapplications\" in API group \"sparkoperator.k8s.io\" in the namespace \"spark-apps\"","reason":"Forbidden","details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}
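The response body is a Kubernetes Status object, and its message names the identity the request ran as. A minimal sketch, using only the standard library, that extracts the service account's namespace and name plus the denied verb, resource, and group from such a body; the regular expression is an assumption that only covers the message format shown in the log above.

```python
import json
import re
from typing import Dict

# Body copied from the first traceback above (JSON with escaped quotes).
BODY = (
    '{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure",'
    '"message":"sparkapplications.sparkoperator.k8s.io is forbidden: User '
    '\\"system:serviceaccount:airflow:airflow-cluster\\" cannot create resource '
    '\\"sparkapplications\\" in API group \\"sparkoperator.k8s.io\\" in the '
    'namespace \\"spark-apps\\"","reason":"Forbidden",'
    '"details":{"group":"sparkoperator.k8s.io","kind":"sparkapplications"},"code":403}'
)

# Matches only service-account denials phrased like the message above.
FORBIDDEN = re.compile(
    r'User "system:serviceaccount:(?P<sa_namespace>[^:"]+):(?P<sa_name>[^"]+)" '
    r'cannot (?P<verb>\w+) resource "(?P<resource>[^"]+)" '
    r'in API group "(?P<group>[^"]*)" in the namespace "(?P<namespace>[^"]+)"'
)


def parse_forbidden(body: str) -> Dict[str, str]:
    """Decode the Status JSON and pull the RBAC denial details out of its message."""
    status = json.loads(body)
    match = FORBIDDEN.search(status["message"])
    if match is None:
        raise ValueError("message is not a service-account RBAC denial")
    return match.groupdict()


denial = parse_forbidden(BODY)
print(denial["sa_namespace"], denial["sa_name"])  # airflow airflow-cluster
```

The two service-account fields are the subject that any RoleBinding or ClusterRoleBinding has to name; here they point at airflow/airflow-cluster, not at the spark service account created with kubectl create serviceaccount spark.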

What you expected to happen: Airflow on Kubernetes should schedule and run the Spark job using SparkKubernetesOperator.

How to reproduce it: Deploy the Spark operator on the Kubernetes cluster using Helm. Deploy Airflow on the same cluster using Helm. Deploy the application and Airflow DAG above.

Anything else we need to know:-

I have already created a service account:-

$ kubectl create serviceaccount spark

I have also given the service account the edit role on the cluster:-

$ kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=airflow:airflow-cluster --namespace=spark-apps
Based on the error, there's a permission issue with the airflow:airflow-cluster service account. Your last two commands created the service account spark and then a clusterrolebinding for the service account airflow:airflow-cluster. Make sure that the airflow-cluster service account exists in the airflow namespace. You probably also want to create a rolebinding rather than a clusterrolebinding, so that the service account gets permissions only in the selected namespace. - moonkotte
Another possibility is that the edit clusterrole doesn't include the API group sparkoperator.k8s.io, in which case you will need to create a role manually. Please find role examples. - moonkotte
Thanks for your suggestion. My issue has been resolved after granting the appropriate permissions to the service account on the airflow namespace. - Jitendra Patel
You may consider posting your own answer and accepting it, since this may be helpful for other community members who face the same issue. - moonkotte
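moonkotte's suggestion to use a rolebinding instead of a clusterrolebinding amounts to a namespaced Role in spark-apps plus a RoleBinding in the same namespace whose subject is the airflow-cluster service account from the airflow namespace. The sketch below builds both objects as Python dicts and prints them as a JSON List, which kubectl apply -f also accepts. The object name spark-apps-submitter and the verbs create and get (assumed here to be what the submit task and the sensor need) are hypothetical choices, not taken from the thread.

```python
import json
from typing import Any, Dict, List

SPARK_NAMESPACE = "spark-apps"        # where SparkApplications are created
AIRFLOW_NAMESPACE = "airflow"         # namespace of the Airflow service account
AIRFLOW_SERVICE_ACCOUNT = "airflow-cluster"
ROLE_NAME = "spark-apps-submitter"    # hypothetical name


def namespaced_rbac(verbs: List[str]) -> Dict[str, Any]:
    """Build a Role and RoleBinding scoped to SPARK_NAMESPACE only."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": ROLE_NAME, "namespace": SPARK_NAMESPACE},
        "rules": [{
            "apiGroups": ["sparkoperator.k8s.io"],
            "resources": ["sparkapplications"],
            "verbs": verbs,
        }],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        # The binding lives next to the Role, in the Spark namespace...
        "metadata": {"name": ROLE_NAME, "namespace": SPARK_NAMESPACE},
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "Role",
            "name": ROLE_NAME,
        },
        # ...while its subject is the service account in the Airflow namespace.
        "subjects": [{
            "kind": "ServiceAccount",
            "name": AIRFLOW_SERVICE_ACCOUNT,
            "namespace": AIRFLOW_NAMESPACE,
        }],
    }
    return {"apiVersion": "v1", "kind": "List", "items": [role, binding]}


if __name__ == "__main__":
    # Assumed minimum: "create" for the submit task, "get" for the sensor.
    print(json.dumps(namespaced_rbac(["create", "get"]), indent=2))
```

Redirect the output to a file and pass that file to kubectl apply -f. Binding in a single namespace keeps the grant narrower than the cluster-wide edit binding from the question.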



Here are the Kubernetes ClusterRole and ClusterRoleBinding resources. Create them with kubectl -n <namespace> apply -f <filename.yaml>

# Role for spark-on-k8s-operator to create resources on cluster
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-cluster-cr
  labels:
    rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
  - apiGroups:
      - sparkoperator.k8s.io
    resources:
      - sparkapplications
    verbs:
      - '*'
---
# Allow airflow-worker service account access for spark-on-k8s
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-cluster-cr
subjects:
  - kind: ServiceAccount
    name: airflow-cluster
    namespace: airflow
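Whether a role such as edit covers SparkApplications comes down to how RBAC matches a request against rules: the request's API group, resource, and verb must each appear in one rule's lists, with '*' matching anything, and rules only ever add permissions. A simplified sketch of that check, which ignores resourceNames, subresources, non-resource URLs, and ClusterRole aggregation, applied to the rule from spark-cluster-cr and to a hypothetical rule set that only covers core-group resources:

```python
from typing import Dict, List

Rule = Dict[str, List[str]]


def _matches(allowed: List[str], requested: str) -> bool:
    # "*" in a rule list matches any value.
    return "*" in allowed or requested in allowed


def rule_allows(rule: Rule, group: str, resource: str, verb: str) -> bool:
    """Simplified RBAC check: ignores resourceNames, subresources, nonResourceURLs."""
    return (
        _matches(rule.get("apiGroups", []), group)
        and _matches(rule.get("resources", []), resource)
        and _matches(rule.get("verbs", []), verb)
    )


def any_rule_allows(rules: List[Rule], group: str, resource: str, verb: str) -> bool:
    # RBAC is purely additive: a single matching rule grants the request.
    return any(rule_allows(r, group, resource, verb) for r in rules)


# The rule from spark-cluster-cr above.
spark_cluster_cr = [{
    "apiGroups": ["sparkoperator.k8s.io"],
    "resources": ["sparkapplications"],
    "verbs": ["*"],
}]

# Hypothetical rule set that only covers core-group ("") resources.
core_only = [{
    "apiGroups": [""],
    "resources": ["pods", "services", "configmaps"],
    "verbs": ["create", "get", "list", "watch", "delete"],
}]

request = ("sparkoperator.k8s.io", "sparkapplications", "create")
print(any_rule_allows(spark_cluster_cr, *request))  # True
print(any_rule_allows(core_only, *request))         # False
```

A role whose rules never mention the sparkoperator.k8s.io group cannot grant the create in the error, no matter how broad its core-group rules are.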


  • The above assumes the following, based on the error message
    sparkapplications.sparkoperator.k8s.io is forbidden: User "system:serviceaccount:airflow:airflow-cluster" cannot create resource "sparkapplications" in API group "sparkoperator.k8s.io" in the namespace "spark-apps":
    • Airflow namespace: airflow
    • Airflow serviceaccount: airflow-cluster
    • Spark jobs namespace: spark-apps
  • You should also have spark-on-k8s-operator installed
    • Install it with helm --set webhook.enable=true if you want to use env in your spec.driver
