diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d181fdd..53156d1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Added - Add a flag to determine if database initialization steps should be executed ([#669]). +- Add new roles for dag-processor and triggerer processes ([#679]). ### Fixed @@ -19,6 +20,7 @@ [#668]: https://github.com/stackabletech/airflow-operator/pull/668 [#669]: https://github.com/stackabletech/airflow-operator/pull/669 [#678]: https://github.com/stackabletech/airflow-operator/pull/678 +[#679]: https://github.com/stackabletech/airflow-operator/pull/679 [#683]: https://github.com/stackabletech/airflow-operator/pull/683 ## [25.7.0] - 2025-07-23 diff --git a/deploy/helm/airflow-operator/crds/crds.yaml b/deploy/helm/airflow-operator/crds/crds.yaml index 79612fca..06b76491 100644 --- a/deploy/helm/airflow-operator/crds/crds.yaml +++ b/deploy/helm/airflow-operator/crds/crds.yaml @@ -645,6 +645,421 @@ spec: description: Flag to stop the cluster. This means all deployed resources (e.g. Services, StatefulSets, ConfigMaps) are kept but all deployed Pods (e.g. replicas from a StatefulSet) are scaled to 0 and therefore stopped and removed. If applied at the same time with `reconciliationPaused`, the latter will pause reconciliation and `stopped` will take no effect until `reconciliationPaused` is set to false or removed. type: boolean type: object + dagProcessors: + description: The `dagProcessors` role runs the DAG processor routine for DAG preparation. + nullable: true + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). + properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). 
+ properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. 
Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + roleConfig: + default: + podDisruptionBudget: + enabled: true + maxUnavailable: null + description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. + properties: + podDisruptionBudget: + default: + enabled: true + maxUnavailable: null + description: |- + This struct is used to configure: + + 1. If PodDisruptionBudgets are created by the operator 2. The allowed number of Pods to be unavailable (`maxUnavailable`) + + Learn more in the [allowed Pod disruptions documentation](https://docs.stackable.tech/home/nightly/concepts/operations/pod_disruptions). + properties: + enabled: + default: true + description: Whether a PodDisruptionBudget should be written out for this role. Disabling this enables you to specify your own - custom - one. Defaults to true. + type: boolean + maxUnavailable: + description: The number of Pods that are allowed to be down because of voluntary disruptions. If you don't explicitly set this, the operator will use a sane default based upon knowledge about the individual product. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + roleGroups: + additionalProperties: + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. 
Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + replicas: + format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + required: + - roleGroups + type: object image: anyOf: - required: @@ -682,19 +1097,208 @@ spec: required: - name type: object - nullable: true - type: array - repo: - description: Name of the docker repo, e.g. `oci.stackable.tech/sdp` - nullable: true - type: string - stackableVersion: - description: Stackable version of the product, e.g. `23.4`, `23.4.1` or `0.0.0-dev`. If not specified, the operator will use its own version, e.g. `23.4.1`. When using a nightly operator or a pr version, it will use the nightly `0.0.0-dev` image. - nullable: true - type: string + nullable: true + type: array + repo: + description: Name of the docker repo, e.g. `oci.stackable.tech/sdp` + nullable: true + type: string + stackableVersion: + description: Stackable version of the product, e.g. 
`23.4`, `23.4.1` or `0.0.0-dev`. If not specified, the operator will use its own version, e.g. `23.4.1`. When using a nightly operator or a pr version, it will use the nightly `0.0.0-dev` image. + nullable: true + type: string + type: object + kubernetesExecutors: + description: With the Kuberentes executor, executor Pods are created on demand. + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). + properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. 
+ enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. 
+ type: object + x-kubernetes-preserve-unknown-fields: true type: object - kubernetesExecutors: - description: With the Kuberentes executor, executor Pods are created on demand. + schedulers: + description: The `schedulers` is responsible for triggering jobs and persisting their metadata to the backend database. Jobs are scheduled on the workers/executors. + nullable: true properties: cliOverrides: additionalProperties: @@ -880,9 +1484,235 @@ spec: description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. type: object x-kubernetes-preserve-unknown-fields: true + roleConfig: + default: + podDisruptionBudget: + enabled: true + maxUnavailable: null + description: This is a product-agnostic RoleConfig, which is sufficient for most of the products. + properties: + podDisruptionBudget: + default: + enabled: true + maxUnavailable: null + description: |- + This struct is used to configure: + + 1. If PodDisruptionBudgets are created by the operator 2. The allowed number of Pods to be unavailable (`maxUnavailable`) + + Learn more in the [allowed Pod disruptions documentation](https://docs.stackable.tech/home/nightly/concepts/operations/pod_disruptions). + properties: + enabled: + default: true + description: Whether a PodDisruptionBudget should be written out for this role. Disabling this enables you to specify your own - custom - one. Defaults to true. + type: boolean + maxUnavailable: + description: The number of Pods that are allowed to be down because of voluntary disruptions. If you don't explicitly set this, the operator will use a sane default based upon knowledge about the individual product. + format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + roleGroups: + additionalProperties: + properties: + cliOverrides: + additionalProperties: + type: string + default: {} + type: object + config: + default: {} + properties: + affinity: + default: + nodeAffinity: null + nodeSelector: null + podAffinity: null + podAntiAffinity: null + description: These configuration settings control [Pod placement](https://docs.stackable.tech/home/nightly/concepts/operations/pod_placement). 
+ properties: + nodeAffinity: + description: Same as the `spec.affinity.nodeAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + nodeSelector: + additionalProperties: + type: string + description: Simple key-value pairs forming a nodeSelector, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + podAffinity: + description: Same as the `spec.affinity.podAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + podAntiAffinity: + description: Same as the `spec.affinity.podAntiAffinity` field on the Pod, see the [Kubernetes docs](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node) + nullable: true + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + gracefulShutdownTimeout: + description: Time period Pods have to gracefully shut down, e.g. `30m`, `1h` or `2d`. Consult the operator documentation for details. + nullable: true + type: string + logging: + default: + containers: {} + enableVectorAgent: null + description: Logging configuration, learn more in the [logging concept documentation](https://docs.stackable.tech/home/nightly/concepts/logging). + properties: + containers: + additionalProperties: + anyOf: + - required: + - custom + - {} + description: Log configuration of the container + properties: + console: + description: Configuration for the console appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + custom: + description: Custom log configuration provided in a ConfigMap + properties: + configMap: + description: ConfigMap containing the log configuration files + nullable: true + type: string + type: object + file: + description: Configuration for the file appender + nullable: true + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + loggers: + additionalProperties: + description: Configuration of a logger + properties: + level: + description: The log level threshold. Log events with a lower log level are discarded. + enum: + - TRACE + - DEBUG + - INFO + - WARN + - ERROR + - FATAL + - NONE + nullable: true + type: string + type: object + default: {} + description: Configuration per logger + type: object + type: object + description: Log configuration per container. + type: object + enableVectorAgent: + description: Wether or not to deploy a container with the Vector log agent. + nullable: true + type: boolean + type: object + resources: + default: + cpu: + max: null + min: null + memory: + limit: null + runtimeLimits: {} + storage: {} + description: Resource usage is configured here, this includes CPU usage, memory usage and disk storage usage, if this role needs any. + properties: + cpu: + default: + max: null + min: null + properties: + max: + description: The maximum amount of CPU cores that can be requested by Pods. Equivalent to the `limit` for Pod resource configuration. 
Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + min: + description: The minimal amount of CPU cores that Pods need to run. Equivalent to the `request` for Pod resource configuration. Cores are specified either as a decimal point number or as milli units. For example:`1.5` will be 1.5 cores, also written as `1500m`. + nullable: true + type: string + type: object + memory: + properties: + limit: + description: 'The maximum amount of memory that should be available to the Pod. Specified as a byte [Quantity](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/quantity/), which means these suffixes are supported: E, P, T, G, M, k. You can also use the power-of-two equivalents: Ei, Pi, Ti, Gi, Mi, Ki. For example, the following represent roughly the same value: `128974848, 129e6, 129M, 128974848000m, 123Mi`' + nullable: true + type: string + runtimeLimits: + description: Additional options that can be specified. + type: object + type: object + storage: + type: object + type: object + type: object + configOverrides: + additionalProperties: + additionalProperties: + type: string + type: object + default: {} + description: The `configOverrides` can be used to configure properties in product config files that are not exposed in the CRD. Read the [config overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#config-overrides) and consult the operator specific usage guide documentation for details on the available config files and settings for the specific product. + type: object + envOverrides: + additionalProperties: + type: string + default: {} + description: '`envOverrides` configure environment variables to be set in the Pods. It is a map from strings to strings - environment variables and the value to set. Read the [environment variable overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#env-overrides) for more information and consult the operator specific usage guide to find out about the product specific environment variables that are available.' + type: object + podOverrides: + default: {} + description: In the `podOverrides` property you can define a [PodTemplateSpec](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.27/#podtemplatespec-v1-core) to override any property that can be set on a Kubernetes Pod. Read the [Pod overrides documentation](https://docs.stackable.tech/home/nightly/concepts/overrides#pod-overrides) for more information. + type: object + x-kubernetes-preserve-unknown-fields: true + replicas: + format: uint16 + minimum: 0.0 + nullable: true + type: integer + type: object + type: object + required: + - roleGroups type: object - schedulers: - description: The `scheduler` is responsible for triggering jobs and persisting their metadata to the backend database. Jobs are scheduled on the workers/executors. + triggerers: + description: The `triggerers` role runs the triggerer process for use with deferrable DAG operators. nullable: true properties: cliOverrides: @@ -1297,7 +2127,7 @@ spec: - roleGroups type: object webservers: - description: The `webserver` role provides the main UI for user interaction. + description: The `webservers` role provides the main UI for user interaction. 
nullable: true properties: cliOverrides: diff --git a/docs/modules/airflow/examples/getting_started/code/airflow.yaml b/docs/modules/airflow/examples/getting_started/code/airflow.yaml index c3522e2b..5039134c 100644 --- a/docs/modules/airflow/examples/getting_started/code/airflow.yaml +++ b/docs/modules/airflow/examples/getting_started/code/airflow.yaml @@ -19,8 +19,16 @@ spec: celeryExecutors: roleGroups: default: - replicas: 2 + replicas: 1 schedulers: roleGroups: default: replicas: 1 + dagProcessors: + roleGroups: + default: + replicas: 1 + triggerers: + roleGroups: + default: + replicas: 1 diff --git a/docs/modules/airflow/examples/getting_started/code/getting_started.sh b/docs/modules/airflow/examples/getting_started/code/getting_started.sh index 2cd9f692..4bed0189 100755 --- a/docs/modules/airflow/examples/getting_started/code/getting_started.sh +++ b/docs/modules/airflow/examples/getting_started/code/getting_started.sh @@ -88,6 +88,8 @@ echo "Awaiting Airflow rollout finish ..." kubectl rollout status --watch --timeout=5m statefulset/airflow-webserver-default kubectl rollout status --watch --timeout=5m statefulset/airflow-worker-default kubectl rollout status --watch --timeout=5m statefulset/airflow-scheduler-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-dagprocessor-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-triggerer-default # end::watch-airflow-rollout[] echo "Starting port-forwarding of port 8080" diff --git a/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 b/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 index 00adc93e..beaab225 100755 --- a/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 +++ b/docs/modules/airflow/examples/getting_started/code/getting_started.sh.j2 @@ -88,6 +88,8 @@ echo "Awaiting Airflow rollout finish ..." kubectl rollout status --watch --timeout=5m statefulset/airflow-webserver-default kubectl rollout status --watch --timeout=5m statefulset/airflow-worker-default kubectl rollout status --watch --timeout=5m statefulset/airflow-scheduler-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-dagprocessor-default +kubectl rollout status --watch --timeout=5m statefulset/airflow-triggerer-default # end::watch-airflow-rollout[] echo "Starting port-forwarding of port 8080" diff --git a/docs/modules/airflow/pages/getting_started/first_steps.adoc b/docs/modules/airflow/pages/getting_started/first_steps.adoc index 37e395f1..3ee569ff 100644 --- a/docs/modules/airflow/pages/getting_started/first_steps.adoc +++ b/docs/modules/airflow/pages/getting_started/first_steps.adoc @@ -38,11 +38,15 @@ NOTE: The admin user is disabled if you use a non-default authentication mechani == Airflow -An Airflow cluster is made of up three components: +An Airflow cluster is made up of several components, two of which are optional: * `webserver`: this provides the main UI for user-interaction * `executors`: the CeleryExecutor or KubernetesExecutor nodes over which the job workload is distributed by the scheduler * `scheduler`: responsible for triggering jobs and persisting their metadata to the backend database +* `dagProcessors`: (Optional) responsible for monitoring, parsing and preparing DAGs for processing. 
+If this role is not specified, the process will be started as a scheduler subprocess (Airflow 2.x), or as a standalone process in the same container as the scheduler (Airflow 3.x+).
+* `triggerers`: (Optional) DAGs making use of deferrable operators can be used together with one or more triggerer processes to free up worker slots.
+This deferral process is also useful for providing a measure of high availability.
 
 Create a file named `airflow.yaml` with the following contents:
 
@@ -92,7 +96,9 @@
 airflow-redis-master              1/1   16m
 airflow-redis-replicas            1/1   16m
 airflow-scheduler-default         1/1   11m
 airflow-webserver-default         1/1   11m
-airflow-celery-executor-default   2/2   11m
+airflow-celery-executor-default   1/1   11m
+airflow-dagprocessor-default      1/1   11m
+airflow-triggerer-default         1/1   11m
 ----
 
 When the Airflow cluster has been created and the database is initialized, Airflow can be opened in the
diff --git a/docs/modules/airflow/pages/index.adoc b/docs/modules/airflow/pages/index.adoc
index 58a32b69..29bb0663 100644
--- a/docs/modules/airflow/pages/index.adoc
+++ b/docs/modules/airflow/pages/index.adoc
@@ -3,11 +3,11 @@
 :keywords: Stackable Operator, Apache Airflow, Kubernetes, k8s, operator, job pipeline, scheduler, ETL
 :airflow: https://airflow.apache.org/
 :dags: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html
-:k8s-crs: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/
 :github: https://github.com/stackabletech/airflow-operator/
 :crd: {crd-docs-base-url}/airflow-operator/{crd-docs-version}/
 :crd-airflowcluster: {crd-docs}/airflow.stackable.tech/airflowcluster/v1alpha1/
 :feature-tracker: https://features.stackable.tech/unified
+:deferrable-operators: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html#deferrable-operators-triggers
 
 [.link-bar]
 * {github}[GitHub {external-link-icon}^]
@@ -27,7 +27,8 @@ It guides you through installing the operator alongside a PostgreSQL database an
 === Custom resources
 
 The AirflowCluster is the resource for the configuration of the Airflow instance.
-The resource defines three xref:concepts:roles-and-role-groups.adoc[roles]: `webserver`, `worker` and `scheduler` (the `worker` role is embedded within `spec.celeryExecutors`: this is described in the next section).
+The custom resource defines the following xref:concepts:roles-and-role-groups.adoc[roles]: `webserver`, `worker`, `scheduler`, `dagProcessor` and `triggerer` (the `worker` role is embedded within `spec.celeryExecutors`: this is described in the next section).
+The `dagProcessor` and `triggerer` roles are optional.
 The various configuration options are explained in the xref:usage-guide/index.adoc[].
 It helps you tune your cluster to your needs by configuring xref:usage-guide/storage-resources.adoc[resource usage], xref:usage-guide/security.adoc[security], xref:usage-guide/logging.adoc[logging] and more.
@@ -70,6 +71,17 @@ kubernetesExecutors:
 ...
 ----
 
+=== DAG-Processors
+
+In Airflow 2.x, a DAG-Processor can be started either as a standalone process or as a subprocess within the scheduler component.
+For Airflow 3.x+ it _must_ be started as a standalone process, either in a separate container or in the scheduler container.
+If the role is not specified, the respective default applies: a scheduler subprocess in Airflow 2.x, or a standalone process in the scheduler container in Airflow 3.x+.
+
+=== Triggerers
+
+DAGs using deferrable operators can be combined with the triggerer component to free up worker slots and/or provide high availability.
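+
+As a minimal illustrative sketch (the replica counts are arbitrary, not recommendations), the two optional roles can be enabled alongside the other roles like this:
+
+[source,yaml]
+----
+spec:
+  dagProcessors:
+    roleGroups:
+      default:
+        replicas: 1
+  triggerers:
+    roleGroups:
+      default:
+        replicas: 1
+----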
+For more information, please refer to the {deferrable-operators}[documentation {external-link-icon}^].
+
 === Kubernetes resources
 
 Based on the custom resources you define, the operator creates ConfigMaps, StatefulSets and Services.
diff --git a/docs/modules/airflow/pages/usage-guide/storage-resources.adoc b/docs/modules/airflow/pages/usage-guide/storage-resources.adoc
index df37b4b7..3c399557 100644
--- a/docs/modules/airflow/pages/usage-guide/storage-resources.adoc
+++ b/docs/modules/airflow/pages/usage-guide/storage-resources.adoc
@@ -1,14 +1,15 @@
 = Resource Requests
-:description: Find out about minimal HA Airflow requirements for CPU and memory, with defaults for schedulers, Celery executors, webservers using Kubernetes resource limits.
+:description: Find out about minimal HA Airflow requirements for CPU and memory, with defaults for schedulers, Celery executors, webservers, dagProcessors and triggerers using Kubernetes resource limits.
 
 include::home:concepts:stackable_resource_requests.adoc[]
 
-A minimal HA setup consisting of 2 schedulers, 2 workers and 2 webservers has the following https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/[resource requirements]:
+A minimal HA setup consisting of 2 schedulers, 2 workers, 2 webservers, 2 dag-processors and 1 triggerer has the following https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/[resource requirements]:
 
-* `8700m` CPU request
-* `17400m` CPU limit
-* `15872Mi` memory request and limit
+* `11600m` CPU request
+* `23200m` CPU limit
+* `18432Mi` memory request and limit
+
+This includes auxiliary containers for logging, metrics, and gitsync.
 
 Corresponding to the values above, the operator uses the following resource defaults:
 
 [source,yaml]
 ----
 spec:
   schedulers:
     config:
       resources:
         cpu:
           min: "1"
           max: "2"
         memory:
           limit: 1Gi
+    roleGroups:
+      default:
+        replicas: 2
   celeryExecutors:
     config:
       resources:
         cpu:
           min: "1"
           max: "2"
         memory:
           limit: 3Gi
+    roleGroups:
+      default:
+        replicas: 2
   webservers:
     config:
       resources:
         cpu:
           min: "1"
           max: "2"
         memory:
           limit: 3Gi
+    roleGroups:
+      default:
+        replicas: 2
+  dagProcessors:
+    config:
+      resources:
+        cpu:
+          min: "1"
+          max: "2"
+        memory:
+          limit: 1Gi
+    roleGroups:
+      default:
+        replicas: 2
+  triggerers:
+    config:
+      resources:
+        cpu:
+          min: "1"
+          max: "2"
+        memory:
+          limit: 1Gi
+    roleGroups:
+      default:
+        replicas: 1
 ----
diff --git a/rust/operator-binary/src/airflow_controller.rs b/rust/operator-binary/src/airflow_controller.rs
index b06e1b6c..13419ff1 100644
--- a/rust/operator-binary/src/airflow_controller.rs
+++ b/rust/operator-binary/src/airflow_controller.rs
@@ -1156,7 +1156,10 @@ fn build_server_rolegroup_statefulset(
                 AirflowRole::Scheduler => {
                     "OrderedReady" // Scheduler pods should start after another, since part of their startup phase is initializing the database, see crd/src/lib.rs
                 }
-                AirflowRole::Webserver | AirflowRole::Worker => "Parallel",
+                AirflowRole::Webserver
+                | AirflowRole::Worker
+                | AirflowRole::DagProcessor
+                | AirflowRole::Triggerer => "Parallel",
             }
             .to_string(),
         ),
diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs
index 8f92e824..772fd43c 100644
--- a/rust/operator-binary/src/crd/mod.rs
+++ b/rust/operator-binary/src/crd/mod.rs
@@ -205,17 +205,25 @@ pub mod versioned {
         #[serde(default)]
         pub cluster_operation: ClusterOperation,
 
-        /// The `webserver` role provides the main UI for user interaction.
+        /// The `webservers` role provides the main UI for user interaction.
         #[serde(default, skip_serializing_if = "Option::is_none")]
         pub webservers: Option<Role<AirflowConfigFragment, WebserverRoleConfig>>,
 
-        /// The `scheduler` is responsible for triggering jobs and persisting their metadata to the backend database.
+        /// The `schedulers` role is responsible for triggering jobs and persisting their metadata to the backend database.
         /// Jobs are scheduled on the workers/executors.
         #[serde(default, skip_serializing_if = "Option::is_none")]
         pub schedulers: Option<Role<AirflowConfigFragment>>,
 
         #[serde(flatten)]
         pub executor: AirflowExecutor,
+
+        /// The `dagProcessors` role runs the DAG processor routine for DAG preparation.
+        #[serde(default, skip_serializing_if = "Option::is_none")]
+        pub dag_processors: Option<Role<AirflowConfigFragment>>,
+
+        /// The `triggerers` role runs the triggerer process for use with deferrable DAG operators.
+        #[serde(default, skip_serializing_if = "Option::is_none")]
+        pub triggerers: Option<Role<AirflowConfigFragment>>,
     }
 
     #[derive(Clone, Deserialize, Debug, JsonSchema, PartialEq, Serialize)]
@@ -342,7 +350,10 @@ impl v1alpha1::AirflowCluster {
     pub fn group_listener_name(&self, role: &AirflowRole) -> Option<String> {
         match role {
             AirflowRole::Webserver => Some(role_service_name(&self.name_any(), &role.to_string())),
-            AirflowRole::Scheduler | AirflowRole::Worker => None,
+            AirflowRole::Scheduler
+            | AirflowRole::Worker
+            | AirflowRole::DagProcessor
+            | AirflowRole::Triggerer => None,
         }
     }
 
@@ -356,6 +367,8 @@
                 .to_owned()
                 .map(extract_role_from_webserver_config),
             AirflowRole::Scheduler => self.spec.schedulers.to_owned(),
+            AirflowRole::DagProcessor => self.spec.dag_processors.to_owned(),
+            AirflowRole::Triggerer => self.spec.triggerers.to_owned(),
             AirflowRole::Worker => {
                 if let AirflowExecutor::CeleryExecutor { config } = &self.spec.executor {
                     Some(config.clone())
@@ -391,38 +404,13 @@
         // Initialize the result with all default values as baseline
         let conf_defaults = AirflowConfig::default_config(&self.name_any(), role);
 
-        let role = match role {
-            AirflowRole::Webserver => {
-                &extract_role_from_webserver_config(self.spec.webservers.to_owned().context(
-                    UnknownAirflowRoleSnafu {
-                        role: role.to_string(),
-                        roles: AirflowRole::roles(),
-                    },
-                )?)
-            }
-            AirflowRole::Worker => {
-                if let AirflowExecutor::CeleryExecutor { config } = &self.spec.executor {
-                    config
-                } else {
-                    return Err(Error::NoRoleForExecutorFailure);
-                }
-            }
-            AirflowRole::Scheduler => {
-                self.spec
-                    .schedulers
-                    .as_ref()
-                    .context(UnknownAirflowRoleSnafu {
-                        role: role.to_string(),
-                        roles: AirflowRole::roles(),
-                    })?
-            }
-        };
+        let role_config = role.role_config(self)?;
 
         // Retrieve role resource config
-        let mut conf_role = role.config.config.to_owned();
+        let mut conf_role = role_config.config.config;
 
         // Retrieve rolegroup specific resource config
-        let mut conf_rolegroup = role
+        let mut conf_rolegroup = role_config
             .role_groups
             .get(&rolegroup_ref.role_group)
             .map(|rg| rg.config.config.clone())
@@ -564,6 +552,12 @@ pub enum AirflowRole {
     #[strum(serialize = "worker")]
     Worker,
+
+    #[strum(serialize = "dagprocessor")]
+    DagProcessor,
+
+    #[strum(serialize = "triggerer")]
+    Triggerer,
 }
 
 impl AirflowRole {
@@ -625,10 +619,27 @@
                 command.extend(vec![
                     "prepare_signal_handlers".to_string(),
                     container_debug_command(),
-                    "airflow dag-processor &".to_string(),
                     "airflow scheduler &".to_string(),
                 ]);
+                if airflow.spec.dag_processors.is_none() {
+                    // If no dag_processors role has been specified, the
+                    // process needs to be included with the scheduler
+                    // (with 3.x there is no longer the possibility of
+                    // starting it as a subprocess, so it has to be
+                    // explicitly started *somewhere*)
+                    command.extend(vec!["airflow dag-processor &".to_string()]);
+                }
             }
+            AirflowRole::DagProcessor => command.extend(vec![
+                "prepare_signal_handlers".to_string(),
+                container_debug_command(),
+                "airflow dag-processor &".to_string(),
+            ]),
+            AirflowRole::Triggerer => command.extend(vec![
+                "prepare_signal_handlers".to_string(),
+                container_debug_command(),
+                "airflow triggerer &".to_string(),
+            ]),
             AirflowRole::Worker => command.extend(vec![
                 "prepare_signal_handlers".to_string(),
                 container_debug_command(),
@@ -671,6 +682,16 @@
                     "airflow scheduler &".to_string(),
                 ]);
             }
+            AirflowRole::DagProcessor => command.extend(vec![
+                "prepare_signal_handlers".to_string(),
+                container_debug_command(),
+                "airflow dag-processor &".to_string(),
+            ]),
+            AirflowRole::Triggerer => command.extend(vec![
+                "prepare_signal_handlers".to_string(),
+                container_debug_command(),
+                "airflow triggerer &".to_string(),
+            ]),
             AirflowRole::Worker => command.extend(vec![
                 "prepare_signal_handlers".to_string(),
                 container_debug_command(),
@@ -727,6 +748,8 @@
             AirflowRole::Webserver => Some(HTTP_PORT),
             AirflowRole::Scheduler => None,
             AirflowRole::Worker => None,
+            AirflowRole::DagProcessor => None,
+            AirflowRole::Triggerer => None,
         }
     }
 
@@ -745,9 +768,50 @@
                 .webservers
                 .to_owned()
                 .map(|webserver| webserver.role_config.listener_class),
-            Self::Worker | Self::Scheduler => None,
+            Self::Worker | Self::Scheduler | Self::DagProcessor | Self::Triggerer => None,
         }
     }
+
+    pub fn role_config(
+        &self,
+        airflow: &v1alpha1::AirflowCluster,
+    ) -> Result<Role<AirflowConfigFragment>, Error> {
+        let role = self.to_string();
+        let roles = AirflowRole::roles();
+
+        let role_config = match self {
+            AirflowRole::Webserver => &extract_role_from_webserver_config(
+                airflow
+                    .spec
+                    .webservers
+                    .to_owned()
+                    .context(UnknownAirflowRoleSnafu { role, roles })?,
+            ),
+            AirflowRole::Worker => {
+                if let AirflowExecutor::CeleryExecutor { config } = &airflow.spec.executor {
+                    config
+                } else {
+                    return Err(Error::NoRoleForExecutorFailure);
+                }
+            }
+            AirflowRole::Scheduler => airflow
+                .spec
+                .schedulers
+                .as_ref()
+                .context(UnknownAirflowRoleSnafu { role, roles })?,
+            AirflowRole::DagProcessor => airflow
+                .spec
+                .dag_processors
+                .as_ref()
+                .context(UnknownAirflowRoleSnafu { role, roles })?,
+            AirflowRole::Triggerer => airflow
+                .spec
+                .triggerers
+                .as_ref()
+                .context(UnknownAirflowRoleSnafu { role, roles })?,
+        };
+
+        Ok(role_config.clone())
+    }
 }
 
 fn container_debug_command() -> String {
@@ -881,9 +945,10 @@ impl AirflowConfig {
             logging: product_logging::spec::default_logging(),
             affinity: get_affinity(cluster_name, role),
             graceful_shutdown_timeout: Some(match role {
-                AirflowRole::Webserver | AirflowRole::Scheduler => {
-                    DEFAULT_AIRFLOW_GRACEFUL_SHUTDOWN_TIMEOUT
-                }
+                AirflowRole::Webserver
+                | AirflowRole::Scheduler
+                | AirflowRole::DagProcessor
+                | AirflowRole::Triggerer => DEFAULT_AIRFLOW_GRACEFUL_SHUTDOWN_TIMEOUT,
                 AirflowRole::Worker => DEFAULT_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT,
             }),
         }
@@ -956,6 +1021,26 @@ fn default_resources(role: &AirflowRole) -> ResourcesFragment
+        AirflowRole::DagProcessor => (
+            CpuLimitsFragment {
+                min: Some(Quantity("1".to_owned())),
+                max: Some(Quantity("2".to_owned())),
+            },
+            MemoryLimitsFragment {
+                limit: Some(Quantity("1Gi".to_owned())),
+                runtime_limits: NoRuntimeLimitsFragment {},
+            },
+        ),
+        AirflowRole::Triggerer => (
+            CpuLimitsFragment {
+                min: Some(Quantity("1".to_owned())),
+                max: Some(Quantity("2".to_owned())),
+            },
+            MemoryLimitsFragment {
+                limit: Some(Quantity("1Gi".to_owned())),
+                runtime_limits: NoRuntimeLimitsFragment {},
+            },
+        ),
     };
 
     ResourcesFragment {
diff --git a/rust/operator-binary/src/env_vars.rs b/rust/operator-binary/src/env_vars.rs
index 7e70491f..610d9afe 100644
--- a/rust/operator-binary/src/env_vars.rs
+++ b/rust/operator-binary/src/env_vars.rs
@@ -15,8 +15,8 @@ use stackable_operator::{
 
 use crate::{
     crd::{
-        AirflowConfig, AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR,
-        STACKABLE_LOG_DIR, TEMPLATE_LOCATION, TEMPLATE_NAME,
+        AirflowExecutor, AirflowRole, ExecutorConfig, LOG_CONFIG_DIR, STACKABLE_LOG_DIR,
+        TEMPLATE_LOCATION, TEMPLATE_NAME,
         authentication::{
             AirflowAuthenticationClassResolved, AirflowClientAuthenticationDetailsResolved,
         },
@@ -86,57 +86,53 @@ pub fn build_airflow_statefulset_envs(
     resolved_product_image: &ResolvedProductImage,
 ) -> Result<BTreeMap<String, EnvVar>, Error> {
     let mut env: BTreeMap<String, EnvVar> = BTreeMap::new();
+    let secret = airflow.spec.cluster_config.credentials_secret.as_str();
 
     env.extend(static_envs(git_sync_resources));
 
-    add_version_specific_env_vars(airflow, airflow_role, resolved_product_image, &mut env);
-
     // environment variables
     let env_vars = rolegroup_config.get(&PropertyNameKind::Env);
 
-    let secret_prop =
-        env_vars.and_then(|vars| vars.get(AirflowConfig::CREDENTIALS_SECRET_PROPERTY));
+    add_version_specific_env_vars(airflow, airflow_role, resolved_product_image, &mut env);
+
+    env.insert(
+        AIRFLOW_WEBSERVER_SECRET_KEY.into(),
+        // The secret key is used to run the webserver flask app and also used to authorize
+        // requests to Celery workers when logs are retrieved.
+        env_var_from_secret(
+            AIRFLOW_WEBSERVER_SECRET_KEY,
+            secret,
+            "connections.secretKey",
+        ),
+    );
+    env.insert(
+        AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(),
+        env_var_from_secret(
+            AIRFLOW_DATABASE_SQL_ALCHEMY_CONN,
+            secret,
+            "connections.sqlalchemyDatabaseUri",
+        ),
+    );
 
-    if let Some(secret) = secret_prop {
+    // Redis is only needed when celery executors are used
+    // see https://github.com/stackabletech/airflow-operator/issues/424 for details
+    if matches!(executor, AirflowExecutor::CeleryExecutor { .. }) {
         env.insert(
-            AIRFLOW_WEBSERVER_SECRET_KEY.into(),
-            // The secret key is used to run the webserver flask app and also used to authorize
-            // requests to Celery workers when logs are retrieved.
+            AIRFLOW_CELERY_RESULT_BACKEND.into(),
             env_var_from_secret(
-                AIRFLOW_WEBSERVER_SECRET_KEY,
+                AIRFLOW_CELERY_RESULT_BACKEND,
                 secret,
-                "connections.secretKey",
+                "connections.celeryResultBackend",
             ),
         );
         env.insert(
-            AIRFLOW_DATABASE_SQL_ALCHEMY_CONN.into(),
+            AIRFLOW_CELERY_BROKER_URL.into(),
             env_var_from_secret(
-                AIRFLOW_DATABASE_SQL_ALCHEMY_CONN,
+                AIRFLOW_CELERY_BROKER_URL,
                 secret,
-                "connections.sqlalchemyDatabaseUri",
+                "connections.celeryBrokerUrl",
             ),
         );
-
-        // Redis is only needed when celery executors are used
-        // see https://github.com/stackabletech/airflow-operator/issues/424 for details
-        if matches!(executor, AirflowExecutor::CeleryExecutor { .. }) {
-            env.insert(
-                AIRFLOW_CELERY_RESULT_BACKEND.into(),
-                env_var_from_secret(
-                    AIRFLOW_CELERY_RESULT_BACKEND,
-                    secret,
-                    "connections.celeryResultBackend",
-                ),
-            );
-            env.insert(
-                AIRFLOW_CELERY_BROKER_URL.into(),
-                env_var_from_secret(
-                    AIRFLOW_CELERY_BROKER_URL,
-                    secret,
-                    "connections.celeryBrokerUrl",
-                ),
-            );
-        }
     }
 
     let dags_folder = get_dags_folder(git_sync_resources);
@@ -527,6 +523,19 @@ fn add_version_specific_env_vars(
                 ..Default::default()
             },
         );
+        if airflow.spec.dag_processors.is_some() {
+            // In airflow 2.x the dag-processor can optionally be started as a
+            // standalone process (rather than as a scheduler subprocess),
+            // accompanied by this env-var being set to True.
+            env.insert(
+                "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR".into(),
+                EnvVar {
+                    name: "AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR".into(),
+                    value: Some("True".into()),
+                    ..Default::default()
+                },
+            );
+        }
     }
 }
diff --git a/rust/operator-binary/src/operations/pdb.rs b/rust/operator-binary/src/operations/pdb.rs
index 9bfdd0aa..8b471082 100644
--- a/rust/operator-binary/src/operations/pdb.rs
+++ b/rust/operator-binary/src/operations/pdb.rs
@@ -37,6 +37,8 @@ pub async fn add_pdbs(
     let max_unavailable = pdb.max_unavailable.unwrap_or(match role {
         AirflowRole::Scheduler => max_unavailable_schedulers(),
         AirflowRole::Webserver => max_unavailable_webservers(),
+        AirflowRole::DagProcessor => max_unavailable_dag_processors(),
+        AirflowRole::Triggerer => max_unavailable_triggerers(),
         AirflowRole::Worker => match airflow.spec.executor {
             AirflowExecutor::CeleryExecutor { .. } => max_unavailable_workers(),
             AirflowExecutor::KubernetesExecutor { ..
} => { @@ -77,3 +79,11 @@ fn max_unavailable_workers() -> u16 { fn max_unavailable_webservers() -> u16 { 1 } + +fn max_unavailable_dag_processors() -> u16 { + 1 +} + +fn max_unavailable_triggerers() -> u16 { + 1 +} diff --git a/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 index b87f0d8e..fb8ced2c 100644 --- a/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/ldap/60-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 commands: - script: | diff --git a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 index 324c0642..e11f6bc0 100644 --- a/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-configmap/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 index 113a2346..5b0d5ca4 100644 --- a/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/mount-dags-gitsync/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 index 7bf63a71..8410596e 100644 --- a/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/orphaned-resources/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 index f3004952..694322e8 100644 --- a/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/remote-logging/40-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 index 22a2b673..cda76f90 100644 --- a/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/resources/30-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 diff --git a/tests/templates/kuttl/smoke/40-assert.yaml.j2 b/tests/templates/kuttl/smoke/40-assert.yaml.j2 index 81337261..0a2aed84 100644 --- a/tests/templates/kuttl/smoke/40-assert.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-assert.yaml.j2 @@ -51,6 +51,30 @@ status: readyReplicas: 1 replicas: 1 --- +apiVersion: apps/v1 +kind: 
StatefulSet +metadata: + name: airflow-dagprocessor-default +spec: + template: + spec: + terminationGracePeriodSeconds: 120 +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-triggerer-default +spec: + template: + spec: + terminationGracePeriodSeconds: 120 +status: + readyReplicas: 1 + replicas: 1 +--- apiVersion: policy/v1 kind: PodDisruptionBudget metadata: @@ -79,3 +103,21 @@ status: expectedPods: 1 currentHealthy: 1 disruptionsAllowed: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-dagprocessor +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: airflow-triggerer +status: + expectedPods: 1 + currentHealthy: 1 + disruptionsAllowed: 1 diff --git a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 index a4793606..61276545 100644 --- a/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 +++ b/tests/templates/kuttl/smoke/40-install-airflow-cluster.yaml.j2 @@ -1,7 +1,7 @@ apiVersion: kuttl.dev/v1beta1 kind: TestStep metadata: - name: install-airflow-db + name: install-airflow timeout: 480 --- apiVersion: v1 @@ -82,3 +82,17 @@ spec: roleGroups: default: replicas: 1 + dagProcessors: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 + triggerers: + config: + logging: + enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }} + roleGroups: + default: + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/00-patch-ns.yaml.j2 b/tests/templates/kuttl/triggerer/00-patch-ns.yaml.j2 new file mode 100644 index 00000000..67185acf --- /dev/null +++ b/tests/templates/kuttl/triggerer/00-patch-ns.yaml.j2 @@ -0,0 +1,9 @@ +{% if test_scenario['values']['openshift'] == 'true' %} +# see https://github.com/stackabletech/issues/issues/566 +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: kubectl patch namespace $NAMESPACE -p '{"metadata":{"labels":{"pod-security.kubernetes.io/enforce":"privileged"}}}' + timeout: 120 +{% endif %} diff --git a/tests/templates/kuttl/triggerer/05-assert.yaml b/tests/templates/kuttl/triggerer/05-assert.yaml new file mode 100644 index 00000000..319e927a --- /dev/null +++ b/tests/templates/kuttl/triggerer/05-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-postgresql +timeout: 480 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-postgresql +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/05-install-postgresql.yaml b/tests/templates/kuttl/triggerer/05-install-postgresql.yaml new file mode 100644 index 00000000..dc25ba20 --- /dev/null +++ b/tests/templates/kuttl/triggerer/05-install-postgresql.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-postgresql + --namespace $NAMESPACE + --version 16.4.2 + -f helm-bitnami-postgresql-values.yaml + oci://registry-1.docker.io/bitnamicharts/postgresql + timeout: 600 diff --git a/tests/templates/kuttl/triggerer/10-assert.yaml.j2 b/tests/templates/kuttl/triggerer/10-assert.yaml.j2 new file mode 100644 index 00000000..8d585401 --- /dev/null +++ b/tests/templates/kuttl/triggerer/10-assert.yaml.j2 @@ -0,0 +1,24 @@ +{% if 
test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-redis +timeout: 360 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-master +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-redis-replicas +status: + readyReplicas: 1 + replicas: 1 +{% endif %} diff --git a/tests/templates/kuttl/triggerer/10-install-redis.yaml.j2 b/tests/templates/kuttl/triggerer/10-install-redis.yaml.j2 new file mode 100644 index 00000000..aae9a14f --- /dev/null +++ b/tests/templates/kuttl/triggerer/10-install-redis.yaml.j2 @@ -0,0 +1,13 @@ +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + - script: >- + helm install airflow-redis + --namespace $NAMESPACE + --version 17.11.3 + -f helm-bitnami-redis-values.yaml + --repo https://charts.bitnami.com/bitnami redis + timeout: 600 +{% endif %} diff --git a/tests/templates/kuttl/triggerer/30-assert.yaml.j2 b/tests/templates/kuttl/triggerer/30-assert.yaml.j2 new file mode 100644 index 00000000..6ffaabb2 --- /dev/null +++ b/tests/templates/kuttl/triggerer/30-assert.yaml.j2 @@ -0,0 +1,32 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-cluster +timeout: 1200 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-webserver-default +status: + readyReplicas: 1 + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-worker-default +status: + readyReplicas: 1 + replicas: 1 +{% endif %} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: airflow-scheduler-default +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 new file mode 100644 index 00000000..9ddad88b --- /dev/null +++ b/tests/templates/kuttl/triggerer/30-install-airflow-cluster.yaml.j2 @@ -0,0 +1,143 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: install-airflow +timeout: 480 +--- +apiVersion: v1 +kind: Secret +metadata: + name: test-airflow-credentials +type: Opaque +stringData: + adminUser.username: airflow + adminUser.firstname: Airflow + adminUser.lastname: Admin + adminUser.email: airflow@airflow.com + adminUser.password: airflow + connections.secretKey: thisISaSECRET_1234 + connections.sqlalchemyDatabaseUri: postgresql+psycopg2://airflow:airflow@airflow-postgresql/airflow +{% if test_scenario['values']['executor'] == 'celery' %} + connections.celeryResultBackend: db+postgresql://airflow:airflow@airflow-postgresql/airflow + connections.celeryBrokerUrl: redis://:redis@airflow-redis-master:6379/0 +{% endif %} +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: triggerer-dag +data: + triggerer_dag.py: | + from datetime import datetime, timedelta + + from airflow import DAG + from airflow.models.baseoperator import BaseOperator + from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.utils.context import Context + from airflow.operators.empty import EmptyOperator + + # ------------------------------------------------------ + # Custom deferrable operator - does a simple async sleep + # ------------------------------------------------------ + class CoreDeferrableSleepOperator(BaseOperator): + """ + Sleeps for ``duration`` seconds without occupying a 
worker. + The async hand-off happens via ``self.defer`` + ``TimeDeltaTrigger``. + """ + ui_color = "#ffefeb" + + def __init__(self, *, duration: int, **kwargs): + super().__init__(**kwargs) + self.duration = duration + + def execute(self, context: Context): + """Run on a worker, then hand control to the Triggerer.""" + # Build the trigger that will fire after `duration` seconds. + trigger = TimeDeltaTrigger(timedelta(seconds=self.duration)) + + # *** Asynchronous hand-off *** + # This tells the scheduler: “pause this task, let the Triggerer watch the timer”. + self.defer(trigger=trigger, method_name="execute_complete") + + def execute_complete(self, context: Context, event=None): + """Resumes here once the Triggerer fires.""" + self.log.info("Deferrable sleep of %s seconds finished.", self.duration) + return "DONE" + + default_args = {"owner": "stackable", "retries": 0} + + with DAG( + dag_id="core_deferrable_sleep_demo", + schedule=None, + # N.B. this be earlier than the current timestamp! + start_date=datetime(2025, 8, 1), + catchup=False, + default_args=default_args, + tags=["example", "triggerer"], + ) as dag: + + sleep = CoreDeferrableSleepOperator( + task_id="deferrable_sleep", + duration=10, + ) + + sleep +--- +apiVersion: airflow.stackable.tech/v1alpha1 +kind: AirflowCluster +metadata: + name: airflow +spec: + image: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + custom: "{{ test_scenario['values']['airflow-latest'].split(',')[1] }}" + productVersion: "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + productVersion: "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} + pullPolicy: IfNotPresent + clusterConfig: + credentialsSecret: test-airflow-credentials + volumes: + - name: triggerer-dag + configMap: + name: triggerer-dag + volumeMounts: + - name: triggerer-dag + mountPath: /dags/triggerer_dag.py + subPath: triggerer_dag.py + webservers: + roleConfig: + listenerClass: external-unstable + roleGroups: + default: + envOverrides: &envOverrides + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + replicas: 1 +{% if test_scenario['values']['executor'] == 'celery' %} + celeryExecutors: + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 +{% elif test_scenario['values']['executor'] == 'kubernetes' %} + kubernetesExecutors: + envOverrides: *envOverrides +{% endif %} + schedulers: + config: + gracefulShutdownTimeout: 10s + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 + dagProcessors: + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 + triggerers: + roleGroups: + default: + envOverrides: *envOverrides + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/40-assert.yaml b/tests/templates/kuttl/triggerer/40-assert.yaml new file mode 100644 index 00000000..6edaa3c3 --- /dev/null +++ b/tests/templates/kuttl/triggerer/40-assert.yaml @@ -0,0 +1,14 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-python +timeout: 240 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python +status: + readyReplicas: 1 + replicas: 1 diff --git a/tests/templates/kuttl/triggerer/40-install-airflow-python.yaml b/tests/templates/kuttl/triggerer/40-install-airflow-python.yaml new file mode 100644 index 00000000..c3f865a0 --- /dev/null +++ b/tests/templates/kuttl/triggerer/40-install-airflow-python.yaml @@ -0,0 +1,23 @@ +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: test-airflow-python + labels: + app: test-airflow-python +spec: + 
replicas: 1 + selector: + matchLabels: + app: test-airflow-python + template: + metadata: + labels: + app: test-airflow-python + spec: + containers: + - name: test-airflow-python + image: oci.stackable.tech/sdp/testing-tools:0.2.0-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + stdin: true + tty: true diff --git a/tests/templates/kuttl/triggerer/50-assert.yaml.j2 b/tests/templates/kuttl/triggerer/50-assert.yaml.j2 new file mode 100644 index 00000000..b85052aa --- /dev/null +++ b/tests/templates/kuttl/triggerer/50-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-airflow-webserver-health-check +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/health.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/triggerer/50-health-check.yaml b/tests/templates/kuttl/triggerer/50-health-check.yaml new file mode 100644 index 00000000..5d3b329f --- /dev/null +++ b/tests/templates/kuttl/triggerer/50-health-check.yaml @@ -0,0 +1,7 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +timeout: 480 +commands: + - script: kubectl cp -n $NAMESPACE ../../../../templates/kuttl/commons/health.py test-airflow-python-0:/tmp + timeout: 240 diff --git a/tests/templates/kuttl/triggerer/60-assert.yaml.j2 b/tests/templates/kuttl/triggerer/60-assert.yaml.j2 new file mode 100644 index 00000000..b9ce5d22 --- /dev/null +++ b/tests/templates/kuttl/triggerer/60-assert.yaml.j2 @@ -0,0 +1,12 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: triggerer_metrics +timeout: 480 +commands: +{% if test_scenario['values']['airflow-latest'].find(",") > 0 %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/triggerer_metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'].split(',')[0] }}" +{% else %} + - script: kubectl exec -n $NAMESPACE test-airflow-python-0 -- python /tmp/triggerer_metrics.py --airflow-version "{{ test_scenario['values']['airflow-latest'] }}" +{% endif %} diff --git a/tests/templates/kuttl/triggerer/60-install-metrics-script.yaml b/tests/templates/kuttl/triggerer/60-install-metrics-script.yaml new file mode 100644 index 00000000..377566b1 --- /dev/null +++ b/tests/templates/kuttl/triggerer/60-install-metrics-script.yaml @@ -0,0 +1,8 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +metadata: + name: metrics +commands: + - script: kubectl cp -n $NAMESPACE triggerer_metrics.py test-airflow-python-0:/tmp + timeout: 240 diff --git a/tests/templates/kuttl/triggerer/70-assert.yaml.j2 b/tests/templates/kuttl/triggerer/70-assert.yaml.j2 new file mode 100644 index 00000000..7227e06f --- /dev/null +++ b/tests/templates/kuttl/triggerer/70-assert.yaml.j2 @@ -0,0 +1,25 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +metadata: + name: test-log-endpoint +timeout: 240 +commands: +{% if test_scenario['values']['executor'] == 'celery' %} + - script: | + set -eu + + # Log-Endpoint Test: + # This is executed from the Webserver as JWT keys must be present. 
+ # A small server is started on each worker that serves the logs on its + # 8793 port for the Webserver: we don't use the token as that is an + # internal implementation, but check that the endpoint is reachable, + # indicated by a 403. + CURL_RESPONSE=$( + kubectl -n $NAMESPACE exec airflow-webserver-default-0 -- sh -c 'CODE=$(curl -s -o /dev/null -w "%{http_code}" http://airflow-worker-default-headless:8793 2>/dev/null || true);echo "$CODE"' + ) + + # Log-Endpoint Test Assertion: + echo "The HTTP Code is $CURL_RESPONSE (an internal JWT token is needed for full access)" + [ "$CURL_RESPONSE" -eq 403 ] +{% endif %} diff --git a/tests/templates/kuttl/triggerer/helm-bitnami-postgresql-values.yaml.j2 b/tests/templates/kuttl/triggerer/helm-bitnami-postgresql-values.yaml.j2 new file mode 100644 index 00000000..80c50924 --- /dev/null +++ b/tests/templates/kuttl/triggerer/helm-bitnami-postgresql-values.yaml.j2 @@ -0,0 +1,37 @@ +--- +global: + security: + allowInsecureImages: true + +image: + repository: bitnamilegacy/postgresql + +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + securityContext: + runAsUser: auto + +metrics: + image: + repository: bitnamilegacy/postgres-exporter + +primary: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +shmVolume: + chmod: + enabled: false + +auth: + username: airflow + password: airflow + database: airflow diff --git a/tests/templates/kuttl/triggerer/helm-bitnami-redis-values.yaml.j2 b/tests/templates/kuttl/triggerer/helm-bitnami-redis-values.yaml.j2 new file mode 100644 index 00000000..0198dc51 --- /dev/null +++ b/tests/templates/kuttl/triggerer/helm-bitnami-redis-values.yaml.j2 @@ -0,0 +1,49 @@ +--- +global: + security: + allowInsecureImages: true # needed starting with Chart version 20.5.0 if modifying images +image: + repository: bitnamilegacy/redis +sentinel: + image: + repository: bitnamilegacy/redis-sentinel +metrics: + image: + repository: bitnamilegacy/redis-exporter +kubectl: + image: + repository: bitnamilegacy/kubectl +sysctl: + image: + repository: bitnamilegacy/os-shell + +volumePermissions: + enabled: false + image: + repository: bitnamilegacy/os-shell + containerSecurityContext: + runAsUser: auto + +master: + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +replica: + replicaCount: 1 + podSecurityContext: +{% if test_scenario['values']['openshift'] == 'true' %} + enabled: false +{% else %} + enabled: true +{% endif %} + containerSecurityContext: + enabled: false + +auth: + password: redis diff --git a/tests/templates/kuttl/triggerer/triggerer_metrics.py b/tests/templates/kuttl/triggerer/triggerer_metrics.py new file mode 100755 index 00000000..51a417a4 --- /dev/null +++ b/tests/templates/kuttl/triggerer/triggerer_metrics.py @@ -0,0 +1,175 @@ +#!/usr/bin/env python + +import requests +import time +import sys +from datetime import datetime, timezone +import argparse +import logging + + +def exception_handler(exception_type, exception, traceback): + print(f"{exception_type.__name__}: {exception.args}") + + +sys.excepthook = exception_handler + + +def assert_metric(role, role_group, metric): + metric_response = requests.get( + f"http://airflow-{role}-{role_group}-metrics:9102/metrics" + ) + assert metric_response.status_code == 200, ( + f"Metrics 
could not be retrieved from the {role}-{role_group}." + ) + return metric in metric_response.text + + +def metrics_v3(role_group: str) -> None: + now = datetime.now(timezone.utc) + ts = now.strftime("%Y-%m-%dT%H:%M:%S.%f") + now.strftime("%z") + + # Trigger a deferrable DAG run to create metrics + dag_id = "core_deferrable_sleep_demo" + dag_data = {"logical_date": f"{ts}"} + + # allow a few moments for the DAGs to be registered to all roles + time.sleep(10) + + rest_url = "http://airflow-webserver:8080/api/v2" + token_url = "http://airflow-webserver:8080/auth/token" + + data = {"username": "airflow", "password": "airflow"} + + headers = {"Content-Type": "application/json"} + + response = requests.post(token_url, headers=headers, json=data) + + if response.status_code == 200 or response.status_code == 201: + token_data = response.json() + access_token = token_data["access_token"] + print(f"Access Token: {access_token}") + else: + print( + f"Failed to obtain access token: {response.status_code} - {response.text}" + ) + sys.exit(1) + + headers = { + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + } + + # activate DAG + response = requests.patch( + f"{rest_url}/dags/{dag_id}", headers=headers, json={"is_paused": False} + ) + # trigger DAG + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", headers=headers, json=dag_data + ) + + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. + iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." + # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", + role_group, + "airflow_task_instance_created_CoreDeferrableSleepOperator", + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_core_deferrable_sleep_demo_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + + +def metrics_v2(role_group: str) -> None: + # Trigger a DAG run to create metrics + dag_id = "core_deferrable_sleep_demo" + + rest_url = "http://airflow-webserver:8080/api/v1" + auth = ("airflow", "airflow") + + # allow a few moments for the DAGs to be registered to all roles + time.sleep(10) + + response = requests.patch( + f"{rest_url}/dags/{dag_id}", auth=auth, json={"is_paused": False} + ) + response = requests.post( + f"{rest_url}/dags/{dag_id}/dagRuns", auth=auth, json={"conf": {}} + ) + + # Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid + # or minimize this by looping over the check instead. + iterations = 4 + loop = 0 + while True: + assert response.status_code == 200, "DAG run could not be triggered." 
+ # Wait for the metrics to be consumed by the statsd-exporter + time.sleep(5) + # (disable line-break flake checks) + if ( + (assert_metric("scheduler", role_group, "airflow_scheduler_heartbeat")) + and ( + assert_metric( + "webserver", + role_group, + "airflow_task_instance_created_CoreDeferrableSleepOperator", + ) + ) # noqa: W503, W504 + and ( + assert_metric( + "scheduler", + role_group, + "airflow_dagrun_duration_success_core_deferrable_sleep_demo_count", + ) + ) + ): # noqa: W503, W504 + break + time.sleep(10) + loop += 1 + if loop == iterations: + # force re-try of script + sys.exit(1) + + +if __name__ == "__main__": + log_level = "DEBUG" + logging.basicConfig( + level=log_level, + format="%(asctime)s %(levelname)s: %(message)s", + stream=sys.stdout, + ) + + parser = argparse.ArgumentParser(description="Airflow metrics script") + parser.add_argument( + "--role-group", type=str, default="default", help="Role group to check" + ) + parser.add_argument("--airflow-version", type=str, help="Airflow version") + opts = parser.parse_args() + + if opts.airflow_version and opts.airflow_version.startswith("3"): + metrics_v3(opts.role_group) + else: + metrics_v2(opts.role_group) diff --git a/tests/test-definition.yaml b/tests/test-definition.yaml index f092e9a7..706f10b6 100644 --- a/tests/test-definition.yaml +++ b/tests/test-definition.yaml @@ -90,6 +90,11 @@ tests: - airflow - openshift - executor + - name: triggerer + dimensions: + - airflow-latest + - openshift + - executor - name: remote-logging dimensions: - airflow-latest