From 02674ca211c71d263314c9881476b65301c99f39 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 29 Aug 2025 13:05:51 +0200 Subject: [PATCH 1/9] wip: for 3.x stabilisation changes --- deploy/helm/airflow-operator/crds/crds.yaml | 830 ++++++++++++++++++ .../operator-binary/src/airflow_controller.rs | 5 +- rust/operator-binary/src/crd/mod.rs | 99 ++- rust/operator-binary/src/env_vars.rs | 99 ++- rust/operator-binary/src/operations/pdb.rs | 10 + tests/templates/kuttl/smoke/02-s3-secret.yaml | 25 + tests/templates/kuttl/smoke/03-assert.yaml | 20 + .../templates/kuttl/smoke/03-setup-minio.yaml | 55 ++ .../kuttl/smoke/04-prepare-bucket.yaml.j2 | 8 + .../smoke/40-install-airflow-cluster.yaml.j2 | 26 + .../smoke/helm-bitnami-minio-values.yaml | 47 + 11 files changed, 1175 insertions(+), 49 deletions(-) create mode 100644 tests/templates/kuttl/smoke/02-s3-secret.yaml create mode 100644 tests/templates/kuttl/smoke/03-assert.yaml create mode 100644 tests/templates/kuttl/smoke/03-setup-minio.yaml create mode 100644 tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 create mode 100644 tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml diff --git a/deploy/helm/airflow-operator/crds/crds.yaml b/deploy/helm/airflow-operator/crds/crds.yaml index 79612fca..de6cfa08 100644 --- a/deploy/helm/airflow-operator/crds/crds.yaml +++ b/deploy/helm/airflow-operator/crds/crds.yaml @@ -881,6 +881,421 @@ spec: type: object x-kubernetes-preserve-unknown-fields: true type: object + processors: + description: The `processor` role runs the DAG processor routine for DAG preparation. + nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. 
Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' 
+ type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + roleConfig: + default: + podDisruptionBudget: + enabled: true + maxUnavailable: null + description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. + properties: + podDisruptionBudget: + default: + enabled: true + maxUnavailable: null + description: |- + This struct is used to configure: + + 1. If PodDisruptionBudgets are created by the operator 2. The allowed number of Pods to be unavailable (`maxUnavailable`) + + Learn more in the [allowed Pod disruptions documentation](https://docs.stackable.tech/home/nightly/concepts/operations/pod_disruptions). + properties: + enabled: + default: true + description: Whether a PodDisruptionBudget should be written out for this role. Disabling this enables you to specify your own - custom - one. Defaults to true. + type: boolean + maxUnavailable: + description: The number of Pods that are allowed to be down because of voluntary disruptions. If you don't explicitly set this, the operator will use a sane default based upon knowledge about the individual product. 
+ format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + roleGroups: + additionalProperties: + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). + properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. 
+ nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. 
+ nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. 
Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + replicas: + format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + required: + - roleGroups + type: object schedulers: description: The `scheduler` is responsible for triggering jobs and persisting their metadata to the backend database. Jobs are scheduled on the workers/executors. nullable: true @@ -1296,6 +1711,421 @@ spec: required: - roleGroups type: object + triggerers: + description: The `triggerer` role runs the triggerer process for use with deferrable DAG operators. 
+ nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). + properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). 
+ properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. 
+ properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. 
+ type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + roleConfig: + default: + podDisruptionBudget: + enabled: true + maxUnavailable: null + description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. + properties: + podDisruptionBudget: + default: + enabled: true + maxUnavailable: null + description: |- + This struct is used to configure: + + 1. If PodDisruptionBudgets are created by the operator 2. The allowed number of Pods to be unavailable (`maxUnavailable`) + + Learn more in the [allowed Pod disruptions documentation](https://docs.stackable.tech/home/nightly/concepts/operations/pod_disruptions). + properties: + enabled: + default: true + description: Whether a PodDisruptionBudget should be written out for this role. Disabling this enables you to specify your own - custom - one. Defaults to true. + type: boolean + maxUnavailable: + description: The number of Pods that are allowed to be down because of voluntary disruptions. 
If you don't explicitly set this, the operator will use a sane default based upon knowledge about the individual product. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + roleGroups: + additionalProperties: + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). + properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. 
+ nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. 
+ nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. 
Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + replicas: + format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + required: + - roleGroups + type: object webservers: description: The `webserver` role provides the main UI for user interaction. 
nullable: true diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs index bb2435eb..6d52a3ad 100644 --- a/rust/operator-binary/src/airflow_controller.rs +++ b/rust/operator-binary/src/airflow_controller.rs @@ -1168,7 +1168,10 @@ fn build_server_rolegroup_statefulset( AirflowRole::Scheduler => { "OrderedReady" // Scheduler pods should start after another, since part of their startup phase is initializing the database, see crd/src/lib.rs } - AirflowRole::Webserver | AirflowRole::Worker => "Parallel", + AirflowRole::Webserver + | AirflowRole::Worker + | AirflowRole::Processor + | AirflowRole::Triggerer => "Parallel", } .to_string(), ), diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index db7cd806..28949905 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -217,6 +217,14 @@ pub mod versioned { #[serde(flatten)] pub executor: AirflowExecutor, + + /// The `processor` role runs the DAG processor routine for DAG preparation. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub processors: Option>, + + /// The `triggerer` role runs the triggerer process for use with deferrable DAG operators. 
+ #[serde(default, skip_serializing_if = "Option::is_none")] + pub triggerers: Option>, } #[derive(Clone, Deserialize, Debug, JsonSchema, PartialEq, Serialize)] @@ -343,7 +351,10 @@ impl v1alpha1::AirflowCluster { pub fn group_listener_name(&self, role: &AirflowRole) -> Option { match role { AirflowRole::Webserver => Some(role_service_name(&self.name_any(), &role.to_string())), - AirflowRole::Scheduler | AirflowRole::Worker => None, + AirflowRole::Scheduler + | AirflowRole::Worker + | AirflowRole::Processor + | AirflowRole::Triggerer => None, } } @@ -357,6 +368,8 @@ impl v1alpha1::AirflowCluster { .to_owned() .map(extract_role_from_webserver_config), AirflowRole::Scheduler => self.spec.schedulers.to_owned(), + AirflowRole::Processor => self.spec.processors.to_owned(), + AirflowRole::Triggerer => self.spec.triggerers.to_owned(), AirflowRole::Worker => { if let AirflowExecutor::CeleryExecutor { config } = &self.spec.executor { Some(config.clone()) @@ -413,6 +426,24 @@ impl v1alpha1::AirflowCluster { roles: AirflowRole::roles(), })? } + AirflowRole::Processor => { + self.spec + .processors + .as_ref() + .context(UnknownAirflowRoleSnafu { + role: role.to_string(), + roles: AirflowRole::roles(), + })? + } + AirflowRole::Triggerer => { + self.spec + .triggerers + .as_ref() + .context(UnknownAirflowRoleSnafu { + role: role.to_string(), + roles: AirflowRole::roles(), + })? 
+ } }; // Retrieve role resource config @@ -561,6 +592,12 @@ pub enum AirflowRole { #[strum(serialize = "worker")] Worker, + + #[strum(serialize = "processor")] + Processor, + + #[strum(serialize = "triggerer")] + Triggerer, } impl AirflowRole { @@ -622,10 +659,27 @@ impl AirflowRole { command.extend(vec![ "prepare_signal_handlers".to_string(), container_debug_command(), - "airflow dag-processor &".to_string(), "airflow scheduler &".to_string(), ]); + if airflow.spec.processors.is_none() { + // If no processors role has been specified, the + // process needs to be included with the scheduler + // (with 3.x there is no longer the possibility of + // starting it as a subprocess, so it has to be + // explicitly started *somewhere*) + command.extend(vec!["airflow dag-processor &".to_string()]); + } } + AirflowRole::Processor => command.extend(vec![ + "prepare_signal_handlers".to_string(), + container_debug_command(), + "airflow dag-processor &".to_string(), + ]), + AirflowRole::Triggerer => command.extend(vec![ + "prepare_signal_handlers".to_string(), + container_debug_command(), + "airflow triggerer &".to_string(), + ]), AirflowRole::Worker => command.extend(vec![ "prepare_signal_handlers".to_string(), container_debug_command(), @@ -668,6 +722,16 @@ impl AirflowRole { "airflow scheduler &".to_string(), ]); } + AirflowRole::Processor => command.extend(vec![ + "prepare_signal_handlers".to_string(), + container_debug_command(), + "airflow dag-processor &".to_string(), + ]), + AirflowRole::Triggerer => command.extend(vec![ + "prepare_signal_handlers".to_string(), + container_debug_command(), + "airflow triggerer &".to_string(), + ]), AirflowRole::Worker => command.extend(vec![ "prepare_signal_handlers".to_string(), container_debug_command(), @@ -724,6 +788,8 @@ impl AirflowRole { AirflowRole::Webserver => Some(HTTP_PORT), AirflowRole::Scheduler => None, AirflowRole::Worker => None, + AirflowRole::Processor => None, + AirflowRole::Triggerer => None, } } @@ -742,7 +808,7 @@ 
impl AirflowRole { .webservers .to_owned() .map(|webserver| webserver.role_config.listener_class), - Self::Worker | Self::Scheduler => None, + Self::Worker | Self::Scheduler | Self::Processor | Self::Triggerer => None, } } } @@ -878,9 +944,10 @@ impl AirflowConfig { logging: product_logging::spec::default_logging(), affinity: get_affinity(cluster_name, role), graceful_shutdown_timeout: Some(match role { - AirflowRole::Webserver | AirflowRole::Scheduler => { - DEFAULT_AIRFLOW_GRACEFUL_SHUTDOWN_TIMEOUT - } + AirflowRole::Webserver + | AirflowRole::Scheduler + | AirflowRole::Processor + | AirflowRole::Triggerer => DEFAULT_AIRFLOW_GRACEFUL_SHUTDOWN_TIMEOUT, AirflowRole::Worker => DEFAULT_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT, }), } @@ -953,6 +1020,26 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment ( + CpuLimitsFragment { + min: Some(Quantity("1".to_owned())), + max: Some(Quantity("2".to_owned())), + }, + MemoryLimitsFragment { + limit: Some(Quantity("1Gi".to_owned())), + runtime_limits: NoRuntimeLimitsFragment {}, + }, + ), + AirflowRole::Triggerer => ( + CpuLimitsFragment { + min: Some(Quantity("1".to_owned())), + max: Some(Quantity("2".to_owned())), + }, + MemoryLimitsFragment { + limit: Some(Quantity("1Gi".to_owned())), + runtime_limits: NoRuntimeLimitsFragment {}, + }, + ), }; ResourcesFragment { diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 7e70491f..44e7ba05 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -15,8 +15,8 @@ use stackable_operator::{ use crate::{ crd::{ - AirflowConfig, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, - STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME, + AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR, + TEMPLATE_LOCATION, TEMPLATE_NAME, authentication::{ AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved, }, @@ -61,7 +61,7 @@ const PYTHONPATH: &str = 
"PYTHONPATH"; /// This key is only intended for use during experimental support and will /// be replaced with a secret at a later stage. See the issue covering /// this at . -const JWT_KEY: &str = "ThisKeyIsNotIntendedForProduction!"; +//const JWT_KEY: &str = "ThisKeyIsNotIntendedForProduction!"; #[derive(Snafu, Debug)] pub enum Error { @@ -86,57 +86,53 @@ pub fn build_airflow_statefulset_envs( resolved_product_image: &ResolvedProductImage, ) -> Result, Error> { let mut env: BTreeMap = BTreeMap::new(); + let secret = airflow.spec.cluster_config.credentials_secret.as_str(); env.extend(static_envs(git_sync_resources)); - add_version_specific_env_vars(airflow, airflow_role, resolved_product_image, &mut env); - // environment variables let env_vars = rolegroup_config.get(&PropertyNameKind::Env); - let secret_prop = - env_vars.and_then(|vars| vars.get(AirflowConfig::CREDENTIALS_SECRET_PROPERTY)); + add_version_specific_env_vars(airflow, airflow_role, resolved_product_image, &mut env); - if let Some(secret) = secret_prop { + env.insert( + AIRFLOW_WEBSERVER_SECRET_KEY.into(), + // The secret key is used to run the webserver flask app and also used to authorize + // requests to Celery workers when logs are retrieved. + env_var_from_secret( + AIRFLOW_WEBSERVER_SECRET_KEY, + secret, + "connections.secretKey", + ), + ); + env.insert( + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(), + env_var_from_secret( + AIRFLOW_DATABASE_SQL_ALCHEMY_CONN, + secret, + "connections.sqlalchemyDatabaseUri", + ), + ); + + // Redis is only needed when celery executors are used + // see https://github.com/stackabletech/airflow-operator/issues/424 for details + if matches!(executor, AirflowExecutor::CeleryExecutor { .. }) { env.insert( - AIRFLOW_WEBSERVER_SECRET_KEY.into(), - // The secret key is used to run the webserver flask app and also used to authorize - // requests to Celery workers when logs are retrieved. 
+ AIRFLOW_CELERY_RESULT_BACKEND.into(), env_var_from_secret( - AIRFLOW_WEBSERVER_SECRET_KEY, + AIRFLOW_CELERY_RESULT_BACKEND, secret, - "connections.secretKey", + "connections.celeryResultBackend", ), ); env.insert( - AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(), + AIRFLOW_CELERY_BROKER_URL.into(), env_var_from_secret( - AIRFLOW_DATABASE_SQL_ALCHEMY_CONN, + AIRFLOW_CELERY_BROKER_URL, secret, - "connections.sqlalchemyDatabaseUri", + "connections.celeryBrokerUrl", ), ); - - // Redis is only needed when celery executors are used - // see https://github.com/stackabletech/airflow-operator/issues/424 for details - if matches!(executor, AirflowExecutor::CeleryExecutor { .. }) { - env.insert( - AIRFLOW_CELERY_RESULT_BACKEND.into(), - env_var_from_secret( - AIRFLOW_CELERY_RESULT_BACKEND, - secret, - "connections.celeryResultBackend", - ), - ); - env.insert( - AIRFLOW_CELERY_BROKER_URL.into(), - env_var_from_secret( - AIRFLOW_CELERY_BROKER_URL, - secret, - "connections.celeryBrokerUrl", - ), - ); - } } let dags_folder = get_dags_folder(git_sync_resources); @@ -454,6 +450,8 @@ fn add_version_specific_env_vars( resolved_product_image: &ResolvedProductImage, env: &mut BTreeMap, ) { + let secret = airflow.spec.cluster_config.credentials_secret.as_str(); + if resolved_product_image.product_version.starts_with("3.") { env.extend(execution_server_env_vars(airflow)); env.insert( @@ -483,13 +481,17 @@ fn add_version_specific_env_vars( // See issue : // later it will be accessed from a secret to avoid cluster restarts // being triggered by an operator restart. 
+ // env.insert( + // "AIRFLOW__API_AUTH__JWT_SECRET".into(), + // EnvVar { + // name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), + // value: Some(JWT_KEY.into()), + // ..Default::default() + // }, + // ); env.insert( "AIRFLOW__API_AUTH__JWT_SECRET".into(), - EnvVar { - name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), - value: Some(JWT_KEY.into()), - ..Default::default() - }, + env_var_from_secret("AIRFLOW__API_AUTH__JWT_SECRET", secret, "jwt.key"), ); if airflow_role == &AirflowRole::Webserver { // Sometimes a race condition can arise when both scheduler and @@ -527,6 +529,19 @@ fn add_version_specific_env_vars( ..Default::default() }, ); + if airflow.spec.processors.is_some() { + // In airflow 2.x the dag-processor can optionally be started as a + // standalone process (rather than as a scheduler subprocess), + // governed by this env-var + env.insert( + "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR".into(), + EnvVar { + name: "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR".into(), + value: Some("True".into()), + ..Default::default() + }, + ); + } } } diff --git a/rust/operator-binary/src/operations/pdb.rs b/rust/operator-binary/src/operations/pdb.rs index 9bfdd0aa..eceec248 100644 --- a/rust/operator-binary/src/operations/pdb.rs +++ b/rust/operator-binary/src/operations/pdb.rs @@ -37,6 +37,8 @@ pub async fn add_pdbs( let max_unavailable = pdb.max_unavailable.unwrap_or(match role { AirflowRole::Scheduler => max_unavailable_schedulers(), AirflowRole::Webserver => max_unavailable_webservers(), + AirflowRole::Processor => max_unavailable_processors(), + AirflowRole::Triggerer => max_unavailable_triggerers(), AirflowRole::Worker => match airflow.spec.executor { AirflowExecutor::CeleryExecutor { .. } => max_unavailable_workers(), AirflowExecutor::KubernetesExecutor { .. 
} => { @@ -77,3 +79,11 @@ fn max_unavailable_workers() -> u16 { fn max_unavailable_webservers() -> u16 { 1 } + +fn max_unavailable_processors() -> u16 { + 1 +} + +fn max_unavailable_triggerers() -> u16 { + 1 +} diff --git a/tests/templates/kuttl/smoke/02-s3-secret.yaml b/tests/templates/kuttl/smoke/02-s3-secret.yaml new file mode 100644 index 00000000..2898ff1a --- /dev/null +++ b/tests/templates/kuttl/smoke/02-s3-secret.yaml @@ -0,0 +1,25 @@ +--- +apiVersion: v1 +kind: Secret +metadata: + name: minio-credentials + labels: + secrets.stackable.tech/class: spark-pi-private-s3-credentials-class +timeout: 240 +stringData: + accessKey: minioAccessKey + secretKey: minioSecretKey + # The following two entries are used by the Bitnami chart for MinIO to + # set up credentials for accessing buckets managed by the MinIO tenant. + root-user: minioAccessKey + root-password: minioSecretKey +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: SecretClass +metadata: + name: spark-pi-private-s3-credentials-class +spec: + backend: + k8sSearch: + searchNamespace: + pod: {} diff --git a/tests/templates/kuttl/smoke/03-assert.yaml b/tests/templates/kuttl/smoke/03-assert.yaml new file mode 100644 index 00000000..fbbea3bd --- /dev/null +++ b/tests/templates/kuttl/smoke/03-assert.yaml @@ -0,0 +1,20 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 900 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: test-minio +status: + readyReplicas: 1 +--- +apiVersion: v1 +kind: Pod +metadata: + name: minio-client + labels: + app: minio-client +status: + phase: Running diff --git a/tests/templates/kuttl/smoke/03-setup-minio.yaml b/tests/templates/kuttl/smoke/03-setup-minio.yaml new file mode 100644 index 00000000..42571093 --- /dev/null +++ b/tests/templates/kuttl/smoke/03-setup-minio.yaml @@ -0,0 +1,55 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install test-minio + --namespace $NAMESPACE + --version 14.6.16 + -f 
helm-bitnami-minio-values.yaml + --repo https://charts.bitnami.com/bitnami minio + timeout: 240 +--- +apiVersion: v1 +kind: Pod +metadata: + name: minio-client + labels: + app: minio-client +spec: + selector: + matchLabels: + app: minio-client + restartPolicy: Never + containers: + - name: minio-client + image: docker.io/bitnamilegacy/minio-client:2024-debian-12 + command: ["bash", "-c", "sleep infinity"] + stdin: true + tty: true + resources: + requests: + memory: 1Gi + cpu: "1" + limits: + memory: 1Gi + cpu: "2" + env: + - name: MINIO_SERVER_ACCESS_KEY + valueFrom: + secretKeyRef: + name: minio-credentials + key: root-user + optional: false + - name: MINIO_SERVER_SECRET_KEY + valueFrom: + secretKeyRef: + name: minio-credentials + key: root-password + optional: false + - name: MINIO_SERVER_HOST + value: test-minio + - name: MINIO_SERVER_PORT_NUMBER + value: "9000" + - name: MINIO_SERVER_SCHEME + value: http diff --git a/tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 b/tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 new file mode 100644 index 00000000..4e8cc30d --- /dev/null +++ b/tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # give minio enough time to start + - command: sleep 10 + - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' + - command: kubectl exec -n $NAMESPACE minio-client -- mc mb test-minio/my-bucket diff --git a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 index a4793606..f7d4db03 100644 --- a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 @@ -15,6 +15,7 @@ stringData: adminUser.lastname: Admin adminUser.email: airflow@airflow.com adminUser.password: airflow + jwt.key: 
thisISaJwTSECRET_1234 connections.secretKey: thisISaSECRET_1234 connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow {% if test_scenario['values']['executor'] == 'celery' %} @@ -61,6 +62,12 @@ spec: webserver_config.py: EXPERIMENTAL_FILE_HEADER: | COMMON_HEADER_VAR = "group-value" + envOverrides: &envOverrides + AIRFLOW_CONN_MINIO_CONN: "aws://minioAccessKey:minioSecretKey@/?endpoint_url=http%3A%2F%2Ftest-minio%3A9000" + AIRFLOW__LOGGING__REMOTE_LOGGING: "True" + AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://my-bucket/ + AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: minio_conn + AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS: "" {% if test_scenario['values']['executor'] == 'celery' %} celeryExecutors: config: @@ -69,11 +76,13 @@ spec: roleGroups: default: replicas: 2 + envOverrides: *envOverrides {% elif test_scenario['values']['executor'] == 'kubernetes' %} kubernetesExecutors: config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + envOverrides: *envOverrides {% endif %} schedulers: config: @@ -82,3 +91,20 @@ spec: roleGroups: default: replicas: 1 + envOverrides: *envOverrides + processors: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 + envOverrides: *envOverrides + triggerers: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 + envOverrides: *envOverrides diff --git a/tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml b/tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml new file mode 100644 index 00000000..4db10306 --- /dev/null +++ b/tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml @@ -0,0 +1,47 @@ +--- +global: + security: + allowInsecureImages: true # needed starting with Chart version 14.9.0 if modifying images + +image: + repository: bitnamilegacy/minio +clientImage: + repository: 
bitnamilegacy/minio-client +defaultInitContainers: + volumePermissions: # volumePermissions moved under defaultInitContainers starting with Chart version 17.0.0 + enabled: false + image: + repository: bitnamilegacy/os-shell +console: + image: + repository: bitnamilegacy/minio-object-browser + +# volumePermissions can be removed starting with Chart version 17.0.0, moved under defaultInitContainers +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + +podSecurityContext: + enabled: false + +containerSecurityContext: + enabled: false + +mode: standalone + +disableWebUI: false + +persistence: + enabled: false + +resources: + requests: + memory: 1Gi + cpu: "1" + limits: + memory: 1Gi + cpu: "2" + +auth: + existingSecret: minio-credentials From 91ed76206518bcf7d3486056bd1939b72576aa98 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 29 Aug 2025 14:57:38 +0200 Subject: [PATCH 2/9] added roles for triggerer and dag-processor --- deploy/helm/airflow-operator/crds/crds.yaml | 482 +++++++++--------- .../operator-binary/src/airflow_controller.rs | 2 +- rust/operator-binary/src/crd/mod.rs | 38 +- rust/operator-binary/src/env_vars.rs | 22 +- rust/operator-binary/src/operations/pdb.rs | 4 +- tests/templates/kuttl/smoke/02-s3-secret.yaml | 25 - tests/templates/kuttl/smoke/03-assert.yaml | 20 - .../templates/kuttl/smoke/03-setup-minio.yaml | 55 -- .../kuttl/smoke/04-prepare-bucket.yaml.j2 | 8 - tests/templates/kuttl/smoke/40-assert.yaml.j2 | 42 ++ .../smoke/40-install-airflow-cluster.yaml.j2 | 14 +- .../smoke/helm-bitnami-minio-values.yaml | 47 -- 12 files changed, 314 insertions(+), 445 deletions(-) delete mode 100644 tests/templates/kuttl/smoke/02-s3-secret.yaml delete mode 100644 tests/templates/kuttl/smoke/03-assert.yaml delete mode 100644 tests/templates/kuttl/smoke/03-setup-minio.yaml delete mode 100644 tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 delete mode 100644 tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml 
diff --git a/deploy/helm/airflow-operator/crds/crds.yaml b/deploy/helm/airflow-operator/crds/crds.yaml index de6cfa08..06b76491 100644 --- a/deploy/helm/airflow-operator/crds/crds.yaml +++ b/deploy/helm/airflow-operator/crds/crds.yaml @@ -645,244 +645,8 @@ spec: description: Flag to stop the cluster. This means all deployed resources (e.g. Services, StatefulSets, ConfigMaps) are kept but all deployed Pods (e.g. replicas from a StatefulSet) are scaled to 0 and therefore stopped and removed. If applied at the same time with `reconciliationPaused`, the latter will pause reconciliation and `stopped` will take no effect until `reconciliationPaused` is set to false or removed. type: boolean type: object - image: - anyOf: - - required: - - custom - - productVersion - - required: - - productVersion - description: |- - Specify which image to use, the easiest way is to only configure the `productVersion`. You can also configure a custom image registry to pull from, as well as completely custom images. - - Consult the [Product image selection documentation](https://docs.stackable.tech/home/nightly/concepts/product_image_selection) for details. - properties: - custom: - description: Overwrite the docker image. Specify the full docker image name, e.g. `oci.stackable.tech/sdp/superset:1.4.1-stackable2.1.0` - type: string - productVersion: - description: Version of the product, e.g. `1.4.1`. - type: string - pullPolicy: - default: Always - description: '[Pull policy](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy) used when pulling the image.' - enum: - - IfNotPresent - - Always - - Never - type: string - pullSecrets: - description: '[Image pull secrets](https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod) to pull images from a private registry.' - items: - description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. 
- properties: - name: - description: 'Name of the referent. This field is effectively required, but due to backwards compatibility is allowed to be empty. Instances of this type with an empty value here are almost certainly wrong. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' - type: string - required: - - name - type: object - nullable: true - type: array - repo: - description: Name of the docker repo, e.g. `oci.stackable.tech/sdp` - nullable: true - type: string - stackableVersion: - description: Stackable version of the product, e.g. `23.4`, `23.4.1` or `0.0.0-dev`. If not specified, the operator will use its own version, e.g. `23.4.1`. When using a nightly operator or a pr version, it will use the nightly `0.0.0-dev` image. - nullable: true - type: string - type: object - kubernetesExecutors: - description: With the Kuberentes executor, executor Pods are created on demand. - properties: - cliOverrides: - additionalProperties: - type: string - default: {} - type: object - config: - default: {} - properties: - affinity: - default: - nodeAffinity: null - nodeSelector: null - podAffinity: null - podAntiAffinity: null - description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
- properties: - nodeAffinity: - description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) - nullable: true - type: object - x-kubernetes-preserve-unknown-fields: true - nodeSelector: - additionalProperties: - type: string - description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) - nullable: true - type: object - podAffinity: - description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) - nullable: true - type: object - x-kubernetes-preserve-unknown-fields: true - podAntiAffinity: - description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) - nullable: true - type: object - x-kubernetes-preserve-unknown-fields: true - type: object - gracefulShutdownTimeout: - description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. - nullable: true - type: string - logging: - default: - containers: {} - enableVectorAgent: null - description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). - properties: - containers: - additionalProperties: - anyOf: - - required: - - custom - - {} - description: Log configuration of the container - properties: - console: - description: Configuration for the console appender - nullable: true - properties: - level: - description: The log level threshold. Log events with a lower log level are discarded. 
- enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - custom: - description: Custom log configuration provided in a ConfigMap - properties: - configMap: - description: ConfigMap containing the log configuration files - nullable: true - type: string - type: object - file: - description: Configuration for the file appender - nullable: true - properties: - level: - description: The log level threshold. Log events with a lower log level are discarded. - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - loggers: - additionalProperties: - description: Configuration of a logger - properties: - level: - description: The log level threshold. Log events with a lower log level are discarded. - enum: - - TRACE - - DEBUG - - INFO - - WARN - - ERROR - - FATAL - - NONE - nullable: true - type: string - type: object - default: {} - description: Configuration per logger - type: object - type: object - description: Log configuration per container. - type: object - enableVectorAgent: - description: Wether or not to deploy a container with the Vector log agent. - nullable: true - type: boolean - type: object - resources: - default: - cpu: - max: null - min: null - memory: - limit: null - runtimeLimits: {} - storage: {} - description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. - properties: - cpu: - default: - max: null - min: null - properties: - max: - description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. - nullable: true - type: string - min: - description: The minimal amount of CPU cores that Pods need to run. 
Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. - nullable: true - type: string - type: object - memory: - properties: - limit: - description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' - nullable: true - type: string - runtimeLimits: - description: Additional options that can be specified. - type: object - type: object - storage: - type: object - type: object - type: object - configOverrides: - additionalProperties: - additionalProperties: - type: string - type: object - default: {} - description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. - type: object - envOverrides: - additionalProperties: - type: string - default: {} - description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' 
- type: object - podOverrides: - default: {} - description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. - type: object - x-kubernetes-preserve-unknown-fields: true - type: object - processors: - description: The `processor` role runs the DAG processor routine for DAG preparation. + dagProcessors: + description: The `dagProcessors` role runs the DAG processor routine for DAG preparation. nullable: true properties: cliOverrides: @@ -1296,8 +1060,244 @@ spec: required: - roleGroups type: object + image: + anyOf: + - required: + - custom + - productVersion + - required: + - productVersion + description: |- + Specify which image to use, the easiest way is to only configure the `productVersion`. You can also configure a custom image registry to pull from, as well as completely custom images. + + Consult the [Product image selection documentation](https://docs.stackable.tech/home/nightly/concepts/product_image_selection) for details. + properties: + custom: + description: Overwrite the docker image. Specify the full docker image name, e.g. `oci.stackable.tech/sdp/superset:1.4.1-stackable2.1.0` + type: string + productVersion: + description: Version of the product, e.g. `1.4.1`. + type: string + pullPolicy: + default: Always + description: '[Pull policy](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy) used when pulling the image.' + enum: + - IfNotPresent + - Always + - Never + type: string + pullSecrets: + description: '[Image pull secrets](https://kubernetes.io/docs/concepts/containers/images/#specifying-imagepullsecrets-on-a-pod) to pull images from a private registry.' 
+ items: + description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. + properties: + name: + description: 'Name of the referent. This field is effectively required, but due to backwards compatibility is allowed to be empty. Instances of this type with an empty value here are almost certainly wrong. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names' + type: string + required: + - name + type: object + nullable: true + type: array + repo: + description: Name of the docker repo, e.g. `oci.stackable.tech/sdp` + nullable: true + type: string + stackableVersion: + description: Stackable version of the product, e.g. `23.4`, `23.4.1` or `0.0.0-dev`. If not specified, the operator will use its own version, e.g. `23.4.1`. When using a nightly operator or a pr version, it will use the nightly `0.0.0-dev` image. + nullable: true + type: string + type: object + kubernetesExecutors: + description: With the Kuberentes executor, executor Pods are created on demand. + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. 
Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' 
+ type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + type: object schedulers: - description: The `scheduler` is responsible for triggering jobs and persisting their metadata to the backend database. Jobs are scheduled on the workers/executors. + description: The `schedulers` is responsible for triggering jobs and persisting their metadata to the backend database. Jobs are scheduled on the workers/executors. nullable: true properties: cliOverrides: @@ -1712,7 +1712,7 @@ spec: - roleGroups type: object triggerers: - description: The `triggerer` role runs the triggerer process for use with deferrable DAG operators. + description: The `triggerers` role runs the triggerer process for use with deferrable DAG operators. nullable: true properties: cliOverrides: @@ -2127,7 +2127,7 @@ spec: - roleGroups type: object webservers: - description: The `webserver` role provides the main UI for user interaction. + description: The `webservers` role provides the main UI for user interaction. 
nullable: true
+ properties:
+ cliOverrides:
diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs
index 058f9bd5..3927d122 100644
--- a/rust/operator-binary/src/airflow_controller.rs
+++ b/rust/operator-binary/src/airflow_controller.rs
@@ -1169,7 +1169,7 @@ fn build_server_rolegroup_statefulset(
}
AirflowRole::Webserver
| AirflowRole::Worker
- | AirflowRole::Processor
+ | AirflowRole::DagProcessor
| AirflowRole::Triggerer => "Parallel",
}
.to_string(),
diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs
index fd423269..f89d2ec3 100644
--- a/rust/operator-binary/src/crd/mod.rs
+++ b/rust/operator-binary/src/crd/mod.rs
@@ -205,11 +205,11 @@ pub mod versioned {
#[serde(default)]
pub cluster_operation: ClusterOperation,
- /// The `webserver` role provides the main UI for user interaction.
+ /// The `webservers` role provides the main UI for user interaction.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub webservers: Option>,
- /// The `scheduler` is responsible for triggering jobs and persisting their metadata to the backend database.
+ /// The `schedulers` role is responsible for triggering jobs and persisting their metadata to the backend database.
/// Jobs are scheduled on the workers/executors.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub schedulers: Option>,
@@ -217,11 +217,11 @@ pub mod versioned {
#[serde(flatten)]
pub executor: AirflowExecutor,
- /// The `processor` role runs the DAG processor routine for DAG preparation.
+ /// The `dagProcessors` role runs the DAG processor routine for DAG preparation.
#[serde(default, skip_serializing_if = "Option::is_none")]
- pub processors: Option>,
+ pub dag_processors: Option>,
- /// The `triggerer` role runs the triggerer process for use with deferrable DAG operators.
+ /// The `triggerers` role runs the triggerer process for use with deferrable DAG operators.
#[serde(default, skip_serializing_if = "Option::is_none")] pub triggerers: Option>, } @@ -352,7 +352,7 @@ impl v1alpha1::AirflowCluster { AirflowRole::Webserver => Some(role_service_name(&self.name_any(), &role.to_string())), AirflowRole::Scheduler | AirflowRole::Worker - | AirflowRole::Processor + | AirflowRole::DagProcessor | AirflowRole::Triggerer => None, } } @@ -367,7 +367,7 @@ impl v1alpha1::AirflowCluster { .to_owned() .map(extract_role_from_webserver_config), AirflowRole::Scheduler => self.spec.schedulers.to_owned(), - AirflowRole::Processor => self.spec.processors.to_owned(), + AirflowRole::DagProcessor => self.spec.dag_processors.to_owned(), AirflowRole::Triggerer => self.spec.triggerers.to_owned(), AirflowRole::Worker => { if let AirflowExecutor::CeleryExecutor { config } = &self.spec.executor { @@ -429,9 +429,9 @@ impl v1alpha1::AirflowCluster { roles: AirflowRole::roles(), })? } - AirflowRole::Processor => { + AirflowRole::DagProcessor => { self.spec - .processors + .dag_processors .as_ref() .context(UnknownAirflowRoleSnafu { role: role.to_string(), @@ -596,8 +596,8 @@ pub enum AirflowRole { #[strum(serialize = "worker")] Worker, - #[strum(serialize = "processor")] - Processor, + #[strum(serialize = "dagprocessor")] + DagProcessor, #[strum(serialize = "triggerer")] Triggerer, @@ -664,8 +664,8 @@ impl AirflowRole { container_debug_command(), "airflow scheduler &".to_string(), ]); - if airflow.spec.processors.is_none() { - // If no processors role has been specified, the + if airflow.spec.dag_processors.is_none() { + // If no dag_processors role has been specified, the // process needs to be included with the scheduler // (with 3.x there is no longer the possibility of // starting it as a subprocess, so it has to be @@ -673,7 +673,7 @@ impl AirflowRole { command.extend(vec!["airflow dag-processor &".to_string()]); } } - AirflowRole::Processor => command.extend(vec![ + AirflowRole::DagProcessor => command.extend(vec![ 
"prepare_signal_handlers".to_string(), container_debug_command(), "airflow dag-processor &".to_string(), @@ -725,7 +725,7 @@ impl AirflowRole { "airflow scheduler &".to_string(), ]); } - AirflowRole::Processor => command.extend(vec![ + AirflowRole::DagProcessor => command.extend(vec![ "prepare_signal_handlers".to_string(), container_debug_command(), "airflow dag-processor &".to_string(), @@ -791,7 +791,7 @@ impl AirflowRole { AirflowRole::Webserver => Some(HTTP_PORT), AirflowRole::Scheduler => None, AirflowRole::Worker => None, - AirflowRole::Processor => None, + AirflowRole::DagProcessor => None, AirflowRole::Triggerer => None, } } @@ -811,7 +811,7 @@ impl AirflowRole { .webservers .to_owned() .map(|webserver| webserver.role_config.listener_class), - Self::Worker | Self::Scheduler | Self::Processor | Self::Triggerer => None, + Self::Worker | Self::Scheduler | Self::DagProcessor | Self::Triggerer => None, } } } @@ -949,7 +949,7 @@ impl AirflowConfig { graceful_shutdown_timeout: Some(match role { AirflowRole::Webserver | AirflowRole::Scheduler - | AirflowRole::Processor + | AirflowRole::DagProcessor | AirflowRole::Triggerer => DEFAULT_AIRFLOW_GRACEFUL_SHUTDOWN_TIMEOUT, AirflowRole::Worker => DEFAULT_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT, }), @@ -1023,7 +1023,7 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment ( + AirflowRole::DagProcessor => ( CpuLimitsFragment { min: Some(Quantity("1".to_owned())), max: Some(Quantity("2".to_owned())), diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs index 44e7ba05..610d9afe 100644 --- a/rust/operator-binary/src/env_vars.rs +++ b/rust/operator-binary/src/env_vars.rs @@ -61,7 +61,7 @@ const PYTHONPATH: &str = "PYTHONPATH"; /// This key is only intended for use during experimental support and will /// be replaced with a secret at a later stage. See the issue covering /// this at . 
-//const JWT_KEY: &str = "ThisKeyIsNotIntendedForProduction!"; +const JWT_KEY: &str = "ThisKeyIsNotIntendedForProduction!"; #[derive(Snafu, Debug)] pub enum Error { @@ -450,8 +450,6 @@ fn add_version_specific_env_vars( resolved_product_image: &ResolvedProductImage, env: &mut BTreeMap, ) { - let secret = airflow.spec.cluster_config.credentials_secret.as_str(); - if resolved_product_image.product_version.starts_with("3.") { env.extend(execution_server_env_vars(airflow)); env.insert( @@ -481,17 +479,13 @@ fn add_version_specific_env_vars( // See issue : // later it will be accessed from a secret to avoid cluster restarts // being triggered by an operator restart. - // env.insert( - // "AIRFLOW__API_AUTH__JWT_SECRET".into(), - // EnvVar { - // name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), - // value: Some(JWT_KEY.into()), - // ..Default::default() - // }, - // ); env.insert( "AIRFLOW__API_AUTH__JWT_SECRET".into(), - env_var_from_secret("AIRFLOW__API_AUTH__JWT_SECRET", secret, "jwt.key"), + EnvVar { + name: "AIRFLOW__API_AUTH__JWT_SECRET".into(), + value: Some(JWT_KEY.into()), + ..Default::default() + }, ); if airflow_role == &AirflowRole::Webserver { // Sometimes a race condition can arise when both scheduler and @@ -529,10 +523,10 @@ fn add_version_specific_env_vars( ..Default::default() }, ); - if airflow.spec.processors.is_some() { + if airflow.spec.dag_processors.is_some() { // In airflow 2.x the dag-processor can optionally be started as a // standalone process (rather then as a scheduler subprocess), - // governed by this env-var + // accompanied by this env-var being set to True. 
env.insert( "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR".into(), EnvVar { diff --git a/rust/operator-binary/src/operations/pdb.rs b/rust/operator-binary/src/operations/pdb.rs index eceec248..8b471082 100644 --- a/rust/operator-binary/src/operations/pdb.rs +++ b/rust/operator-binary/src/operations/pdb.rs @@ -37,7 +37,7 @@ pub async fn add_pdbs( let max_unavailable = pdb.max_unavailable.unwrap_or(match role { AirflowRole::Scheduler => max_unavailable_schedulers(), AirflowRole::Webserver => max_unavailable_webservers(), - AirflowRole::Processor => max_unavailable_processors(), + AirflowRole::DagProcessor => max_unavailable_dag_processors(), AirflowRole::Triggerer => max_unavailable_triggerers(), AirflowRole::Worker => match airflow.spec.executor { AirflowExecutor::CeleryExecutor { .. } => max_unavailable_workers(), @@ -80,7 +80,7 @@ fn max_unavailable_webservers() -> u16 { 1 } -fn max_unavailable_processors() -> u16 { +fn max_unavailable_dag_processors() -> u16 { 1 } diff --git a/tests/templates/kuttl/smoke/02-s3-secret.yaml b/tests/templates/kuttl/smoke/02-s3-secret.yaml deleted file mode 100644 index 2898ff1a..00000000 --- a/tests/templates/kuttl/smoke/02-s3-secret.yaml +++ /dev/null @@ -1,25 +0,0 @@ ---- -apiVersion: v1 -kind: Secret -metadata: - name: minio-credentials - labels: - secrets.stackable.tech/class: spark-pi-private-s3-credentials-class -timeout: 240 -stringData: - accessKey: minioAccessKey - secretKey: minioSecretKey - # The following two entries are used by the Bitnami chart for MinIO to - # set up credentials for accessing buckets managed by the MinIO tenant. 
- root-user: minioAccessKey - root-password: minioSecretKey ---- -apiVersion: secrets.stackable.tech/v1alpha1 -kind: SecretClass -metadata: - name: spark-pi-private-s3-credentials-class -spec: - backend: - k8sSearch: - searchNamespace: - pod: {} diff --git a/tests/templates/kuttl/smoke/03-assert.yaml b/tests/templates/kuttl/smoke/03-assert.yaml deleted file mode 100644 index fbbea3bd..00000000 --- a/tests/templates/kuttl/smoke/03-assert.yaml +++ /dev/null @@ -1,20 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestAssert -timeout: 900 ---- -apiVersion: apps/v1 -kind: Deployment -metadata: - name: test-minio -status: - readyReplicas: 1 ---- -apiVersion: v1 -kind: Pod -metadata: - name: minio-client - labels: - app: minio-client -status: - phase: Running diff --git a/tests/templates/kuttl/smoke/03-setup-minio.yaml b/tests/templates/kuttl/smoke/03-setup-minio.yaml deleted file mode 100644 index 42571093..00000000 --- a/tests/templates/kuttl/smoke/03-setup-minio.yaml +++ /dev/null @@ -1,55 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestStep -commands: - - script: >- - helm install test-minio - --namespace $NAMESPACE - --version 14.6.16 - -f helm-bitnami-minio-values.yaml - --repo https://charts.bitnami.com/bitnami minio - timeout: 240 ---- -apiVersion: v1 -kind: Pod -metadata: - name: minio-client - labels: - app: minio-client -spec: - selector: - matchLabels: - app: minio-client - restartPolicy: Never - containers: - - name: minio-client - image: docker.io/bitnamilegacy/minio-client:2024-debian-12 - command: ["bash", "-c", "sleep infinity"] - stdin: true - tty: true - resources: - requests: - memory: 1Gi - cpu: "1" - limits: - memory: 1Gi - cpu: "2" - env: - - name: MINIO_SERVER_ACCESS_KEY - valueFrom: - secretKeyRef: - name: minio-credentials - key: root-user - optional: false - - name: MINIO_SERVER_SECRET_KEY - valueFrom: - secretKeyRef: - name: minio-credentials - key: root-password - optional: false - - name: MINIO_SERVER_HOST - value: test-minio - - 
name: MINIO_SERVER_PORT_NUMBER - value: "9000" - - name: MINIO_SERVER_SCHEME - value: http diff --git a/tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 b/tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 deleted file mode 100644 index 4e8cc30d..00000000 --- a/tests/templates/kuttl/smoke/04-prepare-bucket.yaml.j2 +++ /dev/null @@ -1,8 +0,0 @@ ---- -apiVersion: kuttl.dev/v1beta1 -kind: TestStep -commands: - # give minio enough time to start - - command: sleep 10 - - command: kubectl exec -n $NAMESPACE minio-client -- sh -c 'mc alias set test-minio http://test-minio:9000 $$MINIO_SERVER_ACCESS_KEY $$MINIO_SERVER_SECRET_KEY' - - command: kubectl exec -n $NAMESPACE minio-client -- mc mb test-minio/my-bucket diff --git a/tests/templates/kuttl/smoke/40-assert.yaml.j2 b/tests/templates/kuttl/smoke/40-assert.yaml.j2 index 81337261..0a2aed84 100644 --- a/tests/templates/kuttl/smoke/40-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-assert.yaml.j2 @@ -51,6 +51,30 @@ status: readyReplicas: 1 replicas: 1 --- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-dagprocessor-default +spec: + template: + spec: + terminationGracePeriodSeconds: 120 +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-triggerer-default +spec: + template: + spec: + terminationGracePeriodSeconds: 120 +status: + readyReplicas: 1 + replicas: 1 +--- apiVersion: policy/v1 kind: PodDisruptionBudget metadata: @@ -79,3 +103,21 @@ status: expectedPods: 1 currentHealthy: 1 disruptionsAllowed: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-dagprocessor +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-triggerer +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 diff --git a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 
b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 index f7d4db03..67cc7c8b 100644 --- a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 @@ -15,7 +15,6 @@ stringData: adminUser.lastname: Admin adminUser.email: airflow@airflow.com adminUser.password: airflow - jwt.key: thisISaJwTSECRET_1234 connections.secretKey: thisISaSECRET_1234 connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow {% if test_scenario['values']['executor'] == 'celery' %} @@ -62,12 +61,6 @@ spec: webserver_config.py: EXPERIMENTAL_FILE_HEADER: | COMMON_HEADER_VAR = "group-value" - envOverrides: &envOverrides - AIRFLOW_CONN_MINIO_CONN: "aws://minioAccessKey:minioSecretKey@/?endpoint_url=http%3A%2F%2Ftest-minio%3A9000" - AIRFLOW__LOGGING__REMOTE_LOGGING: "True" - AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: s3://my-bucket/ - AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: minio_conn - AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS: "" {% if test_scenario['values']['executor'] == 'celery' %} celeryExecutors: config: @@ -76,13 +69,11 @@ spec: roleGroups: default: replicas: 2 - envOverrides: *envOverrides {% elif test_scenario['values']['executor'] == 'kubernetes' %} kubernetesExecutors: config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} - envOverrides: *envOverrides {% endif %} schedulers: config: @@ -91,15 +82,13 @@ spec: roleGroups: default: replicas: 1 - envOverrides: *envOverrides - processors: + dagProcessors: config: logging: enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} roleGroups: default: replicas: 1 - envOverrides: *envOverrides triggerers: config: logging: @@ -107,4 +96,3 @@ spec: roleGroups: default: replicas: 1 - envOverrides: *envOverrides diff --git a/tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml b/tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml deleted file mode 100644 index 
4db10306..00000000 --- a/tests/templates/kuttl/smoke/helm-bitnami-minio-values.yaml +++ /dev/null @@ -1,47 +0,0 @@ ---- -global: - security: - allowInsecureImages: true # needed starting with Chart version 14.9.0 if modifying images - -image: - repository: bitnamilegacy/minio -clientImage: - repository: bitnamilegacy/minio-client -defaultInitContainers: - volumePermissions: # volumePermissions moved under defaultInitContainers starting with Chart version 17.0.0 - enabled: false - image: - repository: bitnamilegacy/os-shell -console: - image: - repository: bitnamilegacy/minio-object-browser - -# volumePermissions can be removed starting with Chart version 17.0.0, moved under defaultInitContainers -volumePermissions: - enabled: false - image: - repository: bitnamilegacy/os-shell - -podSecurityContext: - enabled: false - -containerSecurityContext: - enabled: false - -mode: standalone - -disableWebUI: false - -persistence: - enabled: false - -resources: - requests: - memory: 1Gi - cpu: "1" - limits: - memory: 1Gi - cpu: "2" - -auth: - existingSecret: minio-credentials From 5a4e753d07ea160937d9bdee5516a34541a26437 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Fri, 29 Aug 2025 18:01:57 +0200 Subject: [PATCH 3/9] added triggerer test --- .../kuttl/triggerer/00-patch-ns.yaml.j2 | 9 + .../templates/kuttl/triggerer/05-assert.yaml | 14 ++ .../triggerer/05-install-postgresql.yaml | 11 ++ .../kuttl/triggerer/10-assert.yaml.j2 | 24 +++ .../kuttl/triggerer/10-install-redis.yaml.j2 | 13 ++ .../kuttl/triggerer/30-assert.yaml.j2 | 32 ++++ .../30-install-airflow-cluster.yaml.j2 | 147 +++++++++++++++ .../templates/kuttl/triggerer/40-assert.yaml | 14 ++ .../triggerer/40-install-airflow-python.yaml | 23 +++ .../kuttl/triggerer/50-assert.yaml.j2 | 12 ++ .../kuttl/triggerer/50-health-check.yaml | 7 + .../kuttl/triggerer/60-assert.yaml.j2 | 12 ++ .../triggerer/60-install-metrics-script.yaml | 8 + .../kuttl/triggerer/70-assert.yaml.j2 | 25 +++ 
.../helm-bitnami-postgresql-values.yaml.j2 | 37 ++++ .../helm-bitnami-redis-values.yaml.j2 | 49 +++++ .../kuttl/triggerer/triggerer_metrics.py | 175 ++++++++++++++++++ tests/test-definition.yaml | 5 + 18 files changed, 617 insertions(+) create mode 100644 tests/templates/kuttl/triggerer/00-patch-ns.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/05-assert.yaml create mode 100644 tests/templates/kuttl/triggerer/05-install-postgresql.yaml create mode 100644 tests/templates/kuttl/triggerer/10-assert.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/10-install-redis.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/30-assert.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/40-assert.yaml create mode 100644 tests/templates/kuttl/triggerer/40-install-airflow-python.yaml create mode 100644 tests/templates/kuttl/triggerer/50-assert.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/50-health-check.yaml create mode 100644 tests/templates/kuttl/triggerer/60-assert.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/60-install-metrics-script.yaml create mode 100644 tests/templates/kuttl/triggerer/70-assert.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/helm-bitnami-postgresql-values.yaml.j2 create mode 100644 tests/templates/kuttl/triggerer/helm-bitnami-redis-values.yaml.j2 create mode 100755 tests/templates/kuttl/triggerer/triggerer_metrics.py diff --git a/tests/templates/kuttl/triggerer/00-patch-ns.yaml.j2 b/tests/templates/kuttl/triggerer/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/triggerer/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p 
'{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/triggerer/05-assert.yaml b/tests/templates/kuttl/triggerer/05-assert.yaml new file mode 100644 index 00000000..319e927a --- /dev/null +++ b/tests/templates/kuttl/triggerer/05-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-postgresql +timeout: 480 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-postgresql +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/05-install-postgresql.yaml b/tests/templates/kuttl/triggerer/05-install-postgresql.yaml new file mode 100644 index 00000000..dc25ba20 --- /dev/null +++ b/tests/templates/kuttl/triggerer/05-install-postgresql.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-postgresql + --namespace $NAMESPACE + --version 16.4.2 + -f helm-bitnami-postgresql-values.yaml + oci://registry-1.docker.io/bitnamicharts/postgresql + timeout: 600 diff --git a/tests/templates/kuttl/triggerer/10-assert.yaml.j2 b/tests/templates/kuttl/triggerer/10-assert.yaml.j2 new file mode 100644 index 00000000..8d585401 --- /dev/null +++ b/tests/templates/kuttl/triggerer/10-assert.yaml.j2 @@ -0,0 +1,24 @@ +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-redis +timeout: 360 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-master +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-replicas +status: + readyReplicas: 1 + replicas: 1 +{% endif %} diff --git a/tests/templates/kuttl/triggerer/10-install-redis.yaml.j2 b/tests/templates/kuttl/triggerer/10-install-redis.yaml.j2 new file mode 100644 index 00000000..aae9a14f --- /dev/null +++ 
b/tests/templates/kuttl/triggerer/10-install-redis.yaml.j2 @@ -0,0 +1,13 @@ +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-redis + --namespace $NAMESPACE + --version 17.11.3 + -f helm-bitnami-redis-values.yaml + --repo https://charts.bitnami.com/bitnami redis + timeout: 600 +{% endif %} diff --git a/tests/templates/kuttl/triggerer/30-assert.yaml.j2 b/tests/templates/kuttl/triggerer/30-assert.yaml.j2 new file mode 100644 index 00000000..6ffaabb2 --- /dev/null +++ b/tests/templates/kuttl/triggerer/30-assert.yaml.j2 @@ -0,0 +1,32 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-cluster +timeout: 1200 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-webserver-default +status: + readyReplicas: 1 + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-default +status: + readyReplicas: 1 + replicas: 1 +{% endif %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-scheduler-default +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 new file mode 100644 index 00000000..9ebaf074 --- /dev/null +++ b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 @@ -0,0 +1,147 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: install-airflow-db +timeout: 480 +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-airflow-credentials +type: Opaque +stringData: + adminUser.username: airflow + adminUser.firstname: Airflow + adminUser.lastname: Admin + adminUser.email: airflow@airflow.com + adminUser.password: airflow + connections.secretKey: thisISaSECRET_1234 + connections.sqlalchemyDatabaseUri: 
postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow
+{% if test_scenario['values']['executor'] == 'celery' %}
+ connections.celeryResultBackend: db+postgresql://airflow:airflow@airflow-postgresql/airflow
+ connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0
+{% endif %}
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: triggerer-dag
+data:
+ triggerer_dag.py: |
+ from datetime import datetime, timedelta
+
+ from airflow import DAG
+ from airflow.models.baseoperator import BaseOperator
+ from airflow.triggers.temporal import TimeDeltaTrigger
+ from airflow.utils.context import Context
+ from airflow.operators.empty import EmptyOperator
+
+ # ------------------------------------------------------
+ # Custom deferrable operator - does a simple async sleep
+ # ------------------------------------------------------
+ class CoreDeferrableSleepOperator(BaseOperator):
+ """
+ Sleeps for ``duration`` seconds without occupying a worker.
+ The async hand-off happens via ``self.defer`` + ``TimeDeltaTrigger``.
+ """
+ ui_color = "#ffefeb"
+
+ def __init__(self, *, duration: int, **kwargs):
+ super().__init__(**kwargs)
+ self.duration = duration
+
+ def execute(self, context: Context):
+ """Run on a worker, then hand control to the Triggerer."""
+ # Build the trigger that will fire after `duration` seconds.
+ trigger = TimeDeltaTrigger(timedelta(seconds=self.duration))
+
+ # *** Asynchronous hand-off ***
+ # This tells the scheduler: “pause this task, let the Triggerer watch the timer”.
+ self.defer(trigger=trigger, method_name="execute_complete")
+
+ def execute_complete(self, context: Context, event=None):
+ """Resumes here once the Triggerer fires."""
+ self.log.info("Deferrable sleep of %s seconds finished.", self.duration)
+ return "DONE"
+
+ default_args = {"owner": "stackable", "retries": 0}
+
+ with DAG(
+ dag_id="core_deferrable_sleep_demo",
+ schedule=None,
+ # N.B. this must be earlier than the current timestamp!
+ start_date=datetime(2025, 8, 1), + catchup=False, + default_args=default_args, + tags=["example", "triggerer"], + ) as dag: + + start = EmptyOperator(task_id="start") + + sleep = CoreDeferrableSleepOperator( + task_id="deferrable_sleep", + duration=10, + ) + + finish = EmptyOperator(task_id="finish") + + start >> sleep >> finish +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + image: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + credentialsSecret: test-airflow-credentials + volumes: + - name: triggerer-dag + configMap: + name: triggerer-dag + volumeMounts: + - name: triggerer-dag + mountPath: /dags/triggerer_dag.py + subPath: triggerer_dag.py + webservers: + roleConfig: + listenerClass: external-unstable + roleGroups: + default: + envOverrides: &envOverrides + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} + celeryExecutors: + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 +{% elif test_scenario['values']['executor'] == 'kubernetes' %} + kubernetesExecutors: + envOverrides: *envOverrides +{% endif %} + schedulers: + config: + gracefulShutdownTimeout: 10s + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 + dagProcessors: + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 + triggerers: + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/40-assert.yaml b/tests/templates/kuttl/triggerer/40-assert.yaml new file mode 100644 index 00000000..6edaa3c3 --- /dev/null +++ b/tests/templates/kuttl/triggerer/40-assert.yaml @@ -0,0 
+1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-python +timeout: 240 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/40-install-airflow-python.yaml b/tests/templates/kuttl/triggerer/40-install-airflow-python.yaml new file mode 100644 index 00000000..c3f865a0 --- /dev/null +++ b/tests/templates/kuttl/triggerer/40-install-airflow-python.yaml @@ -0,0 +1,23 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python + labels: + app: test-airflow-python +spec: + replicas: 1 + selector: + matchLabels: + app: test-airflow-python + template: + metadata: + labels: + app: test-airflow-python + spec: + containers: + - name: test-airflow-python + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + stdin: true + tty: true diff --git a/tests/templates/kuttl/triggerer/50-assert.yaml.j2 b/tests/templates/kuttl/triggerer/50-assert.yaml.j2 new file mode 100644 index 00000000..b85052aa --- /dev/null +++ b/tests/templates/kuttl/triggerer/50-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/triggerer/50-health-check.yaml b/tests/templates/kuttl/triggerer/50-health-check.yaml new file mode 100644 index 00000000..5d3b329f --- /dev/null +++ 
b/tests/templates/kuttl/triggerer/50-health-check.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +timeout: 480 +commands: + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/health.py test-airflow-python-0:/tmp + timeout: 240 diff --git a/tests/templates/kuttl/triggerer/60-assert.yaml.j2 b/tests/templates/kuttl/triggerer/60-assert.yaml.j2 new file mode 100644 index 00000000..b9ce5d22 --- /dev/null +++ b/tests/templates/kuttl/triggerer/60-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: triggerer_metrics +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/triggerer_metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/triggerer_metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/triggerer/60-install-metrics-script.yaml b/tests/templates/kuttl/triggerer/60-install-metrics-script.yaml new file mode 100644 index 00000000..377566b1 --- /dev/null +++ b/tests/templates/kuttl/triggerer/60-install-metrics-script.yaml @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: metrics +commands: + - script: kubectl cp -n $NAMESPACE triggerer_metrics.py test-airflow-python-0:/tmp + timeout: 240 diff --git a/tests/templates/kuttl/triggerer/70-assert.yaml.j2 b/tests/templates/kuttl/triggerer/70-assert.yaml.j2 new file mode 100644 index 00000000..7227e06f --- /dev/null +++ b/tests/templates/kuttl/triggerer/70-assert.yaml.j2 @@ -0,0 +1,25 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-log-endpoint +timeout: 240 +commands: +{% if test_scenario['values']['executor'] == 'celery' %} + - script: | + set -eu + + # 
Log-Endpoint Test: + # This is executed from the Webserver as JWT keys must be present. + # A small server is started on each worker that serves the logs on its + # 8793 port for the Webserver: we don't use the token as that is an + # internal implementation, but check that the endpoint is reachable, + # indicated by a 403. + CURL_RESPONSE=$( + kubectl -n $NAMESPACE exec airflow-webserver-default-0 -- sh -c 'CODE=$(curl -s -o /dev/null -w "%{http_code}" http://airflow-worker-default-headless:8793 2>/dev/null || true);echo "$CODE"' + ) + + # Log-Endpoint Test Assertion: + echo "The HTTP Code is $CURL_RESPONSE (an internal JWT token is needed for full access)" + [ "$CURL_RESPONSE" -eq 403 ] +{% endif %} diff --git a/tests/templates/kuttl/triggerer/helm-bitnami-postgresql-values.yaml.j2 b/tests/templates/kuttl/triggerer/helm-bitnami-postgresql-values.yaml.j2 new file mode 100644 index 00000000..80c50924 --- /dev/null +++ b/tests/templates/kuttl/triggerer/helm-bitnami-postgresql-values.yaml.j2 @@ -0,0 +1,37 @@ +--- +global: + security: + allowInsecureImages: true + +image: + repository: bitnamilegacy/postgresql + +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + securityContext: + runAsUser: auto + +metrics: + image: + repository: bitnamilegacy/postgres-exporter + +primary: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +shmVolume: + chmod: + enabled: false + +auth: + username: airflow + password: airflow + database: airflow diff --git a/tests/templates/kuttl/triggerer/helm-bitnami-redis-values.yaml.j2 b/tests/templates/kuttl/triggerer/helm-bitnami-redis-values.yaml.j2 new file mode 100644 index 00000000..0198dc51 --- /dev/null +++ b/tests/templates/kuttl/triggerer/helm-bitnami-redis-values.yaml.j2 @@ -0,0 +1,49 @@ +--- +global: + security: + allowInsecureImages: true # needed starting with Chart 
version 20.5.0 if modifying images +image: + repository: bitnamilegacy/redis +sentinel: + image: + repository: bitnamilegacy/redis-sentinel +metrics: + image: + repository: bitnamilegacy/redis-exporter +kubectl: + image: + repository: bitnamilegacy/kubectl +sysctl: + image: + repository: bitnamilegacy/os-shell + +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + containerSecurityContext: + runAsUser: auto + +master: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +replica: + replicaCount: 1 + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +auth: + password: redis diff --git a/tests/templates/kuttl/triggerer/triggerer_metrics.py b/tests/templates/kuttl/triggerer/triggerer_metrics.py new file mode 100755 index 00000000..51a417a4 --- /dev/null +++ b/tests/templates/kuttl/triggerer/triggerer_metrics.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python + +import requests +import time +import sys +from datetime import datetime, timezone +import argparse +import logging + + +def exception_handler(exception_type, exception, traceback): + print(f"{exception_type.__name__}: {exception.args}") + + +sys.excepthook = exception_handler + + +def assert_metric(role, role_group, metric): + metric_response = requests.get( + f"http://airflow-{role}-{role_group}-metrics:9102/metrics" + ) + assert metric_response.status_code == 200, ( + f"Metrics could not be retrieved from the {role}-{role_group}." 
+ ) + return metric in metric_response.text + + +def metrics_v3(role_group: str) -> None: + now = datetime.now(timezone.utc) + ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + + # Trigger a deferrable DAG run to create metrics + dag_id = "core_deferrable_sleep_demo" + dag_data = {"logical_date": f"{ts}"} + + # allow a few moments for the DAGs to be registered to all roles + time.sleep(10) + + rest_url = "http://airflow-webserver:8080/api/v2" + token_url = "http://airflow-webserver:8080/auth/token" + + data = {"username": "airflow", "password": "airflow"} + + headers = {"Content-Type": "application/json"} + + response = requests.post(token_url, headers=headers, json=data) + + if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") + else: + print( + f"Failed to obtain access token: {response.status_code} - {response.text}" + ) + sys.exit(1) + + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + # activate DAG + response = requests.patch( + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} + ) + # trigger DAG + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data + ) + + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. + iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." 
+ # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", + role_group, + "airflow_task_instance_created_CoreDeferrableSleepOperator", + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_core_deferrable_sleep_demo_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + + +def metrics_v2(role_group: str) -> None: + # Trigger a DAG run to create metrics + dag_id = "core_deferrable_sleep_demo" + + rest_url = "http://airflow-webserver:8080/api/v1" + auth = ("airflow", "airflow") + + # allow a few moments for the DAGs to be registered to all roles + time.sleep(10) + + response = requests.patch( + f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} + ) + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": {}} + ) + + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. + iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." 
+ # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", + role_group, + "airflow_task_instance_created_CoreDeferrableSleepOperator", + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_core_deferrable_sleep_demo_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + + +if __name__ == "__main__": + log_level = "DEBUG" + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + parser = argparse.ArgumentParser(description="Airflow metrics script") + parser.add_argument( + "--role-group", type=str, default="default", help="Role group to check" + ) + parser.add_argument("--airflow-version", type=str, help="Airflow version") + opts = parser.parse_args() + + if opts.airflow_version and opts.airflow_version.startswith("3"): + metrics_v3(opts.role_group) + else: + metrics_v2(opts.role_group) diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index d2860cd8..e1cb64ec 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -90,6 +90,11 @@ tests: - airflow - openshift - executor + - name: triggerer + dimensions: + - airflow-latest + - openshift + - executor suites: - name: nightly # Run nightly with the latest airflow From 155a62d66387bdf660efdc96ab4526573d1d63f6 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Mon, 8 Sep 2025 13:58:23 +0200 Subject: [PATCH 4/9] changelog, docs, getting-started --- CHANGELOG.md | 2 + .../getting_started/code/airflow.yaml | 10 ++++- .../getting_started/code/getting_started.sh | 2 + .../code/getting_started.sh.j2 | 2 + .../pages/getting_started/first_steps.adoc | 10 ++++- docs/modules/airflow/pages/index.adoc | 
16 +++++++- .../pages/usage-guide/storage-resources.adoc | 41 ++++++++++++++++--- 7 files changed, 73 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d181fdd..53156d1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added - Add a flag to determine if database initialization steps should be executed ([#669]). +- Add new roles for dag-processor and triggerer processes ([#679]). ### Fixed @@ -19,6 +20,7 @@ [#668]: https://github.com/stackabletech/airflow-operator/pull/668 [#669]: https://github.com/stackabletech/airflow-operator/pull/669 [#678]: https://github.com/stackabletech/airflow-operator/pull/678 +[#679]: https://github.com/stackabletech/airflow-operator/pull/679 [#683]: https://github.com/stackabletech/airflow-operator/pull/683 ## [25.7.0] - 2025-07-23 diff --git a/docs/modules/airflow/examples/getting_started/code/airflow.yaml b/docs/modules/airflow/examples/getting_started/code/airflow.yaml index c3522e2b..5039134c 100644 --- a/docs/modules/airflow/examples/getting_started/code/airflow.yaml +++ b/docs/modules/airflow/examples/getting_started/code/airflow.yaml @@ -19,8 +19,16 @@ spec: celeryExecutors: roleGroups: default: - replicas: 2 + replicas: 1 schedulers: roleGroups: default: replicas: 1 + dagProcessors: + roleGroups: + default: + replicas: 1 + triggerers: + roleGroups: + default: + replicas: 1 diff --git a/docs/modules/airflow/examples/getting_started/code/getting_started.sh b/docs/modules/airflow/examples/getting_started/code/getting_started.sh index 2cd9f692..4bed0189 100755 --- a/docs/modules/airflow/examples/getting_started/code/getting_started.sh +++ b/docs/modules/airflow/examples/getting_started/code/getting_started.sh @@ -88,6 +88,8 @@ echo "Awaiting Airflow rollout finish ..." 
kubectl rollout status --watch --timeout=5m statefulset/airflow-webserver-default kubectl rollout status --watch --timeout=5m statefulset/airflow-worker-default kubectl rollout status --watch --timeout=5m statefulset/airflow-scheduler-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-dagprocessor-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-triggerer-default # end::watch-airflow-rollout[] echo "Starting port-forwarding of port 8080" diff --git a/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 b/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 index 00adc93e..beaab225 100755 --- a/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 +++ b/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 @@ -88,6 +88,8 @@ echo "Awaiting Airflow rollout finish ..." kubectl rollout status --watch --timeout=5m statefulset/airflow-webserver-default kubectl rollout status --watch --timeout=5m statefulset/airflow-worker-default kubectl rollout status --watch --timeout=5m statefulset/airflow-scheduler-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-dagprocessor-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-triggerer-default # end::watch-airflow-rollout[] echo "Starting port-forwarding of port 8080" diff --git a/docs/modules/airflow/pages/getting_started/first_steps.adoc b/docs/modules/airflow/pages/getting_started/first_steps.adoc index 37e395f1..9b249ae1 100644 --- a/docs/modules/airflow/pages/getting_started/first_steps.adoc +++ b/docs/modules/airflow/pages/getting_started/first_steps.adoc @@ -38,11 +38,15 @@ NOTE: The admin user is disabled if you use a non-default authentication mechani == Airflow -An Airflow cluster is made of up three components: +An Airflow cluster is made of up five components, two of which are optional: * `webserver`: this provides the main UI for user-interaction * 
`executors`: the CeleryExecutor or KubernetesExecutor nodes over which the job workload is distributed by the scheduler * `scheduler`: responsible for triggering jobs and persisting their metadata to the backend database +* `dagProcessors`: (Optional) responsible for monitoring, parsing and preparing DAGs for processing. +If this role is not specified then the process will be started as a scheduler subprocess (Airflow 2.x), or as a standalone process in the same container as the scheduler (Airflow 3.x+) +* `triggerers`: (Optional) DAGs making use of deferrable operators can be used together with one or more triggerer processes to free up worker slots. +This deferral process is also useful for providing a measure of high availability Create a file named `airflow.yaml` with the following contents: @@ -92,7 +96,9 @@ airflow-redis-master 1/1 16m airflow-redis-replicas 1/1 16m airflow-scheduler-default 1/1 11m airflow-webserver-default 1/1 11m -airflow-celery-executor-default 2/2 11m +airflow-celery-executor-default 1/1 11m +airflow-dagprocessor-default 1/1 11m +airflow-triggerer-default 1/1 11m ---- When the Airflow cluster has been created and the database is initialized, Airflow can be opened in the diff --git a/docs/modules/airflow/pages/index.adoc b/docs/modules/airflow/pages/index.adoc index 58a32b69..dc47b127 100644 --- a/docs/modules/airflow/pages/index.adoc +++ b/docs/modules/airflow/pages/index.adoc @@ -3,11 +3,11 @@ :keywords: Stackable Operator, Apache Airflow, Kubernetes, k8s, operator, job pipeline, scheduler, ETL :airflow: https://airflow.apache.org/ :dags: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html -:k8s-crs: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/ :github: https://github.com/stackabletech/airflow-operator/ :crd: {crd-docs-base-url}/airflow-operator/{crd-docs-version}/ :crd-airflowcluster: {crd-docs}/airflow.stackable.tech/airflowcluster/v1alpha1/ :feature-tracker: 
https://features.stackable.tech/unified +:deferrable-operators: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#deferrable-operators-triggers [.link-bar] * {github}[GitHub {external-link-icon}^] @@ -27,7 +27,8 @@ It guides you through installing the operator alongside a PostgreSQL database an === Custom resources The AirflowCluster is the resource for the configuration of the Airflow instance. -The resource defines three xref:concepts:roles-and-role-groups.adoc[roles]: `webserver`, `worker` and `scheduler` (the `worker` role is embedded within `spec.celeryExecutors`: this is described in the next section). +The resource defines five xref:concepts:roles-and-role-groups.adoc[roles]: `webserver`, `worker`, `scheduler`, `dagProcessor` and `triggerer` (the `worker` role is embedded within `spec.celeryExecutors`: this is described in the next section). +The `dagProcessor` and `triggerer` roles are optional. The various configuration options are explained in the xref:usage-guide/index.adoc[]. It helps you tune your cluster to your needs by configuring xref:usage-guide/storage-resources.adoc[resource usage], xref:usage-guide/security.adoc[security], xref:usage-guide/logging.adoc[logging] and more. @@ -70,6 +71,17 @@ kubernetesExecutors: ... ---- +=== DAG-Processors + +In Airflow 2.x, a DAG-Processor can be started either as a standalone process or a subprocess within the scheduler component. +For Airflow 3.x+ it _must_ be started as a standalone process, either in a separate container or in the scheduler container. +In each case the default will be applied (subprocess or combined in the scheduler container) if the role is not specified. + +=== Triggerers + +DAGs using deferrable operators can be combined with the triggerer component to free up worker slots and/or provide high availability. +For more information, please refer to the {deferrable-operators}[documentation {external-link-icon}^]. 
+ === Kubernetes resources Based on the custom resources you define, the operator creates ConfigMaps, StatefulSets and Services. diff --git a/docs/modules/airflow/pages/usage-guide/storage-resources.adoc b/docs/modules/airflow/pages/usage-guide/storage-resources.adoc index df37b4b7..20133720 100644 --- a/docs/modules/airflow/pages/usage-guide/storage-resources.adoc +++ b/docs/modules/airflow/pages/usage-guide/storage-resources.adoc @@ -1,13 +1,13 @@ = Resource Requests -:description: Find out about minimal HA Airflow requirements for CPU and memory, with defaults for schedulers, Celery executors, webservers using Kubernetes resource limits. +:description: Find out about minimal HA Airflow requirements for CPU and memory, with defaults for schedulers, Celery executors, webservers, dagProcessors and triggerers using Kubernetes resource limits. include::home:concepts:stackable_resource_requests.adoc[] -A minimal HA setup consisting of 2 schedulers, 2 workers and 2 webservers has the following https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/[resource requirements]: +A minimal HA setup consisting of 2 schedulers, 2 workers, 2 webservers, 2 dag-processors and 1 triggerer has the following https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/[resource requirements]: -* `8700m` CPU request -* `17400m` CPU limit -* `15872Mi` memory request and limit +* `11600` CPU request +* `23200` CPU limit +* `18432Mi` memory request and limit Corresponding to the values above, the operator uses the following resource defaults: @@ -22,6 +22,9 @@ spec: max: "2" memory: limit: 1Gi + roleGroups: + default: + replicas: 2 celeryExecutors: config: resources: @@ -30,6 +33,9 @@ spec: max: "2" memory: limit: 3Gi + roleGroups: + default: + replicas: 2 webservers: config: resources: @@ -38,4 +44,29 @@ spec: max: "2" memory: limit: 3Gi + roleGroups: + default: + replicas: 2 + dagProcessors: + config: + resources: + cpu: + min: "1" + max: "2" + 
memory: + limit: 1Gi + roleGroups: + default: + replicas: 2 + triggerers: + config: + resources: + cpu: + min: "1" + max: "2" + memory: + limit: 1Gi + roleGroups: + default: + replicas: 1 ---- From d835db77893930a6a12bd8d0c5f9d2046987f314 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Tue, 9 Sep 2025 09:34:44 +0200 Subject: [PATCH 5/9] remove start & finish task --- .../kuttl/triggerer/30-install-airflow-cluster.yaml.j2 | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 index 9ebaf074..ae9b08a6 100644 --- a/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 @@ -76,16 +76,12 @@ data: tags=["example", "triggerer"], ) as dag: - start = EmptyOperator(task_id="start") - sleep = CoreDeferrableSleepOperator( task_id="deferrable_sleep", duration=10, ) - finish = EmptyOperator(task_id="finish") - - start >> sleep >> finish + sleep --- apiVersion: airflow.stackable.tech/v1alpha1 kind: AirflowCluster From 59ab5b6c7a3068378e70b65ac159c11127d9b9fa Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy <1712947+adwk67@users.noreply.github.com> Date: Tue, 9 Sep 2025 10:39:23 +0200 Subject: [PATCH 6/9] Update docs/modules/airflow/pages/getting_started/first_steps.adoc Co-authored-by: Malte Sander --- docs/modules/airflow/pages/getting_started/first_steps.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/airflow/pages/getting_started/first_steps.adoc b/docs/modules/airflow/pages/getting_started/first_steps.adoc index 9b249ae1..3ee569ff 100644 --- a/docs/modules/airflow/pages/getting_started/first_steps.adoc +++ b/docs/modules/airflow/pages/getting_started/first_steps.adoc @@ -38,7 +38,7 @@ NOTE: The admin user is disabled if you use a non-default authentication mechani == Airflow -An Airflow cluster is made of 
up five components, two of which are optional: +An Airflow cluster is made up of several components, two of which are optional: * `webserver`: this provides the main UI for user-interaction * `executors`: the CeleryExecutor or KubernetesExecutor nodes over which the job workload is distributed by the scheduler From 17da28662b7250f5d91053b7070171929a8350f7 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy <1712947+adwk67@users.noreply.github.com> Date: Tue, 9 Sep 2025 10:40:16 +0200 Subject: [PATCH 7/9] Update docs/modules/airflow/pages/index.adoc Co-authored-by: Malte Sander --- docs/modules/airflow/pages/index.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/modules/airflow/pages/index.adoc b/docs/modules/airflow/pages/index.adoc index dc47b127..29bb0663 100644 --- a/docs/modules/airflow/pages/index.adoc +++ b/docs/modules/airflow/pages/index.adoc @@ -27,7 +27,7 @@ It guides you through installing the operator alongside a PostgreSQL database an === Custom resources The AirflowCluster is the resource for the configuration of the Airflow instance. -The resource defines five xref:concepts:roles-and-role-groups.adoc[roles]: `webserver`, `worker`, `scheduler`, `dagProcessor` and `triggerer` (the `worker` role is embedded within `spec.celeryExecutors`: this is described in the next section). +The custom resource defines the following xref:concepts:roles-and-role-groups.adoc[roles]: `webserver`, `worker`, `scheduler`, `dagProcessor` and `triggerer` (the `worker` role is embedded within `spec.celeryExecutors`: this is described in the next section). The `dagProcessor` and `triggerer` roles are optional. The various configuration options are explained in the xref:usage-guide/index.adoc[]. It helps you tune your cluster to your needs by configuring xref:usage-guide/storage-resources.adoc[resource usage], xref:usage-guide/security.adoc[security], xref:usage-guide/logging.adoc[logging] and more. 
From b635cb9d32703f4db276649f400b4d9c335120d0 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 9 Sep 2025 11:15:29 +0200 Subject: [PATCH 8/9] review feedback: resources clarification, corrected naming of test step --- docs/modules/airflow/pages/usage-guide/storage-resources.adoc | 1 + tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 | 2 +- .../mount-dags-configmap/30-install-airflow-cluster.yaml.j2 | 2 +- .../kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 | 2 +- .../kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 | 2 +- .../kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 | 2 +- .../kuttl/resources/30-install-airflow-cluster.yaml.j2 | 2 +- tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 | 2 +- .../kuttl/triggerer/30-install-airflow-cluster.yaml.j2 | 2 +- 9 files changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/modules/airflow/pages/usage-guide/storage-resources.adoc b/docs/modules/airflow/pages/usage-guide/storage-resources.adoc index 20133720..3c399557 100644 --- a/docs/modules/airflow/pages/usage-guide/storage-resources.adoc +++ b/docs/modules/airflow/pages/usage-guide/storage-resources.adoc @@ -9,6 +9,7 @@ A minimal HA setup consisting of 2 schedulers, 2 workers, 2 webservers, 2 dag-pr * `23200` CPU limit * `18432Mi` memory request and limit +This includes auxiliary containers for logging, metrics, and gitsync. 
Corresponding to the values above, the operator uses the following resource defaults: [source,yaml] diff --git a/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 index b87f0d8e..fb8ced2c 100644 --- a/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 commands: - script: | diff --git a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 index 324c0642..e11f6bc0 100644 --- a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 index 113a2346..5b0d5ca4 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 index 7bf63a71..8410596e 100644 --- a/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: 
kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 index f3004952..694322e8 100644 --- a/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 index 22a2b673..cda76f90 100644 --- a/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 index 67cc7c8b..61276545 100644 --- a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 index ae9b08a6..9ddad88b 100644 --- a/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + 
name: install-airflow timeout: 480 --- apiVersion: v1 From 9a7c5d8be1e2a72cbd77b846a4f291c3d80d6082 Mon Sep 17 00:00:00 2001 From: Andrew Kenworthy Date: Tue, 9 Sep 2025 12:06:14 +0200 Subject: [PATCH 9/9] review feedback: re-worked role config --- rust/operator-binary/src/crd/mod.rs | 90 ++++++++++++++--------------- 1 file changed, 44 insertions(+), 46 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index f89d2ec3..772fd43c 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -404,56 +404,13 @@ impl v1alpha1::AirflowCluster { // Initialize the result with all default values as baseline let conf_defaults = AirflowConfig::default_config(&self.name_any(), role); - let role = match role { - AirflowRole::Webserver => { - &extract_role_from_webserver_config(self.spec.webservers.to_owned().context( - UnknownAirflowRoleSnafu { - role: role.to_string(), - roles: AirflowRole::roles(), - }, - )?) - } - AirflowRole::Worker => { - if let AirflowExecutor::CeleryExecutor { config } = &self.spec.executor { - config - } else { - return Err(Error::NoRoleForExecutorFailure); - } - } - AirflowRole::Scheduler => { - self.spec - .schedulers - .as_ref() - .context(UnknownAirflowRoleSnafu { - role: role.to_string(), - roles: AirflowRole::roles(), - })? - } - AirflowRole::DagProcessor => { - self.spec - .dag_processors - .as_ref() - .context(UnknownAirflowRoleSnafu { - role: role.to_string(), - roles: AirflowRole::roles(), - })? - } - AirflowRole::Triggerer => { - self.spec - .triggerers - .as_ref() - .context(UnknownAirflowRoleSnafu { - role: role.to_string(), - roles: AirflowRole::roles(), - })? 
- } - }; + let role_config = role.role_config(self)?; // Retrieve role resource config - let mut conf_role = role.config.config.to_owned(); + let mut conf_role = role_config.config.config; // Retrieve rolegroup specific resource config - let mut conf_rolegroup = role + let mut conf_rolegroup = role_config .role_groups .get(&rolegroup_ref.role_group) .map(|rg| rg.config.config.clone()) @@ -814,6 +771,47 @@ impl AirflowRole { Self::Worker | Self::Scheduler | Self::DagProcessor | Self::Triggerer => None, } } + + pub fn role_config( + &self, + airflow: &v1alpha1::AirflowCluster, + ) -> Result, Error> { + let role = self.to_string(); + let roles = AirflowRole::roles(); + + let role_config = match self { + AirflowRole::Webserver => &extract_role_from_webserver_config( + airflow + .spec + .webservers + .to_owned() + .context(UnknownAirflowRoleSnafu { role, roles })?, + ), + AirflowRole::Worker => { + if let AirflowExecutor::CeleryExecutor { config } = &airflow.spec.executor { + config + } else { + return Err(Error::NoRoleForExecutorFailure); + } + } + AirflowRole::Scheduler => airflow + .spec + .schedulers + .as_ref() + .context(UnknownAirflowRoleSnafu { role, roles })?, + AirflowRole::DagProcessor => airflow + .spec + .dag_processors + .as_ref() + .context(UnknownAirflowRoleSnafu { role, roles })?, + AirflowRole::Triggerer => airflow + .spec + .triggerers + .as_ref() + .context(UnknownAirflowRoleSnafu { role, roles })?, + }; + Ok(role_config.clone()) + } } fn container_debug_command() -> String {