diff --git a/internal/controller/appwrapper/appwrapper_controller.go b/internal/controller/appwrapper/appwrapper_controller.go
index 0892358..4f487a2 100644
--- a/internal/controller/appwrapper/appwrapper_controller.go
+++ b/internal/controller/appwrapper/appwrapper_controller.go
@@ -70,7 +70,7 @@ type podStatusSummary struct {
 	succeeded       int32
 	failed          int32
 	terminalFailure bool
-	unhealthyNodes  sets.Set[string]
+	noExecuteNodes  sets.Set[string]
 }
 
 type componentStatusSummary struct {
@@ -334,13 +334,13 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		}
 	}
 
-	// Initiate migration of workloads that are using resources that Autopilot has flagged as unhealthy
-	detailMsg = fmt.Sprintf("Workload contains pods using unhealthy resources on Nodes: %v", podStatus.unhealthyNodes)
-	if len(podStatus.unhealthyNodes) > 0 {
+	// Initiate migration of workloads that are using resources that Autopilot has flagged as NoExecute
+	detailMsg = fmt.Sprintf("Workload contains pods using NoExecute resources on Nodes: %v", podStatus.noExecuteNodes)
+	if len(podStatus.noExecuteNodes) > 0 {
 		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 			Type:    string(workloadv1beta2.Unhealthy),
 			Status:  metav1.ConditionTrue,
-			Reason:  "AutopilotUnhealthy",
+			Reason:  "AutopilotNoExecute",
 			Message: detailMsg,
 		})
 		r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), detailMsg)
@@ -549,7 +549,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 		return nil, err
 	}
 	summary := &podStatusSummary{expected: pc}
-	checkUnhealthyNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes
+	checkNoExecuteNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes
 
 	for _, pod := range pods.Items {
 		switch pod.Status.Phase {
@@ -558,33 +558,33 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 		case v1.PodRunning:
 			if pod.DeletionTimestamp.IsZero() {
 				summary.running += 1
-				if checkUnhealthyNodes {
-					unhealthyNodesMutex.RLock() // BEGIN CRITICAL SECTION
-					if len(unhealthyNodes) > 0 {
-						if resources, ok := unhealthyNodes[pod.Spec.NodeName]; ok {
+				if checkNoExecuteNodes {
+					noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
+					if len(noExecuteNodes) > 0 {
+						if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
 							for badResource := range resources {
 								for _, container := range pod.Spec.Containers {
 									if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok {
 										if !limit.IsZero() {
-											if summary.unhealthyNodes == nil {
-												summary.unhealthyNodes = make(sets.Set[string])
+											if summary.noExecuteNodes == nil {
+												summary.noExecuteNodes = make(sets.Set[string])
 											}
-											summary.unhealthyNodes.Insert(pod.Spec.NodeName)
+											summary.noExecuteNodes.Insert(pod.Spec.NodeName)
 										}
 									}
 									if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok {
 										if !request.IsZero() {
-											if summary.unhealthyNodes == nil {
-												summary.unhealthyNodes = make(sets.Set[string])
+											if summary.noExecuteNodes == nil {
+												summary.noExecuteNodes = make(sets.Set[string])
 											}
-											summary.unhealthyNodes.Insert(pod.Spec.NodeName)
+											summary.noExecuteNodes.Insert(pod.Spec.NodeName)
 										}
 									}
 								}
 							}
 						}
 					}
-					unhealthyNodesMutex.RUnlock() // END CRITICAL SECTION
+					noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
 				}
 			}
 		case v1.PodSucceeded:
diff --git a/internal/controller/appwrapper/node_health_monitor.go b/internal/controller/appwrapper/node_health_monitor.go
index 7e808ac..d86f39d 100644
--- a/internal/controller/appwrapper/node_health_monitor.go
+++ b/internal/controller/appwrapper/node_health_monitor.go
@@ -36,74 +36,46 @@ import (
 	"github.com/project-codeflare/appwrapper/pkg/config"
 )
 
-// NodeHealthMonitor maintains the set of nodes that Autopilot has labelled as unhealthy
+// NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
+// been marked as Unschedulable or that have been labeled to indicate that
+// they have resources that Autopilot has tainted as NoSchedule or NoExecute.
+// This information is used to automate the maintenance of the lendingLimit of
+// a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
 type NodeHealthMonitor struct {
 	client.Client
 	Config *config.AppWrapperConfig
 }
 
 var (
-	// unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
-	unhealthyNodes = make(map[string]sets.Set[string])
-	unhealthyNodesMutex sync.RWMutex
-
-	// unschedulableNodes is a mapping from Node names to resource quantities than Autopilot has labeled as unschedulable on that Node
-	unschedulableNodes = make(map[string]map[string]*resource.Quantity)
+	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
+	noExecuteNodes = make(map[string]sets.Set[string])
+	noExecuteNodesMutex sync.RWMutex
+
+	// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
+	// A resource may be unschedulable either because:
+	// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
+	// (b) Autopilot has labeled the Node with either a NoExecute or NoSchedule taint.
+	noScheduleNodes = make(map[string]map[string]*resource.Quantity)
 )
 
 // permission to watch nodes
 //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
 //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch
 
-//gocyclo:ignore
 func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	node := &v1.Node{}
 	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
 		return ctrl.Result{}, nil
 	}
 
-	flaggedResources := make(sets.Set[string])
-	for key, value := range node.GetLabels() {
-		for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
-			for _, taint := range taints {
-				if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
-					flaggedResources.Insert(resourceName)
-				}
-			}
-		}
-	}
-
-	nodeChanged := false
-	unhealthyNodesMutex.Lock() // BEGIN CRITICAL SECTION
-	if priorEntry, ok := unhealthyNodes[node.GetName()]; ok {
-		if len(flaggedResources) == 0 {
-			delete(unhealthyNodes, node.GetName())
-			nodeChanged = true
-		} else if !priorEntry.Equal(flaggedResources) {
-			unhealthyNodes[node.GetName()] = flaggedResources
-			nodeChanged = true
-		}
-	} else if len(flaggedResources) > 0 {
-		unhealthyNodes[node.GetName()] = flaggedResources
-		nodeChanged = true
-	}
-	unhealthyNodesMutex.Unlock() // END CRITICAL SECTION
+	r.updateNoExecuteNodes(ctx, node)
 
-	// Unsynchronized reads of unhealthyNodes below are safe because this method
-	// is the only writer to the map and the controller runtime is configured to
-	// not allow concurrent execution of this method.
-
-	if nodeChanged {
-		log.FromContext(ctx).Info("Updated node health information", "Number Unhealthy Nodes", len(unhealthyNodes), "Unhealthy Resource Details", unhealthyNodes)
-	}
-
-	// update lending limits on slack quota if configured
+	// If there is a slack ClusterQueue, update its lending limits
 	if r.Config.SlackQueueName == "" {
 		return ctrl.Result{}, nil
 	}
 
-	// get slack quota
 	cq := &kueue.ClusterQueue{}
 	if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
 		if errors.IsNotFound(err) {
@@ -112,13 +84,54 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, err
 	}
 
+	r.updateNoScheduleNodes(ctx, cq, node)
+
+	return r.updateLendingLimits(ctx, cq)
+}
+
+func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
+	noExecuteResources := make(sets.Set[string])
+	for key, value := range node.GetLabels() {
+		for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
+			for _, taint := range taints {
+				if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
+					noExecuteResources.Insert(resourceName)
+				}
+			}
+		}
+	}
+
+	noExecuteNodesChanged := false
+	noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
+	if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
+		if len(noExecuteResources) == 0 {
+			delete(noExecuteNodes, node.GetName())
+			noExecuteNodesChanged = true
+		} else if !priorEntry.Equal(noExecuteResources) {
+			noExecuteNodes[node.GetName()] = noExecuteResources
+			noExecuteNodesChanged = true
+		}
+	} else if len(noExecuteResources) > 0 {
+		noExecuteNodes[node.GetName()] = noExecuteResources
+		noExecuteNodesChanged = true
+	}
+	noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
+
+	// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
+	// and the controller runtime is configured to not allow concurrent execution of this controller.
+	if noExecuteNodesChanged {
+		log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+	}
+}
+
+func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
 	// update unschedulable resource quantities for this node
-	flaggedQuantities := make(map[string]*resource.Quantity)
+	noScheduleQuantities := make(map[string]*resource.Quantity)
 	if node.Spec.Unschedulable {
-		// flag all non-pod resources covered by cq if the node is cordoned
+		// add all non-pod resources covered by cq if the node is cordoned
 		for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
 			if string(resourceName.Name) != "pods" {
-				flaggedQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
+				noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
 			}
 		}
 	} else {
@@ -126,22 +139,25 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
 			for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
 				for _, taint := range taints {
 					if key == taint.Key && value == taint.Value {
-						flaggedQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
 					}
 				}
 			}
 		}
 	}
 
-	if len(flaggedQuantities) > 0 {
-		unschedulableNodes[node.GetName()] = flaggedQuantities
+	if len(noScheduleQuantities) > 0 {
+		noScheduleNodes[node.GetName()] = noScheduleQuantities
 	} else {
-		delete(unschedulableNodes, node.GetName())
+		delete(noScheduleNodes, node.GetName())
 	}
+}
+
+func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {
 	// compute unschedulable resource totals
 	unschedulableQuantities := map[string]*resource.Quantity{}
-	for _, quantities := range unschedulableNodes {
+	for _, quantities := range noScheduleNodes {
 		for resourceName, quantity := range quantities {
 			if !quantity.IsZero() {
 				if unschedulableQuantities[resourceName] == nil {
diff --git a/internal/controller/appwrapper/node_health_monitor_test.go b/internal/controller/appwrapper/node_health_monitor_test.go
index ee5db36..2a2589a 100644
--- a/internal/controller/appwrapper/node_health_monitor_test.go
+++ b/internal/controller/appwrapper/node_health_monitor_test.go
@@ -74,7 +74,7 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Healthy cluster has no unhealthy nodes")
-		Expect(len(unhealthyNodes)).Should(Equal(0))
+		Expect(len(noExecuteNodes)).Should(Equal(0))
 
 		By("A node labeled EVICT is detected as unhealthy")
 		node := getNode(node1Name.Name)
@@ -84,25 +84,25 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(err).NotTo(HaveOccurred())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
-		Expect(len(unhealthyNodes)).Should(Equal(1))
-		Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
-		Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
+		Expect(len(noExecuteNodes)).Should(Equal(1))
+		Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
+		Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
 
 		By("Repeated reconcile does not change map")
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
-		Expect(len(unhealthyNodes)).Should(Equal(1))
-		Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
-		Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
+		Expect(len(noExecuteNodes)).Should(Equal(1))
+		Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
+		Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
 
 		By("Removing the EVICT label updates unhealthyNodes")
 		node.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
 		Expect(k8sClient.Update(ctx, node)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
-		Expect(len(unhealthyNodes)).Should(Equal(0))
+		Expect(len(noExecuteNodes)).Should(Equal(0))
 	})
 
 	It("ClusterQueue Lending Adjustment", func() {