Skip to content

Commit 25bbff0

Browse files
committed
review comments
1 parent f4ff9d3 commit 25bbff0

File tree

4 files changed: +24 additions, −27 deletions

4 files changed: +24 additions, −27 deletions

internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
559559
if pod.DeletionTimestamp.IsZero() {
560560
summary.running += 1
561561
if checkNoExecuteNodes {
562-
nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
562+
noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
563563
if len(noExecuteNodes) > 0 {
564564
if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
565565
for badResource := range resources {
@@ -584,7 +584,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
584584
}
585585
}
586586
}
587-
nodeInfoMutex.RUnlock() // END CRITICAL SECTION
587+
noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
588588
}
589589
}
590590
case v1.PodSucceeded:

internal/controller/appwrapper/node_health_monitor.go

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import (
3838

3939
// NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
4040
// been marked as Unschedulable or that have been labeled to indicate that
41-
// they have resources that Autopilot has tainted as NoSchedule or NoExeucte.
41+
// they have resources that Autopilot has tainted as NoSchedule or NoExecute.
4242
// This information is used to automate the maintenance of the lendingLimit of
4343
// a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
4444
type NodeHealthMonitor struct {
@@ -48,21 +48,18 @@ type NodeHealthMonitor struct {
4848
}
4949

5050
var (
51-
// nodeInfoMutex synchronizes writes by NodeHealthMonitor with reads from AppWrapperReconciler and SlackClusterQueueMonitor
52-
nodeInfoMutex sync.RWMutex
53-
5451
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
5552
noExecuteNodes = make(map[string]sets.Set[string])
53+
// noExecuteNodesMutex synchronizes access to noExecuteNodes
54+
noExecuteNodesMutex sync.RWMutex
5655

5756
// noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
5857
// A resource may be unschedulable either because:
5958
// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
6059
// (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
6160
noScheduleNodes = make(map[string]v1.ResourceList)
62-
)
63-
64-
const (
65-
dispatchEventName = "*trigger*"
61+
// noScheduleNodesMutex synchronizes access to noScheduleNodes
62+
noScheduleNodesMutex sync.RWMutex
6663
)
6764

6865
// permission to watch nodes
@@ -91,7 +88,7 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
9188
func (r *NodeHealthMonitor) triggerSlackCQMonitor() {
9289
if r.Config.SlackQueueName != "" {
9390
select {
94-
case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Name: dispatchEventName}}}:
91+
case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Name: r.Config.SlackQueueName}}}:
9592
default:
9693
// do not block if event is already in channel
9794
}
@@ -101,20 +98,20 @@ func (r *NodeHealthMonitor) triggerSlackCQMonitor() {
10198
// update noExecuteNodes and noScheduleNodes for the deletion of nodeName
10299
func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, nodeName string) {
103100
if _, ok := noExecuteNodes[nodeName]; ok {
104-
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
101+
noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
105102
delete(noExecuteNodes, nodeName)
106-
nodeInfoMutex.Unlock() // END CRITICAL SECTION
107-
r.triggerSlackCQMonitor()
103+
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
108104
log.FromContext(ctx).Info("Updated NoExecute information due to Node deletion",
109105
"Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
106+
r.triggerSlackCQMonitor()
110107
}
111108
if _, ok := noScheduleNodes[nodeName]; ok {
112-
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
109+
noScheduleNodesMutex.Lock() // BEGIN CRITICAL SECTION
113110
delete(noScheduleNodes, nodeName)
114-
nodeInfoMutex.Unlock() // END CRITICAL SECTION
115-
r.triggerSlackCQMonitor()
111+
noScheduleNodesMutex.Unlock() // END CRITICAL SECTION
116112
log.FromContext(ctx).Info("Updated NoSchedule information due to Node deletion",
117113
"Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
114+
r.triggerSlackCQMonitor()
118115
}
119116
}
120117

@@ -132,7 +129,7 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
132129
}
133130

134131
noExecuteNodesChanged := false
135-
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
132+
noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
136133
if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
137134
if len(noExecuteResources) == 0 {
138135
delete(noExecuteNodes, node.GetName())
@@ -145,11 +142,11 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
145142
noExecuteNodes[node.GetName()] = noExecuteResources
146143
noExecuteNodesChanged = true
147144
}
148-
nodeInfoMutex.Unlock() // END CRITICAL SECTION
145+
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
149146

150147
if noExecuteNodesChanged {
151-
r.triggerSlackCQMonitor()
152148
log.FromContext(ctx).Info("Updated NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
149+
r.triggerSlackCQMonitor()
153150
}
154151
}
155152

@@ -176,7 +173,7 @@ func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.
176173
}
177174

178175
noScheduleNodesChanged := false
179-
nodeInfoMutex.Lock() // BEGIN CRITICAL SECTION
176+
noScheduleNodesMutex.Lock() // BEGIN CRITICAL SECTION
180177
if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
181178
if len(noScheduleResources) == 0 {
182179
delete(noScheduleNodes, node.GetName())
@@ -189,11 +186,11 @@ func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.
189186
noScheduleNodes[node.GetName()] = noScheduleResources
190187
noScheduleNodesChanged = true
191188
}
192-
nodeInfoMutex.Unlock() // END CRITICAL SECTION
189+
noScheduleNodesMutex.Unlock() // END CRITICAL SECTION
193190

194191
if noScheduleNodesChanged {
195-
r.triggerSlackCQMonitor()
196192
log.FromContext(ctx).Info("Updated NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
193+
r.triggerSlackCQMonitor()
197194
}
198195
}
199196

internal/controller/appwrapper/node_health_monitor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@ import (
3030

3131
var _ = Describe("NodeMonitor Controller", func() {
3232
var slackQueueName = "fake-queue"
33+
var dispatch = types.NamespacedName{Name: slackQueueName}
3334
var node1Name = types.NamespacedName{Name: "fake-node-1"}
3435
var node2Name = types.NamespacedName{Name: "fake-node-2"}
35-
var dispatch = types.NamespacedName{Name: dispatchEventName}
3636
var nodeMonitor *NodeHealthMonitor
3737
var cqMonitor *SlackClusterQueueMonitor
3838
nodeGPUs := v1.ResourceList{v1.ResourceName("nvidia.com/gpu"): resource.MustParse("4")}

internal/controller/appwrapper/slackcq_monitor.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type SlackClusterQueueMonitor struct {
4848
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch
4949

5050
func (r *SlackClusterQueueMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
51-
if !(req.Name == dispatchEventName || req.Name == r.Config.SlackQueueName) {
51+
if req.Name != r.Config.SlackQueueName {
5252
return ctrl.Result{}, nil
5353
}
5454

@@ -62,7 +62,7 @@ func (r *SlackClusterQueueMonitor) Reconcile(ctx context.Context, req ctrl.Reque
6262

6363
// Compute the total quantities of unschedulable resources
6464
unschedulableQuantities := map[v1.ResourceName]*resource.Quantity{}
65-
nodeInfoMutex.RLock() // BEGIN CRITICAL SECTION
65+
noScheduleNodesMutex.RLock() // BEGIN CRITICAL SECTION
6666
for _, quantities := range noScheduleNodes {
6767
for resourceName, quantity := range quantities {
6868
if !quantity.IsZero() {
@@ -74,7 +74,7 @@ func (r *SlackClusterQueueMonitor) Reconcile(ctx context.Context, req ctrl.Reque
7474
}
7575
}
7676
}
77-
nodeInfoMutex.RUnlock() // END CRITICAL SECTION
77+
noScheduleNodesMutex.RUnlock() // END CRITICAL SECTION
7878

7979
// enforce lending limits on 1st flavor of 1st resource group
8080
resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources

0 commit comments

Comments (0)