Mounting DAGs

DAGs can be mounted by using a ConfigMap or git-sync. This is best illustrated with an example of each, shown in the sections below.

Via ConfigMap

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: cm-dag (1)
data:
  test_airflow_dag.py: | (2)
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.dummy import DummyOperator

    with DAG(
        dag_id='test_airflow_dag',
        schedule_interval='0 0 * * *',
        start_date=datetime(2021, 1, 1),
        catchup=False,
        dagrun_timeout=timedelta(minutes=60),
        tags=['example', 'example2'],
        params={"example_key": "example_value"},
    ) as dag:
        run_this_last = DummyOperator(
            task_id='run_this_last',
        )

        # [START howto_operator_bash]
        run_this = BashOperator(
            task_id='run_after_loop',
            bash_command='echo 1',
        )
        # [END howto_operator_bash]

        run_this >> run_this_last

        for i in range(3):
            task = BashOperator(
                task_id='runme_' + str(i),
                bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
            )
            task >> run_this

        # [START howto_operator_bash_template]
        also_run_this = BashOperator(
            task_id='also_run_this',
            bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
        )
        # [END howto_operator_bash_template]
        also_run_this >> run_this_last

    # [START howto_operator_bash_skip]
    this_will_skip = BashOperator(
        task_id='this_will_skip',
        bash_command='echo "hello world"; exit 99;',
        dag=dag,
    )
    # [END howto_operator_bash_skip]
    this_will_skip >> run_this_last

    if __name__ == "__main__":
        dag.cli()
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: "2.9.3"
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: simple-airflow-credentials
    volumes:
      - name: cm-dag (3)
        configMap:
          name: cm-dag (4)
    volumeMounts:
      - name: cm-dag (5)
        mountPath: /dags/test_airflow_dag.py (6)
        subPath: test_airflow_dag.py (7)
  webservers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
  celeryExecutors:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 2
  schedulers:
    roleGroups:
      default:
        envOverrides:
          AIRFLOW__CORE__DAGS_FOLDER: "/dags" (8)
        replicas: 1
1 The name of the ConfigMap
2 The name of the DAG (this is a renamed copy of the example_bash_operator.py from the Airflow examples)
3 The volume backed by the ConfigMap
4 The name of the ConfigMap referenced by the Airflow cluster
5 The name of the mounted volume
6 The path of the mounted resource. Note that this should map to a single DAG.
7 The resource has to be defined using subPath: this is to prevent the versioning of ConfigMap elements which may cause a conflict with how Airflow propagates DAGs between its components.
8 If the mount path described above is anything other than the standard location (the default is $AIRFLOW_HOME/dags), then the location should be defined using the relevant environment variable.
If a DAG mounted via ConfigMap consists of modularized files, then using the standard location is mandatory, as Python uses this as a "root" directory when looking for referenced files.
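
The note above relies on Python treating one directory as the "root" for imports. The following is a minimal sketch of that lookup mechanism using only the standard library. It does not reproduce how Airflow or the operator configure the path. The helper module name my_helpers and the function import_from_dags_folder are hypothetical and exist only for this illustration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def import_from_dags_folder(dags_folder: str, module_name: str):
    """Import module_name after placing dags_folder on the module search path."""
    # Assumption for illustration: the DAG location acts as an import root
    # because it is present on sys.path.
    if dags_folder not in sys.path:
        sys.path.insert(0, dags_folder)
    importlib.invalidate_caches()
    return importlib.import_module(module_name)


# Build a throwaway folder holding one hypothetical helper module.
dags_folder = tempfile.mkdtemp()
Path(dags_folder, "my_helpers.py").write_text("GREETING = 'hello from helper'\n")

# A DAG file in the same folder could now use "import my_helpers".
helpers = import_from_dags_folder(dags_folder, "my_helpers")
print(helpers.GREETING)
```

If the folder holding the helper module is not on the search path, the same import raises ModuleNotFoundError, which is the failure mode the note warns about.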

The advantage of this approach is that DAGs are provided "in-line". However, handling multiple DAGs this way becomes cumbersome, as each must be mapped individually. For multiple DAGs, it is easier to expose them via a mounted volume, as shown below.
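
To make the per-file mapping concrete, the sketch below generates one volumeMounts entry per DAG file, following the name, mountPath and subPath pattern from the example above. The function build_dag_volume_mounts and the file name second_dag.py are hypothetical. The output is printed as JSON for inspection and would still have to be placed into the AirflowCluster definition by hand.

```python
import json


def build_dag_volume_mounts(volume_name, dag_files, dags_dir="/dags"):
    """Return one volumeMount entry per DAG file, each mapped via subPath."""
    return [
        {
            "name": volume_name,
            "mountPath": f"{dags_dir}/{dag_file}",
            "subPath": dag_file,
        }
        for dag_file in dag_files
    ]


# second_dag.py is a made-up second key in the same ConfigMap.
mounts = build_dag_volume_mounts("cm-dag", ["test_airflow_dag.py", "second_dag.py"])
print(json.dumps(mounts, indent=2))
```

Every additional DAG adds another entry to this list and another key to the ConfigMap, which is why the approach scales poorly.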

Via git-sync

git-sync is a command that pulls a git repository into a local directory and is supplied as a sidecar container for use within Kubernetes. The Stackable Airflow images already ship with git-sync included, and the operator takes care of calling the tool and mounting volumes, so that only the repository and synchronization details are required:

git-sync usage example
---
apiVersion: airflow.stackable.tech/v1alpha1
kind: AirflowCluster
metadata:
  name: airflow
spec:
  image:
    productVersion: "2.9.3"
  clusterConfig:
    loadExamples: false
    exposeConfig: false
    credentialsSecret: test-airflow-credentials (1)
    dagsGitSync: (2)
      - repo: https://github.com/stackabletech/airflow-operator (3)
        branch: "main" (4)
        gitFolder: "tests/templates/kuttl/mount-dags-gitsync/dags" (5)
        depth: 10 (6)
        wait: 20 (7)
        credentialsSecret: git-credentials (8)
        gitSyncConf: (9)
          --rev: HEAD (10)
          # --rev: git-sync-tag # N.B. tag must be covered by "depth" (the number of commits to clone)
          # --rev: 39ee3598bd9946a1d958a448c9f7d3774d7a8043 # N.B. commit must be covered by "depth"
          --git-config: http.sslCAInfo:/tmp/ca-cert/ca.crt (11)
  webservers:
    ...
1 A Secret used for accessing database and admin user details (included here to illustrate where different credential secrets are defined)
2 The git-sync configuration block that contains a list of git-sync elements
3 The repository to clone (required)
4 The branch name (defaults to main)
5 The location of the DAG folder, relative to the synced repository root (required)
6 The depth of syncing i.e. the number of commits to clone (defaults to 1)
7 The synchronisation interval in seconds (defaults to 20 seconds)
8 The name of the Secret used to access the repository if it is not public. This should include two fields: user and password. The password field can hold either a password (not recommended) or a GitHub token, as described here.
9 A map of optional configuration settings that are listed in this configuration section (and the ones that follow on that link)
10 An example showing how to specify a target revision (the default is HEAD). The revision can also be a tag or a commit, though this assumes that the target hash is contained within the number of commits specified by depth. If a tag or commit hash is specified, then git-sync recognizes this and does not perform further cloning.
11 Git-sync settings can be provided inline, although some of these (--dest, --root) are specified internally in the operator and are ignored if provided by the user. Git-config settings can also be specified, although a warning is logged if safe.directory is specified as this is defined internally, and should not be defined by the user.
The example shows a list of git-sync definitions with a single element. This is to avoid breaking changes in future releases. Currently, only one such git-sync definition is considered and processed.
git-sync can be used with DAGs that make use of Python modules, as Python is configured to use the git-sync target folder as the "root" location when looking for referenced files. See the Applying Custom Resources example for more details.
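
Callout 8 above describes a Secret with the two fields user and password. The sketch below builds such a manifest as a plain dictionary, base64-encoding the values as the data field of a Kubernetes Secret requires. The function build_git_credentials_secret and the values example-user and example-token are placeholders invented for this illustration. Any further requirements the operator places on the Secret are not covered here.

```python
import base64
import json


def build_git_credentials_secret(name, user, password):
    """Build a Secret manifest with base64-encoded user and password fields."""

    def encode(value):
        return base64.b64encode(value.encode("utf-8")).decode("ascii")

    return {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": name},
        "type": "Opaque",
        "data": {"user": encode(user), "password": encode(password)},
    }


# Placeholder credentials; a token is preferable to a real password.
secret = build_git_credentials_secret("git-credentials", "example-user", "example-token")
print(json.dumps(secret, indent=2))
```

The Secret name matches the credentialsSecret value used in the git-sync example above.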