
Commit 936dcd2

Redesign node monitoring to account for Node deletion
1. Split node monitoring into two reconcilers: one that monitors Nodes and one that monitors and updates the designated slack ClusterQueue.
2. Remove entries from the in-memory caches when a Node is deleted.
3. Watch the slack ClusterQueue so that changes to its nominalQuotas are detected and its lendingLimits adjusted accordingly.

Fixes #252.
1 parent 4b282d0 commit 936dcd2
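
The glue between the two reconcilers is a shared event channel: the Node reconciler sends a generic "*/*" event, and the ClusterQueue reconciler treats it as a request to recompute lending limits. The SlackClusterQueueMonitor and the manager wiring live in files of this commit that are not rendered on this page, so the snippet below is only a minimal, hypothetical sketch of how such wiring could look; it assumes the controller-runtime v0.18+ `source.Channel` API and reuses the field names visible in the diffs below.

```go
// Hypothetical wiring sketch -- not code from this commit. Assumes
// controller-runtime v0.18+ and the NodeHealthMonitor / SlackClusterQueueMonitor
// types referenced in the diffs below.
package appwrapper

import (
	v1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

	"github.com/project-codeflare/appwrapper/pkg/config"
)

func setupMonitors(mgr ctrl.Manager, cfg *config.AppWrapperConfig) error {
	// A buffer of one lets triggerDispatch coalesce bursts of Node changes:
	// its non-blocking send is simply dropped when a dispatch event is already pending.
	events := make(chan event.GenericEvent, 1)

	// Reconciler 1: watches Nodes and maintains the in-memory caches.
	if err := ctrl.NewControllerManagedBy(mgr).
		For(&v1.Node{}).
		Complete(&NodeHealthMonitor{Client: mgr.GetClient(), Config: cfg, Events: events}); err != nil {
		return err
	}

	// Reconciler 2: watches the slack ClusterQueue directly and also receives
	// the generic "*/*" events sent by NodeHealthMonitor.triggerDispatch.
	return ctrl.NewControllerManagedBy(mgr).
		For(&kueue.ClusterQueue{}).
		WatchesRawSource(source.Channel(events, &handler.EnqueueRequestForObject{})).
		Complete(&SlackClusterQueueMonitor{Client: mgr.GetClient(), Config: cfg, Events: events})
}
```

The buffered capacity of 1 (also used in the test below) is what makes the non-blocking send in `triggerDispatch` behave as a coalescing trigger rather than a queue.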

5 files changed: +247 -95 lines changed


internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 2 additions & 2 deletions
@@ -559,7 +559,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 			if pod.DeletionTimestamp.IsZero() {
 				summary.running += 1
 				if checkNoExecuteNodes {
-					noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
+					nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
 					if len(noExecuteNodes) > 0 {
 						if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
 							for badResource := range resources {
@@ -584,7 +584,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
 							}
 						}
 					}
-					noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
+					nodeInfoMutex.RUnlock() // END CRITICAL SECTION
 				}
 			}
 		case v1.PodSucceeded:

internal/controller/appwrapper/node_health_monitor.go

Lines changed: 80 additions & 93 deletions
@@ -18,20 +18,20 @@ package appwrapper

 import (
 	"context"
+	"maps"
 	"sync"

 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
-	"k8s.io/apimachinery/pkg/types"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/utils/ptr"

 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
-	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

 	"github.com/project-codeflare/appwrapper/pkg/config"
 )
@@ -44,51 +44,77 @@ import (
 type NodeHealthMonitor struct {
 	client.Client
 	Config *config.AppWrapperConfig
+	Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
 }

 var (
-	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExeucte taint
-	noExecuteNodes = make(map[string]sets.Set[string])
-	noExecuteNodesMutex sync.RWMutex
+	// nodeInfoMutex synchronizes writes by NodeHealthMonitor with reads from AppWrapperReconciler and SlackClusterQueueMonitor
+	nodeInfoMutex sync.RWMutex

-	// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
+	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
+	noExecuteNodes = make(map[string]sets.Set[string])
+
+	// noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
 	// A resource may be unscheduable either because:
 	// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
-	// (b) Autopilot has labeled the with either a NoExecute or NoSchedule taint.
-	noScheduleNodes = make(map[string]map[string]*resource.Quantity)
+	// (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
+	noScheduleNodes = make(map[string]v1.ResourceList)
 )

 // permission to watch nodes
 //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
-//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

 func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	node := &v1.Node{}
 	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
-		return ctrl.Result{}, nil
+		if errors.IsNotFound(err) {
+			r.updateForNodeDeletion(ctx, req.Name)
+			return ctrl.Result{}, nil
+		}
+		return ctrl.Result{}, err
 	}

-	r.updateNoExecuteNodes(ctx, node)
-
-	// If there is a slack ClusterQueue, update its lending limits
-
-	if r.Config.SlackQueueName == "" {
-		return ctrl.Result{}, nil
+	if node.DeletionTimestamp.IsZero() {
+		r.updateNoExecuteNodes(ctx, node)
+		r.updateNoScheduleNodes(ctx, node)
+	} else {
+		r.updateForNodeDeletion(ctx, req.Name)
 	}

-	cq := &kueue.ClusterQueue{}
-	if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
-		if errors.IsNotFound(err) {
-			return ctrl.Result{}, nil // give up if slack quota is not defined
+	return ctrl.Result{}, nil
+}
+
+// Trigger dispatch by means of "*/*" request
+func (r *NodeHealthMonitor) triggerDispatch() {
+	if r.Config.SlackQueueName != "" {
+		select {
+		case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: "*", Name: "*"}}}:
+		default:
+			// do not block if event is already in channel
 		}
-		return ctrl.Result{}, err
 	}
+}

-	r.updateNoScheduleNodes(ctx, cq, node)
-
-	return r.updateLendingLimits(ctx, cq)
+func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, name string) {
+	if _, ok := noExecuteNodes[name]; ok {
+		nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+		delete(noExecuteNodes, name)
+		nodeInfoMutex.Unlock() // END CRITICAL SECTION
+		r.triggerDispatch()
+		log.FromContext(ctx).Info("Updated node NoExecute information for Node deletion",
+			"Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+	}
+	if _, ok := noScheduleNodes[name]; ok {
+		nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+		delete(noScheduleNodes, name)
+		nodeInfoMutex.Unlock() // END CRITICAL SECTION
+		r.triggerDispatch()
+		log.FromContext(ctx).Info("Updated node NoSchedule information for Node deletion",
+			"Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
+	}
 }

+// update noExecuteNodes entry for this node
 func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
 	noExecuteResources := make(sets.Set[string])
 	for key, value := range node.GetLabels() {
@@ -102,7 +128,7 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
 	}

 	noExecuteNodesChanged := false
-	noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
+	nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
 	if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
 		if len(noExecuteResources) == 0 {
 			delete(noExecuteNodes, node.GetName())
@@ -115,95 +141,56 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
 		noExecuteNodes[node.GetName()] = noExecuteResources
 		noExecuteNodesChanged = true
 	}
-	noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
+	nodeInfoMutex.Unlock() // END CRITICAL SECTION

-	// Safe to log outside the mutex because because this method is the only writer of noExecuteNodes
-	// and the controller runtime is configured to not allow concurrent execution of this controller.
 	if noExecuteNodesChanged {
+		r.triggerDispatch()
 		log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
 	}
 }

-func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
-	// update unschedulable resource quantities for this node
-	noScheduleQuantities := make(map[string]*resource.Quantity)
+// update noScheduleNodes entry for this node
+func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.Node) {
+	var noScheduleResources v1.ResourceList
 	if node.Spec.Unschedulable {
-		// add all non-pod resources covered by cq if the node is cordoned
-		for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
-			if string(resourceName.Name) != "pods" {
-				noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
-			}
-		}
+		noScheduleResources = node.Status.Capacity.DeepCopy()
+		delete(noScheduleResources, v1.ResourcePods)
 	} else {
+		noScheduleResources = make(v1.ResourceList)
 		for key, value := range node.GetLabels() {
 			for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
 				for _, taint := range taints {
 					if key == taint.Key && value == taint.Value {
-						noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						if !quantity.IsZero() {
+							noScheduleResources[v1.ResourceName(resourceName)] = *quantity
+						}
 					}
 				}
 			}
 		}
 	}

-	if len(noScheduleQuantities) > 0 {
-		noScheduleNodes[node.GetName()] = noScheduleQuantities
-	} else {
-		delete(noScheduleNodes, node.GetName())
-	}
-}
-
-func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {
-
-	// compute unschedulable resource totals
-	unschedulableQuantities := map[string]*resource.Quantity{}
-	for _, quantities := range noScheduleNodes {
-		for resourceName, quantity := range quantities {
-			if !quantity.IsZero() {
-				if unschedulableQuantities[resourceName] == nil {
-					unschedulableQuantities[resourceName] = ptr.To(*quantity)
-				} else {
-					unschedulableQuantities[resourceName].Add(*quantity)
-				}
-			}
+	noScheduleNodesChanged := false
+	nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+	if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
+		if len(noScheduleResources) == 0 {
+			delete(noScheduleNodes, node.GetName())
+			noScheduleNodesChanged = true
+		} else if !maps.Equal(priorEntry, noScheduleResources) {
+			noScheduleNodes[node.GetName()] = noScheduleResources
+			noScheduleNodesChanged = true
 		}
+	} else if len(noScheduleResources) > 0 {
+		noScheduleNodes[node.GetName()] = noScheduleResources
+		noScheduleNodesChanged = true
 	}
+	nodeInfoMutex.Unlock() // END CRITICAL SECTION

-	// enforce lending limits on 1st flavor of 1st resource group
-	resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
-	limitsChanged := false
-	for i, quota := range resources {
-		var lendingLimit *resource.Quantity
-		if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
-			if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
-				lendingLimit = ptr.To(quota.NominalQuota)
-				lendingLimit.Sub(*unschedulableQuantity)
-			} else {
-				lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
-			}
-		}
-		if quota.LendingLimit == nil && lendingLimit != nil ||
-			quota.LendingLimit != nil && lendingLimit == nil ||
-			quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
-			limitsChanged = true
-			resources[i].LendingLimit = lendingLimit
-		}
-	}
-
-	// update lending limits
-	if limitsChanged {
-		err := r.Update(ctx, cq)
-		if err == nil {
-			log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
-			return ctrl.Result{}, nil
-		} else if errors.IsConflict(err) {
-			return ctrl.Result{Requeue: true}, nil
-		} else {
-			return ctrl.Result{}, err
-		}
+	if noScheduleNodesChanged {
+		r.triggerDispatch()
+		log.FromContext(ctx).Info("Updated node NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
 	}
-
-	return ctrl.Result{}, nil
 }

 // SetupWithManager sets up the controller with the Manager.
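
The `updateLendingLimits` logic removed above moves into the new SlackClusterQueueMonitor, whose source is not rendered on this page. Purely as an illustration of the same arithmetic recast over the new `map[string]v1.ResourceList` cache, here is a hedged sketch (not the commit's actual code); the helper names `unschedulableTotals` and `lendingLimit` are hypothetical.

```go
// Illustrative sketch only -- the real computation lives in the new
// SlackClusterQueueMonitor, which is not shown in this diff. Assumes the
// appwrapper package context: the noScheduleNodes and nodeInfoMutex globals
// declared above, v1 = k8s.io/api/core/v1, resource = k8s.io/apimachinery/pkg/api/resource.

// unschedulableTotals sums the per-node unschedulable quantities across all Nodes.
func unschedulableTotals() v1.ResourceList {
	totals := v1.ResourceList{}
	nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
	for _, resources := range noScheduleNodes {
		for name, quantity := range resources {
			if current, ok := totals[name]; ok {
				current.Add(quantity)
				totals[name] = current
			} else {
				totals[name] = quantity.DeepCopy()
			}
		}
	}
	nodeInfoMutex.RUnlock() // END CRITICAL SECTION
	return totals
}

// lendingLimit mirrors the arithmetic of the removed code: nominal quota minus
// the unschedulable total, clamped at zero.
func lendingLimit(nominal, unschedulable resource.Quantity) *resource.Quantity {
	if nominal.Cmp(unschedulable) > 0 {
		limit := nominal.DeepCopy()
		limit.Sub(unschedulable)
		return &limit
	}
	return resource.NewQuantity(0, resource.DecimalSI)
}
```

Storing `v1.ResourceList` values lets the totals be accumulated with `Quantity.Add` on value copies instead of the pointer bookkeeping the removed code did with `ptr.To`.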

internal/controller/appwrapper/node_health_monitor_test.go

Lines changed: 30 additions & 0 deletions
@@ -24,14 +24,17 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )

 var _ = Describe("NodeMonitor Controller", func() {
 	var slackQueueName = "fake-queue"
 	var node1Name = types.NamespacedName{Name: "fake-node-1"}
 	var node2Name = types.NamespacedName{Name: "fake-node-2"}
+	var dispatch = types.NamespacedName{Name: "*"}
 	var nodeMonitor *NodeHealthMonitor
+	var cqMonitor *SlackClusterQueueMonitor
 	nodeGPUs := v1.ResourceList{v1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}

 	BeforeEach(func() {
@@ -49,9 +52,16 @@
 		// Create reconciller
 		awConfig := config.NewAppWrapperConfig()
 		awConfig.SlackQueueName = slackQueueName
+		conduit := make(chan event.GenericEvent, 1)
 		nodeMonitor = &NodeHealthMonitor{
 			Client: k8sClient,
 			Config: awConfig,
+			Events: conduit,
+		}
+		cqMonitor = &SlackClusterQueueMonitor{
+			Client: k8sClient,
+			Config: awConfig,
+			Events: conduit,
 		}
 	})

@@ -124,6 +134,8 @@
 		Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))
@@ -134,6 +146,8 @@
 		Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -144,6 +158,8 @@
 		Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -154,6 +170,8 @@
 		Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).Should(BeNil())
@@ -164,10 +182,22 @@
 		Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
 		_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
 		Expect(err).NotTo(HaveOccurred())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+		Expect(err).NotTo(HaveOccurred())

 		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
 		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))

+		// Increase the slack cluster queue's quota by 2 and expect LendingLimit to increase by 2 to become 4
+		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
+		queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("8")
+		Expect(k8sClient.Update(ctx, queue)).Should(Succeed())
+		_, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: slackQueueName}})
+		Expect(err).NotTo(HaveOccurred())
+
+		Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
+		Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(4)))
+
 		Expect(k8sClient.Delete(ctx, queue)).To(Succeed())
 	})
 })
