Data Platform/Systems/Airflow/Kubernetes


This page covers what is specific to our Airflow instances deployed to Kubernetes. We assume that each Airflow instance is deployed alongside a dedicated CloudnativePG cluster, running in the same namespace.

airflow
Attribute Value
Owner Data Platform SRE
Kubernetes Cluster dse-k8s-eqiad
Kubernetes Namespace airflow-test-k8s
Chart https://gerrit.wikimedia.org/r/plugins/gitiles/operations/deployment-charts/+/refs/heads/master/charts/airflow
Helmfiles https://gerrit.wikimedia.org/r/plugins/gitiles/operations/deployment-charts/+/refs/heads/master/helmfile.d/dse-k8s-services/airflow-test-k8s/helmfile.yaml
Docker image https://gitlab.wikimedia.org/repos/data-engineering/airflow/
Internal service DNS airflow-test-k8s.discovery.wmnet
Public service URL https://airflow-test-k8s.wikimedia.org
Logs https://logstash.wikimedia.org/app/dashboards#/view/7f6bc030-8166-11ef-9967-d396febc14fa?_g=h@c823129&_a=h@fa450d4
Metrics https://grafana.wikimedia.org/d/aca34072-ad28-4f08-8468-9a28f7b45c52/airflow-on-kubernetes?orgId=1
Monitors https://gerrit.wikimedia.org/r/plugins/gitiles/operations/alerts/+/master/team-data-platform/airflow-k8s.yaml
Application documentation https://airflow.apache.org/docs/apache-airflow/stable/index.html
Paging true
Deployment Phabricator ticket https://phabricator.wikimedia.org/T362788


Note: replace airflow-test-k8s with the relevant instance name where appropriate.

Architecture

Airflow is deployed on Kubernetes and talks to a PG cluster deployed in the same Kubernetes namespace. Airflow task logs, PG WALs and backups are all uploaded to S3. PG data is written to Ceph persistent volumes.
[Figure: Simplified architecture diagram of an airflow deployment]

Components

Airflow is deployed via the airflow chart. The release is composed of multiple Deployment resources, which you can list with the kubectl command shown after this list:

  • the webserver
  • the scheduler
  • the kerberos ticket renewer
  • an envoy proxy, allowing task pods to connect to internal services via the service mesh
  • the hadoop shell, a container whose sole purpose is to be exec-ed into to run ad-hoc yarn/hdfs commands
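
To inspect these components for a given instance, you can list the Deployment resources in its namespace from the deployment server (a minimal sketch; the exact Deployment names depend on the chart release):

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
kubectl get deployments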

Executor

All Kubernetes instances use the KubernetesExecutor, which means that DAG tasks are executed as Kubernetes Pods.
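
To double-check which executor an instance is configured with, you can query the Airflow configuration from the webserver pod, reusing the kubectl invocation from the how-to section below:

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
# should print KubernetesExecutor
kubectl exec -it $(kubectl get pod -l app=airflow,component=webserver --no-headers -o custom-columns=":metadata.name") -c airflow-production -- airflow config get-value core executor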

DAGs deployment

For the moment, any Airflow instance running on Kubernetes syncs the main branch of airflow-dags every 5 minutes using https://github.com/kubernetes/git-sync, meaning that any merged MR should be reflected in Airflow within about 5 minutes.
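
To check that the sync is working, you can tail the logs of the git-sync sidecar running alongside the scheduler. This is a sketch: the component label and the sidecar container name are assumptions, adjust them to the actual pod spec.

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
# both the component label and the container name below are assumptions
kubectl logs -f $(kubectl get pod -l app=airflow,component=scheduler --no-headers -o custom-columns=":metadata.name") -c git-sync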

In the near future, the model will change from pull to push: when an MR is merged, the GitLab CI will send a POST request to blunderbuss, which will trigger a sync of the main branch into the volume mounted by the Airflow schedulers.

Even when that is in place, we'll still be able to use git-sync to synchronize feature branches in development instances.

Logging

The logs of the airflow components themselves are sent to our observability pipeline and are accessible through logstash. However, the DAG task logs are uploaded to S3 after completion. Streaming the logs of an ongoing DAG task can be done from the web UI, and relies on the Kubernetes Logs API.
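
The same Kubernetes Logs API can be used from the command line: while a task pod is running, you can tail its logs directly with kubectl (the pod naming depends on the KubernetesExecutor pod template, hence the placeholder below):

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
kubectl get pods
# replace <task-pod-name> with the name of a running task pod from the output above
kubectl logs -f <task-pod-name>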

Security

Webserver authentication and authorization

Access to the webserver is OIDC authenticated, and the user's role is derived from their LDAP groups. For example, SREs (members of the ops LDAP group) are automatically given the Admin role. The mapping can be customized per instance, so that we can define LDAP groups for per-instance admins and per-instance members.
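
The mapping is expressed in the instance's helm values. The snippet below is a hypothetical sketch of what such a group-to-role mapping could look like; the actual values keys and group names are defined by the airflow chart and the instance configuration, not by this example.

config:
  airflow:
    # hypothetical example only: the real key names are defined by the airflow chart
    auth_roles_mapping:
      ops: [Admin]
      airflow-test-k8s-admins: [Admin]
      airflow-test-k8s-members: [User]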

API access

Access to the Airflow API will be Kerberos authenticated (see the sketch after this list), meaning that:

  • services will be able to access the API by authenticating to Kerberos via their own Keytab
  • users will be able to access the API by authenticating to Kerberos via their password and kinit
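
Once this is in place, access from a personal account could look like the following sketch, using SPNEGO (negotiate) authentication against the stable REST API:

# obtain a Kerberos ticket first
kinit
# then call the API with SPNEGO/negotiate authentication
curl --negotiate -u : "https://airflow-test-k8s.wikimedia.org/api/v1/dags"
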
Kerberos

We generate a keytab for each instance. It is stored as a base64-encoded secret and mounted only on the airflow-kerberos pod, which is in charge of obtaining (and regularly renewing) a TGT. That TGT is itself mounted into every pod that needs to communicate with Kerberised systems (i.e. the worker pods).

Note: the keytab is also mounted in the webserver pod, but it is only used to set up Kerberised API access at init time.
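
To verify that the renewer holds a valid TGT, you can run klist inside the airflow-kerberos pod. The component label below is an assumption; adjust it to the actual pod labels.

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
# the component label is an assumption
kubectl exec -it $(kubectl get pod -l app=airflow,component=kerberos --no-headers -o custom-columns=":metadata.name") -- klist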

Kubernetes RBAC

When using the KubernetesExecutor, the scheduler needs to be able to perform CRUD operations on Pods, and the webserver needs to be able to tail Pod logs. As the user used to deploy charts does not have permission to create Role and RoleBinding resources, we deploy the chart with a specific user/role that can, called deploy-airflow.
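
To inspect the RBAC objects that end up deployed in an instance's namespace (assuming your deployment user is allowed to read these resources):

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
kubectl get roles,rolebindings,serviceaccounts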

UNIX user impersonation

Each airflow instance has a dedicated keytab, with a first principal of the form <user>/airflow-<instance-name>.discovery.wmnet@WIKIMEDIA. This ensures that any interaction with HDFS, Spark, etc., impersonates the <user> user.

For example, the first principal of the airflow-test-k8s instance is analytics/airflow-test-k8s.discovery.wmnet@WIKIMEDIA, which enables impersonation of the analytics user in Hadoop.
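
You can confirm which principals an instance's keytab contains with klist -kt from the airflow-kerberos pod. The component label and the keytab mount path below are placeholders to adjust to the actual deployment.

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
# the component label and the keytab mount path are placeholders
kubectl exec -it $(kubectl get pod -l app=airflow,component=kerberos --no-headers -o custom-columns=":metadata.name") -- klist -kt /path/to/airflow.keytab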

Database access

The airflow chart was designed to run alongside a CloudnativePG cluster running in the same namespace. However, it can be configured to use an "external" PG database, such as an-db1001.eqiad.wmnet for transitioning purposes. The ultimate goal is to have each instance run alongside its own PG cluster.

When configured to use a CloudnativePG cluster, access to the DB goes through PGBouncer instead of hitting PG directly. As described in the Airflow documentation, this setup mitigates the fact that:

Airflow is known - especially in high-performance setup - to open many connections to metadata database. This might cause problems for Postgres resource usage, because in Postgres, each connection creates a new process and it makes Postgres resource-hungry when a lot of connections are opened. Therefore we recommend to use PGBouncer as database proxy for all Postgres production installations. PGBouncer can handle connection pooling from multiple components, but also in case you have remote database with potentially unstable connectivity, it will make your DB connectivity much more resilient to temporary network problems.
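
Since the PG cluster and its PGBouncer pooler run in the same namespace as Airflow, you can inspect them via the CloudnativePG custom resources. That PGBouncer is deployed through the CloudnativePG Pooler resource is an assumption here; adjust accordingly.

kube_env airflow-test-k8s-deploy dse-k8s-eqiad
kubectl get clusters.postgresql.cnpg.io
kubectl get poolers.postgresql.cnpg.io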

Connections

Connections are defined via helm values, under .Values.config.airflow.connections. As such, they are handled by a LocalFilesystemBackend secret backend and will not be visible in the web UI.
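
For illustration, a connection entry in the release values might look like the following sketch. The exact schema is defined by the airflow chart, and the connection name and fields below are hypothetical.

config:
  airflow:
    connections:
      # hypothetical connection, for illustration only
      my_service:
        conn_type: http
        host: my-service.discovery.wmnet
        port: 443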

Management DAGs

The Kubernetes Airflow instances come with built-in maintenance DAGs, performing actions such as:

  • removing task logs from S3 after they reach a certain age
  • expunging data that has reached a certain age from DB tables
  • removing obsolete Airflow DAG/Task lineage data from DataHub
  • ...

These DAGs are tagged with airflow_maintenance.

You can set the following Airflow variables in your release values, under config.airflow.variables, to configure the Airflow maintenance DAGs (see the example snippet after this list):

  • s3_log_retention_days (default value: 30): number of days of task logs to keep in S3
  • db_cleanup_tables: a comma-separated list of tables that will be regularly expunged of old data, to keep the database as lean as possible
  • db_cleanup_retention_days: if specified along with db_cleanup_tables, the number of days after which data will be cleaned from these tables.
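
For example, a values snippet along these lines would configure the retention (the table names and retention figures below are illustrative, not recommendations):

config:
  airflow:
    variables:
      # illustrative values only
      s3_log_retention_days: 30
      db_cleanup_tables: "xcom,log,job"
      db_cleanup_retention_days: 90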

Operations

Moved to Data Platform/Systems/Airflow/Kubernetes/Operations

I'm getting paged

Pods are not running

If you're getting an alert or getting paged because the app isn't running, investigate whether something in the application logs (see the checklist section) could explain the crash. In case of a recurring crash, the pod would be in CrashLoopBackOff state in Kubernetes. To check whether this is the case, ssh to the deployment server and run the following commands:

kube_env <namespace> dse-k8s-eqiad
kubectl get pods

Then you can tail the logs as needed. Feel free to refer to the log dashboard listed in the checklist.
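
For example, to understand why a pod is crash-looping (pod names come from the kubectl get pods output above; add -c <container> if the pod runs several containers):

kubectl describe pod <pod-name>
# logs of the current container
kubectl logs <pod-name>
# logs of the previously crashed container, if any
kubectl logs --previous <pod-name>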

If no pod at all is displayed, re-deploy the app by following the Kubernetes deployment instructions.

How to

Use the airflow CLI

brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=webserver --no-headers -o custom-columns=":metadata.name") -c airflow-production -- airflow
Usage: airflow [-h] GROUP_OR_COMMAND ...

Positional Arguments:
  GROUP_OR_COMMAND

    Groups
      config         View configuration
      connections    Manage connections
      dags           Manage DAGs
      db             Database operations
      jobs           Manage jobs
      kubernetes     Tools to help run the KubernetesExecutor
      pools          Manage pools
      providers      Display providers
      roles          Manage roles
      tasks          Manage tasks
      users          Manage users
      variables      Manage variables

    Commands:
      cheat-sheet    Display cheat sheet
      dag-processor  Start a standalone Dag Processor instance
      info           Show information about current Airflow and environment
      kerberos       Start a kerberos ticket renewer
      plugins        Dump information about loaded plugins
      rotate-fernet-key
                     Rotate encrypted connection credentials and variables
      scheduler      Start a scheduler instance
      standalone     Run an all-in-one copy of Airflow
      sync-perm      Update permissions for existing roles and optionally DAGs
      triggerer      Start a triggerer instance
      version        Show the version
      webserver      Start a Airflow webserver instance

Options:
  -h, --help         show this help message and exit

airflow command error: the following arguments are required: GROUP_OR_COMMAND, see help above.
command terminated with exit code 2

Use the yarn CLI

brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=hadoop-shell --no-headers -o custom-columns=":metadata.name") -- yarn

Usage: yarn [--config confdir] [COMMAND | CLASSNAME]
  CLASSNAME                             run the class named CLASSNAME
 or
  where COMMAND is one of:
  resourcemanager                       run the ResourceManager
                                        Use -format-state-store for deleting the RMStateStore.
                                        Use -remove-application-from-state-store <appId> for
                                            removing application from RMStateStore.
  nodemanager                           run a nodemanager on each slave
  timelinereader                        run the timeline reader server
  timelineserver                        run the timeline server
  rmadmin                               admin tools
  router                                run the Router daemon
  sharedcachemanager                    run the SharedCacheManager daemon
  scmadmin                              SharedCacheManager admin tools
  version                               print the version
  jar <jar>                             run a jar file
  application                           prints application(s)
                                        report/kill application
  applicationattempt                    prints applicationattempt(s)
                                        report
  container                             prints container(s) report
  node                                  prints node report(s)
  queue                                 prints queue information
  logs                                  dump container logs
  schedulerconf                         updates scheduler configuration
  classpath                             prints the class path needed to
                                        get the Hadoop jar and the
                                        required libraries
  cluster                               prints cluster information
  daemonlog                             get/set the log level for each
                                        daemon
  top                                   run cluster usage tool

Most commands print help when invoked w/o parameters.
command terminated with exit code 1

Use the hdfs CLI

brouberol@deploy2002:~$ kube_env airflow-test-k8s-deploy dse-k8s-eqiad
brouberol@deploy2002:~$ kubectl exec -it $(kubectl get pod -l app=airflow,component=hadoop-shell --no-headers -o custom-columns=":metadata.name") -- hdfs
Usage: hdfs [--config confdir] [--loglevel loglevel] COMMAND
       where COMMAND is one of:
  dfs                  run a filesystem command on the file systems supported in Hadoop.
  classpath            prints the classpath
  namenode -format     format the DFS filesystem
  secondarynamenode    run the DFS secondary namenode
  namenode             run the DFS namenode
  journalnode          run the DFS journalnode
  zkfc                 run the ZK Failover Controller daemon
  datanode             run a DFS datanode
  debug                run a Debug Admin to execute HDFS debug commands
  dfsadmin             run a DFS admin client
  dfsrouter            run the DFS router
  dfsrouteradmin       manage Router-based federation
  haadmin              run a DFS HA admin client
  fsck                 run a DFS filesystem checking utility
  balancer             run a cluster balancing utility
  jmxget               get JMX exported values from NameNode or DataNode.
  mover                run a utility to move block replicas across
                       storage types
  oiv                  apply the offline fsimage viewer to an fsimage
  oiv_legacy           apply the offline fsimage viewer to an legacy fsimage
  oev                  apply the offline edits viewer to an edits file
  fetchdt              fetch a delegation token from the NameNode
  getconf              get config values from configuration
  groups               get the groups which users belong to
  snapshotDiff         diff two snapshots of a directory or diff the
                       current directory contents with a snapshot
  lsSnapshottableDir   list all snapshottable dirs owned by the current user
						Use -help to see options
  portmap              run a portmap service
  nfs3                 run an NFS version 3 gateway
  cacheadmin           configure the HDFS cache
  crypto               configure HDFS encryption zones
  storagepolicies      list/get/set block storage policies
  version              print the version

Most commands print help when invoked w/o parameters.