
Commit 4b282d0

reorganize NodeMonitor code; no semantic change (#254)
Rename variables, fix comments, and refactor into smaller methods to prepare for actually fixing the bugs related to node deletion.
1 parent 69b0199 commit 4b282d0

File tree

3 files changed

+93 -77 lines changed

internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 17 additions & 17 deletions
@@ -70,7 +70,7 @@ type podStatusSummary struct {
 	succeeded       int32
 	failed          int32
 	terminalFailure bool
-	unhealthyNodes  sets.Set[string]
+	noExecuteNodes  sets.Set[string]
 }
 
 type componentStatusSummary struct {
@@ -334,13 +334,13 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		}
 	}
 
-	// Initiate migration of workloads that are using resources that Autopilot has flagged as unhealthy
-	detailMsg = fmt.Sprintf("Workload contains pods using unhealthy resources on Nodes: %v", podStatus.unhealthyNodes)
-	if len(podStatus.unhealthyNodes) > 0 {
+	// Initiate migration of workloads that are using resources that Autopilot has flagged as NoExecute
+	detailMsg = fmt.Sprintf("Workload contains pods using NoExecute resources on Nodes: %v", podStatus.noExecuteNodes)
+	if len(podStatus.noExecuteNodes) > 0 {
 		meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
 			Type:    string(workloadv1beta2.Unhealthy),
 			Status:  metav1.ConditionTrue,
-			Reason:  "AutopilotUnhealthy",
+			Reason:  "AutopilotNoExecute",
 			Message: detailMsg,
 		})
 		r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), detailMsg)
@@ -549,7 +549,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 		return nil, err
 	}
 	summary := &podStatusSummary{expected: pc}
-	checkUnhealthyNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes
+	checkNoExecuteNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes
 
 	for _, pod := range pods.Items {
 		switch pod.Status.Phase {
@@ -558,33 +558,33 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 		case v1.PodRunning:
 			if pod.DeletionTimestamp.IsZero() {
 				summary.running += 1
-				if checkUnhealthyNodes {
-					unhealthyNodesMutex.RLock() // BEGIN CRITICAL SECTION
-					if len(unhealthyNodes) > 0 {
-						if resources, ok := unhealthyNodes[pod.Spec.NodeName]; ok {
+				if checkNoExecuteNodes {
+					noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
+					if len(noExecuteNodes) > 0 {
+						if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
 							for badResource := range resources {
 								for _, container := range pod.Spec.Containers {
 									if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok {
 										if !limit.IsZero() {
-											if summary.unhealthyNodes == nil {
-												summary.unhealthyNodes = make(sets.Set[string])
+											if summary.noExecuteNodes == nil {
+												summary.noExecuteNodes = make(sets.Set[string])
 											}
-											summary.unhealthyNodes.Insert(pod.Spec.NodeName)
+											summary.noExecuteNodes.Insert(pod.Spec.NodeName)
 										}
 									}
 									if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok {
 										if !request.IsZero() {
-											if summary.unhealthyNodes == nil {
-												summary.unhealthyNodes = make(sets.Set[string])
+											if summary.noExecuteNodes == nil {
+												summary.noExecuteNodes = make(sets.Set[string])
 											}
-											summary.unhealthyNodes.Insert(pod.Spec.NodeName)
+											summary.noExecuteNodes.Insert(pod.Spec.NodeName)
 										}
 									}
 								}
 							}
 						}
 					}
-					unhealthyNodesMutex.RUnlock() // END CRITICAL SECTION
+					noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
 				}
 			}
 		case v1.PodSucceeded:
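To make the renamed logic easier to follow, here is a minimal standalone distillation of the check getPodStatus performs inside the critical section above. The helper name is illustrative only; the commit keeps this logic inline:

package appwrapper

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// podUsesNoExecuteResource reports whether any container of the pod requests
// or limits a resource that Autopilot has flagged NoExecute on the pod's node.
// Callers must hold noExecuteNodesMutex.RLock(), mirroring the BEGIN/END
// CRITICAL SECTION comments in the diff above.
func podUsesNoExecuteResource(pod *v1.Pod, noExecuteNodes map[string]sets.Set[string]) bool {
	resources, ok := noExecuteNodes[pod.Spec.NodeName]
	if !ok {
		return false
	}
	for badResource := range resources {
		for _, container := range pod.Spec.Containers {
			if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok && !limit.IsZero() {
				return true
			}
			if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok && !request.IsZero() {
				return true
			}
		}
	}
	return false
}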

internal/controller/appwrapper/node_health_monitor.go

Lines changed: 68 additions & 52 deletions
@@ -36,74 +36,46 @@ import (
 	"github.com/project-codeflare/appwrapper/pkg/config"
 )
 
-// NodeHealthMonitor maintains the set of nodes that Autopilot has labelled as unhealthy
+// NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
+// been marked as Unschedulable or that have been labeled to indicate that
+// they have resources that Autopilot has tainted as NoSchedule or NoExecute.
+// This information is used to automate the maintenance of the lendingLimit of
+// a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
 type NodeHealthMonitor struct {
 	client.Client
 	Config *config.AppWrapperConfig
 }
 
 var (
-	// unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
-	unhealthyNodes = make(map[string]sets.Set[string])
-	unhealthyNodesMutex sync.RWMutex
-
-	// unschedulableNodes is a mapping from Node names to resource quantities that Autopilot has labeled as unschedulable on that Node
-	unschedulableNodes = make(map[string]map[string]*resource.Quantity)
+	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
+	noExecuteNodes = make(map[string]sets.Set[string])
+	noExecuteNodesMutex sync.RWMutex
+
+	// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
+	// A resource may be unschedulable either because:
+	// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
+	// (b) Autopilot has labeled the Node with either a NoExecute or NoSchedule taint.
+	noScheduleNodes = make(map[string]map[string]*resource.Quantity)
 )
 
 // permission to watch nodes
 //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
 //+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch
 
-//gocyclo:ignore
 func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	node := &v1.Node{}
 	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
 		return ctrl.Result{}, nil
 	}
 
-	flaggedResources := make(sets.Set[string])
-	for key, value := range node.GetLabels() {
-		for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
-			for _, taint := range taints {
-				if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
-					flaggedResources.Insert(resourceName)
-				}
-			}
-		}
-	}
-
-	nodeChanged := false
-	unhealthyNodesMutex.Lock() // BEGIN CRITICAL SECTION
-	if priorEntry, ok := unhealthyNodes[node.GetName()]; ok {
-		if len(flaggedResources) == 0 {
-			delete(unhealthyNodes, node.GetName())
-			nodeChanged = true
-		} else if !priorEntry.Equal(flaggedResources) {
-			unhealthyNodes[node.GetName()] = flaggedResources
-			nodeChanged = true
-		}
-	} else if len(flaggedResources) > 0 {
-		unhealthyNodes[node.GetName()] = flaggedResources
-		nodeChanged = true
-	}
-	unhealthyNodesMutex.Unlock() // END CRITICAL SECTION
+	r.updateNoExecuteNodes(ctx, node)
 
-	// Unsynchronized reads of unhealthyNodes below are safe because this method
-	// is the only writer to the map and the controller runtime is configured to
-	// not allow concurrent execution of this method.
-
-	if nodeChanged {
-		log.FromContext(ctx).Info("Updated node health information", "Number Unhealthy Nodes", len(unhealthyNodes), "Unhealthy Resource Details", unhealthyNodes)
-	}
-
-	// update lending limits on slack quota if configured
+	// If there is a slack ClusterQueue, update its lending limits
 
 	if r.Config.SlackQueueName == "" {
 		return ctrl.Result{}, nil
 	}
 
-	// get slack quota
 	cq := &kueue.ClusterQueue{}
 	if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
 		if errors.IsNotFound(err) {
@@ -112,36 +84,80 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		return ctrl.Result{}, err
 	}
 
+	r.updateNoScheduleNodes(ctx, cq, node)
+
+	return r.updateLendingLimits(ctx, cq)
+}
+
+func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
+	noExecuteResources := make(sets.Set[string])
+	for key, value := range node.GetLabels() {
+		for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
+			for _, taint := range taints {
+				if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
+					noExecuteResources.Insert(resourceName)
+				}
+			}
+		}
+	}
+
+	noExecuteNodesChanged := false
+	noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
+	if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
+		if len(noExecuteResources) == 0 {
+			delete(noExecuteNodes, node.GetName())
+			noExecuteNodesChanged = true
+		} else if !priorEntry.Equal(noExecuteResources) {
+			noExecuteNodes[node.GetName()] = noExecuteResources
+			noExecuteNodesChanged = true
+		}
+	} else if len(noExecuteResources) > 0 {
+		noExecuteNodes[node.GetName()] = noExecuteResources
+		noExecuteNodesChanged = true
+	}
+	noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
+
+	// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
+	// and the controller runtime is configured to not allow concurrent execution of this controller.
+	if noExecuteNodesChanged {
+		log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+	}
+}
+
+func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
 	// update unschedulable resource quantities for this node
-	flaggedQuantities := make(map[string]*resource.Quantity)
+	noScheduleQuantities := make(map[string]*resource.Quantity)
 	if node.Spec.Unschedulable {
-		// flag all non-pod resources covered by cq if the node is cordoned
+		// add all non-pod resources covered by cq if the node is cordoned
 		for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
 			if string(resourceName.Name) != "pods" {
-				flaggedQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
+				noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
 			}
 		}
 	} else {
 		for key, value := range node.GetLabels() {
 			for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
 				for _, taint := range taints {
 					if key == taint.Key && value == taint.Value {
-						flaggedQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
					}
 				}
 			}
 		}
 	}
 
-	if len(flaggedQuantities) > 0 {
-		unschedulableNodes[node.GetName()] = flaggedQuantities
+	if len(noScheduleQuantities) > 0 {
+		noScheduleNodes[node.GetName()] = noScheduleQuantities
 	} else {
-		delete(unschedulableNodes, node.GetName())
+		delete(noScheduleNodes, node.GetName())
 	}
+}
+
+func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {
 
 	// compute unschedulable resource totals
 	unschedulableQuantities := map[string]*resource.Quantity{}
-	for _, quantities := range unschedulableNodes {
+	for _, quantities := range noScheduleNodes {
 		for resourceName, quantity := range quantities {
 			if !quantity.IsZero() {
 				if unschedulableQuantities[resourceName] == nil {
internal/controller/appwrapper/node_health_monitor_test.go

Lines changed: 8 additions & 8 deletions
@@ -74,7 +74,7 @@ var _ = Describe("NodeMonitor Controller", func() {
 		Expect(err).NotTo(HaveOccurred())
 
 		By("Healthy cluster has no unhealthy nodes")
-		Expect(len(unhealthyNodes)).Should(Equal(0))
+		Expect(len(noExecuteNodes)).Should(Equal(0))
 
 		By("A node labeled EVICT is detected as unhealthy")
 		node := getNode(node1Name.Name)
@@ -84,25 +84,25 @@
 		Expect(err).NotTo(HaveOccurred())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
-		Expect(len(unhealthyNodes)).Should(Equal(1))
-		Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
-		Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
+		Expect(len(noExecuteNodes)).Should(Equal(1))
+		Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
+		Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
 
 		By("Repeated reconcile does not change map")
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
-		Expect(len(unhealthyNodes)).Should(Equal(1))
-		Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
-		Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
+		Expect(len(noExecuteNodes)).Should(Equal(1))
+		Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
+		Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
 
 		By("Removing the EVICT label updates unhealthyNodes")
 		node.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
 		Expect(k8sClient.Update(ctx, node)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
-		Expect(len(unhealthyNodes)).Should(Equal(0))
+		Expect(len(noExecuteNodes)).Should(Equal(0))
 	})
 
 	It("ClusterQueue Lending Adjustment", func() {
