Commit f2561c5

Redesign node monitoring to account for Node deletion
1. Split node monitoring into two reconcilers: one that monitors Nodes and one that monitors and updates the designated slack ClusterQueue.
2. Remove entries from the in-memory caches when a Node is deleted.
3. Watch the slack ClusterQueue in order to react to changes in its nominalQuota and adjust lendingLimits accordingly.

Fixes #252.
1 parent 4b282d0 commit f2561c5
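Note: only three of the five changed files are reproduced in this view; the new SlackClusterQueueMonitor controller itself is in one of the files not shown. As orientation, the split could be wired roughly as sketched below, with NodeHealthMonitor pushing GenericEvents onto the shared Events channel (see triggerSlackCQMonitor in the diff) and the queue monitor consuming them as a channel source. Everything here other than the NodeHealthMonitor/SlackClusterQueueMonitor names, the Events field, and the dispatchEventName request is an assumption, and the exact source.Channel / WatchesRawSource signatures depend on the controller-runtime version in use (the form below is the v0.18+ one).

// Sketch only: the committed SetupWithManager and Reconcile for the new controller
// are in a file that is not part of this view.
package appwrapper

import (
    "context"

    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"
    "sigs.k8s.io/controller-runtime/pkg/event"
    "sigs.k8s.io/controller-runtime/pkg/handler"
    "sigs.k8s.io/controller-runtime/pkg/source"

    kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

    "github.com/project-codeflare/appwrapper/pkg/config"
)

// SlackClusterQueueMonitor mirrors the fields the updated test file constructs:
// the same client and config as NodeHealthMonitor plus the shared event channel.
type SlackClusterQueueMonitor struct {
    client.Client
    Config *config.AppWrapperConfig
    Events chan event.GenericEvent
}

// Reconcile would recompute the slack ClusterQueue's lendingLimits; the real logic
// is elided here (and presumably ignores requests for queues other than Config.SlackQueueName).
func (r *SlackClusterQueueMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    return ctrl.Result{}, nil
}

// SetupWithManager watches the slack ClusterQueue itself (so nominalQuota edits are seen)
// and also consumes the "*trigger*" GenericEvents that NodeHealthMonitor sends when its
// node caches change. handler.EnqueueRequestForObject turns each event into a request
// named after the event object, which is how the test drives Reconcile with `dispatch`.
func (r *SlackClusterQueueMonitor) SetupWithManager(mgr ctrl.Manager) error {
    return ctrl.NewControllerManagedBy(mgr).
        For(&kueue.ClusterQueue{}).
        WatchesRawSource(source.Channel(r.Events, &handler.EnqueueRequestForObject{})).
        Complete(r)
}

With this wiring the dispatch request carries the name "*trigger*", which is why the updated test reconciles cqMonitor with the dispatch NamespacedName rather than a real object.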

File tree

5 files changed: +253 -96 lines changed

internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 2 additions & 2 deletions

@@ -559,7 +559,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
     if pod.DeletionTimestamp.IsZero() {
         summary.running += 1
         if checkNoExecuteNodes {
-            noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
+            nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
             if len(noExecuteNodes) > 0 {
                 if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
                     for badResource := range resources {
@@ -584,7 +584,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
                     }
                 }
             }
-            noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
+            nodeInfoMutex.RUnlock() // END CRITICAL SECTION
         }
     }
 case v1.PodSucceeded:

internal/controller/appwrapper/node_health_monitor.go

Lines changed: 86 additions & 94 deletions

@@ -18,20 +18,20 @@ package appwrapper
 
 import (
     "context"
+    "maps"
     "sync"
 
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/errors"
     "k8s.io/apimachinery/pkg/api/resource"
-    "k8s.io/apimachinery/pkg/types"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/sets"
-    "k8s.io/utils/ptr"
 
     ctrl "sigs.k8s.io/controller-runtime"
     "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/handler"
     "sigs.k8s.io/controller-runtime/pkg/log"
-    kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 
     "github.com/project-codeflare/appwrapper/pkg/config"
 )
@@ -44,51 +44,82 @@ import (
 type NodeHealthMonitor struct {
     client.Client
     Config *config.AppWrapperConfig
+    Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
 }
 
 var (
-    // noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExeucte taint
-    noExecuteNodes = make(map[string]sets.Set[string])
-    noExecuteNodesMutex sync.RWMutex
+    // nodeInfoMutex syncnornized writes by NodeHealthMonitor with reads from AppWrapperReconciler and SlackClusterQueueMonitor
+    nodeInfoMutex sync.RWMutex
 
-    // noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
+    // noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
+    noExecuteNodes = make(map[string]sets.Set[string])
+
+    // noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
     // A resource may be unscheduable either because:
    // (a) the Node is cordoned (node.Spec.Unschedulable is true) or
-    // (b) Autopilot has labeled the with either a NoExecute or NoSchedule taint.
-    noScheduleNodes = make(map[string]map[string]*resource.Quantity)
+    // (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
+    noScheduleNodes = make(map[string]v1.ResourceList)
+)
+
+const (
+    dispatchEventName = "*trigger*"
 )
 
 // permission to watch nodes
 //+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
-//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch
 
 func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     node := &v1.Node{}
     if err := r.Get(ctx, req.NamespacedName, node); err != nil {
-        return ctrl.Result{}, nil
+        if errors.IsNotFound(err) {
+            r.updateForNodeDeletion(ctx, req.Name)
+            return ctrl.Result{}, nil
+        }
+        return ctrl.Result{}, err
     }
 
-    r.updateNoExecuteNodes(ctx, node)
-
-    // If there is a slack ClusterQueue, update its lending limits
-
-    if r.Config.SlackQueueName == "" {
-        return ctrl.Result{}, nil
+    if node.DeletionTimestamp.IsZero() {
+        r.updateNoExecuteNodes(ctx, node)
+        r.updateNoScheduleNodes(ctx, node)
+    } else {
+        r.updateForNodeDeletion(ctx, req.Name)
    }
 
-    cq := &kueue.ClusterQueue{}
-    if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
-        if errors.IsNotFound(err) {
-            return ctrl.Result{}, nil // give up if slack quota is not defined
+    return ctrl.Result{}, nil
+}
+
+func (r *NodeHealthMonitor) triggerSlackCQMonitor() {
+    if r.Config.SlackQueueName != "" {
+        select {
+        // Trigger dispatch by means of "*/*" request
+        case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Name: dispatchEventName}}}:
+        default:
+            // do not block if event is already in channel
        }
-        return ctrl.Result{}, err
    }
+}
 
-    r.updateNoScheduleNodes(ctx, cq, node)
-
-    return r.updateLendingLimits(ctx, cq)
+// update for the deletion of nodeName
+func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, nodeName string) {
+    if _, ok := noExecuteNodes[nodeName]; ok {
+        nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+        delete(noExecuteNodes, nodeName)
+        nodeInfoMutex.Unlock() // END CRITICAL SECTION
+        r.triggerSlackCQMonitor()
+        log.FromContext(ctx).Info("Updated NoExecute information due to Node deletion",
+            "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+    }
+    if _, ok := noScheduleNodes[nodeName]; ok {
+        nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+        delete(noScheduleNodes, nodeName)
+        nodeInfoMutex.Unlock() // END CRITICAL SECTION
+        r.triggerSlackCQMonitor()
+        log.FromContext(ctx).Info("Updated NoSchedule information due to Node deletion",
+            "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
+    }
 }
 
+// update noExecuteNodes entry for node
 func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
     noExecuteResources := make(sets.Set[string])
     for key, value := range node.GetLabels() {
@@ -102,7 +133,7 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
     }
 
     noExecuteNodesChanged := false
-    noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
+    nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
     if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
         if len(noExecuteResources) == 0 {
             delete(noExecuteNodes, node.GetName())
@@ -115,95 +146,56 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
         noExecuteNodes[node.GetName()] = noExecuteResources
         noExecuteNodesChanged = true
     }
-    noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
+    nodeInfoMutex.Unlock() // END CRITICAL SECTION
 
-    // Safe to log outside the mutex because because this method is the only writer of noExecuteNodes
-    // and the controller runtime is configured to not allow concurrent execution of this controller.
     if noExecuteNodesChanged {
-        log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+        r.triggerSlackCQMonitor()
+        log.FromContext(ctx).Info("Updated NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
     }
 }
 
-func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
-    // update unschedulable resource quantities for this node
-    noScheduleQuantities := make(map[string]*resource.Quantity)
+// update noScheduleNodes entry for node
+func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.Node) {
+    var noScheduleResources v1.ResourceList
     if node.Spec.Unschedulable {
-        // add all non-pod resources covered by cq if the node is cordoned
-        for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
-            if string(resourceName.Name) != "pods" {
-                noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
-            }
-        }
+        noScheduleResources = node.Status.Capacity.DeepCopy()
+        delete(noScheduleResources, v1.ResourcePods)
     } else {
+        noScheduleResources = make(v1.ResourceList)
         for key, value := range node.GetLabels() {
             for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
                 for _, taint := range taints {
                     if key == taint.Key && value == taint.Value {
-                        noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+                        quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+                        if !quantity.IsZero() {
+                            noScheduleResources[v1.ResourceName(resourceName)] = *quantity
+                        }
                     }
                 }
             }
         }
     }
 
-    if len(noScheduleQuantities) > 0 {
-        noScheduleNodes[node.GetName()] = noScheduleQuantities
-    } else {
-        delete(noScheduleNodes, node.GetName())
-    }
-}
-
-func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {
-
-    // compute unschedulable resource totals
-    unschedulableQuantities := map[string]*resource.Quantity{}
-    for _, quantities := range noScheduleNodes {
-        for resourceName, quantity := range quantities {
-            if !quantity.IsZero() {
-                if unschedulableQuantities[resourceName] == nil {
-                    unschedulableQuantities[resourceName] = ptr.To(*quantity)
-                } else {
-                    unschedulableQuantities[resourceName].Add(*quantity)
-                }
-            }
-        }
-    }
-
-    // enforce lending limits on 1st flavor of 1st resource group
-    resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
-    limitsChanged := false
-    for i, quota := range resources {
-        var lendingLimit *resource.Quantity
-        if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
-            if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
-                lendingLimit = ptr.To(quota.NominalQuota)
-                lendingLimit.Sub(*unschedulableQuantity)
-            } else {
-                lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
-            }
-        }
-        if quota.LendingLimit == nil && lendingLimit != nil ||
-            quota.LendingLimit != nil && lendingLimit == nil ||
-            quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
-            limitsChanged = true
-            resources[i].LendingLimit = lendingLimit
+    noScheduleNodesChanged := false
+    nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
+    if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
+        if len(noScheduleResources) == 0 {
+            delete(noScheduleNodes, node.GetName())
+            noScheduleNodesChanged = true
+        } else if !maps.Equal(priorEntry, noScheduleResources) {
+            noScheduleNodes[node.GetName()] = noScheduleResources
+            noScheduleNodesChanged = true
         }
+    } else if len(noScheduleResources) > 0 {
+        noScheduleNodes[node.GetName()] = noScheduleResources
+        noScheduleNodesChanged = true
     }
+    nodeInfoMutex.Unlock() // END CRITICAL SECTION
 
-    // update lending limits
-    if limitsChanged {
-        err := r.Update(ctx, cq)
-        if err == nil {
-            log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
-            return ctrl.Result{}, nil
-        } else if errors.IsConflict(err) {
-            return ctrl.Result{Requeue: true}, nil
-        } else {
-            return ctrl.Result{}, err
-        }
+    if noScheduleNodesChanged {
+        r.triggerSlackCQMonitor()
+        log.FromContext(ctx).Info("Updated NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
     }
-
-    return ctrl.Result{}, nil
 }
 
 // SetupWithManager sets up the controller with the Manager.

internal/controller/appwrapper/node_health_monitor_test.go

Lines changed: 30 additions & 0 deletions

@@ -24,14 +24,17 @@ import (
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
+    "sigs.k8s.io/controller-runtime/pkg/event"
     "sigs.k8s.io/controller-runtime/pkg/reconcile"
 )
 
 var _ = Describe("NodeMonitor Controller", func() {
     var slackQueueName = "fake-queue"
     var node1Name = types.NamespacedName{Name: "fake-node-1"}
     var node2Name = types.NamespacedName{Name: "fake-node-2"}
+    var dispatch = types.NamespacedName{Name: dispatchEventName}
     var nodeMonitor *NodeHealthMonitor
+    var cqMonitor *SlackClusterQueueMonitor
     nodeGPUs := v1.ResourceList{v1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}
 
     BeforeEach(func() {
@@ -49,9 +52,16 @@ var _ = Describe("NodeMonitor Controller", func() {
         // Create reconciller
         awConfig := config.NewAppWrapperConfig()
         awConfig.SlackQueueName = slackQueueName
+        conduit := make(chan event.GenericEvent, 1)
         nodeMonitor = &NodeHealthMonitor{
             Client: k8sClient,
             Config: awConfig,
+            Events: conduit,
+        }
+        cqMonitor = &SlackClusterQueueMonitor{
+            Client: k8sClient,
+            Config: awConfig,
+            Events: conduit,
         }
     })
 
@@ -124,6 +134,8 @@ var _ = Describe("NodeMonitor Controller", func() {
         Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
         _, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
         Expect(err).NotTo(HaveOccurred())
+        _, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+        Expect(err).NotTo(HaveOccurred())
 
         Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
         Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))
@@ -134,6 +146,8 @@ var _ = Describe("NodeMonitor Controller", func() {
         Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
         _, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
         Expect(err).NotTo(HaveOccurred())
+        _, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+        Expect(err).NotTo(HaveOccurred())
 
         Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
         Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -144,6 +158,8 @@ var _ = Describe("NodeMonitor Controller", func() {
         Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
         _, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
         Expect(err).NotTo(HaveOccurred())
+        _, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+        Expect(err).NotTo(HaveOccurred())
 
         Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
         Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).ShouldNot(BeNil())
@@ -154,6 +170,8 @@ var _ = Describe("NodeMonitor Controller", func() {
         Expect(k8sClient.Update(ctx, node2)).Should(Succeed())
         _, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
         Expect(err).NotTo(HaveOccurred())
+        _, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+        Expect(err).NotTo(HaveOccurred())
 
         Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
         Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit).Should(BeNil())
@@ -164,10 +182,22 @@ var _ = Describe("NodeMonitor Controller", func() {
         Expect(k8sClient.Update(ctx, node1)).Should(Succeed())
         _, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
         Expect(err).NotTo(HaveOccurred())
+        _, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: dispatch})
+        Expect(err).NotTo(HaveOccurred())
 
         Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
         Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(2)))
 
+        // Increase the slack cluster queue's quota by 2 and expect LedningLimit to increase by 2 to become 4
+        Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
+        queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].NominalQuota = resource.MustParse("8")
+        Expect(k8sClient.Update(ctx, queue)).Should(Succeed())
+        _, err = cqMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: slackQueueName}})
+        Expect(err).NotTo(HaveOccurred())
+
+        Expect(k8sClient.Get(ctx, types.NamespacedName{Name: slackQueueName}, queue)).Should(Succeed())
+        Expect(queue.Spec.ResourceGroups[0].Flavors[0].Resources[0].LendingLimit.Value()).Should(Equal(int64(4)))
+
         Expect(k8sClient.Delete(ctx, queue)).To(Succeed())
     })
 })
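For reference, the updateLendingLimits logic deleted from node_health_monitor.go above presumably moves into the new SlackClusterQueueMonitor (not reproduced in this view). A rough sketch, under that assumption, of the equivalent computation over the new map[string]v1.ResourceList form of noScheduleNodes; the helper names here are illustrative, not the committed ones.

package appwrapper

import (
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    "k8s.io/utils/ptr"

    kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// unschedulableTotals sums, per resource name, the capacity recorded as unusable across
// all Nodes in noScheduleNodes. Callers are expected to hold nodeInfoMutex for reading.
func unschedulableTotals() v1.ResourceList {
    totals := v1.ResourceList{}
    for _, resources := range noScheduleNodes {
        for name, quantity := range resources {
            if current, ok := totals[name]; ok {
                current.Add(quantity)
                totals[name] = current
            } else {
                totals[name] = quantity.DeepCopy()
            }
        }
    }
    return totals
}

// lendingLimitFor applies the same policy as the removed code to one quota entry of the
// slack ClusterQueue: nominalQuota minus the unschedulable total, floored at zero, or a
// nil limit when nothing is unschedulable for that resource.
func lendingLimitFor(quota kueue.ResourceQuota, unschedulable v1.ResourceList) *resource.Quantity {
    unavailable, ok := unschedulable[quota.Name]
    if !ok || unavailable.IsZero() {
        return nil
    }
    if quota.NominalQuota.Cmp(unavailable) > 0 {
        limit := ptr.To(quota.NominalQuota.DeepCopy())
        limit.Sub(unavailable)
        return limit
    }
    return resource.NewQuantity(0, resource.DecimalSI)
}

The numbers in the new test case are consistent with this policy: with 4 GPUs unschedulable, raising the slack queue's nominalQuota from 6 to 8 moves the computed lendingLimit from 2 to 4.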