Redesign node monitoring to account for Node deletion #255

Merged · 4 commits · Oct 16, 2024
174 changes: 81 additions & 93 deletions internal/controller/appwrapper/node_health_monitor.go
@@ -18,77 +18,104 @@ package appwrapper

 import (
 	"context"
+	"maps"
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
-	"k8s.io/apimachinery/pkg/types"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
-	"k8s.io/utils/ptr"
 
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/event"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
 	"sigs.k8s.io/controller-runtime/pkg/log"
-	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
 
 	"github.com/project-codeflare/appwrapper/pkg/config"
 )

 // NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
 // been marked as Unschedulable or that have been labeled to indicate that
-// they have resources that Autopilot has tainted as NoSchedule or NoExeucte.
+// they have resources that Autopilot has tainted as NoSchedule or NoExecute.
 // This information is used to automate the maintenance of the lendingLimit of
 // a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
 type NodeHealthMonitor struct {
 	client.Client
 	Config *config.AppWrapperConfig
+	Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
 }
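The Events field is new in this PR. For orientation, here is a minimal startup sketch, assuming the usual controller-runtime wiring in main.go; the function name, cfg parameter, and channel capacity are illustrative, not taken from this PR:

```go
// Illustrative only: construct the monitor and the shared event channel.
// A buffered channel of capacity 1 pairs with the non-blocking send in
// triggerSlackCQMonitor so bursts of Node events coalesce into one signal.
func setupNodeHealthMonitor(mgr ctrl.Manager, cfg *config.AppWrapperConfig) (chan event.GenericEvent, error) {
	events := make(chan event.GenericEvent, 1)
	monitor := &NodeHealthMonitor{
		Client: mgr.GetClient(),
		Config: cfg,
		Events: events,
	}
	// SetupWithManager is defined at the end of this file.
	return events, monitor.SetupWithManager(mgr)
}
```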

var (
-	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExeucte taint
-	noExecuteNodes = make(map[string]sets.Set[string])
+	// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
+	noExecuteNodes = make(map[string]sets.Set[string])
// noExecuteNodesMutex synchronizes access to noExecuteNodes
noExecuteNodesMutex sync.RWMutex

-	// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
-	// A resource may be unscheduable either because:
+	// noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
+	// A resource may be unschedulable either because:
 	// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
-	// (b) Autopilot has labeled the with either a NoExecute or NoSchedule taint.
-	noScheduleNodes = make(map[string]map[string]*resource.Quantity)
+	// (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
+	noScheduleNodes = make(map[string]v1.ResourceList)
+	// noScheduleNodesMutex synchronizes access to noScheduleNodes
+	noScheduleNodesMutex sync.RWMutex
)
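Both maps are package-level state guarded by RWMutexes, so components that read them are expected to take the read lock. A minimal read-side sketch of that discipline (the helper name is hypothetical, not part of this file):

```go
// snapshotNoExecuteNodes copies the shared map under the read lock so the
// caller can inspect it without racing the NodeHealthMonitor's writes.
func snapshotNoExecuteNodes() map[string]sets.Set[string] {
	noExecuteNodesMutex.RLock() // BEGIN READ-ONLY CRITICAL SECTION
	defer noExecuteNodesMutex.RUnlock()
	snapshot := make(map[string]sets.Set[string], len(noExecuteNodes))
	for name, resources := range noExecuteNodes {
		snapshot[name] = resources.Clone()
	}
	return snapshot
}
```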

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

 func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	node := &v1.Node{}
 	if err := r.Get(ctx, req.NamespacedName, node); err != nil {
-		return ctrl.Result{}, nil
+		if errors.IsNotFound(err) {
+			r.updateForNodeDeletion(ctx, req.Name)
+			return ctrl.Result{}, nil
+		}
+		return ctrl.Result{}, err
 	}
 
-	r.updateNoExecuteNodes(ctx, node)
-
-	// If there is a slack ClusterQueue, update its lending limits
-
-	if r.Config.SlackQueueName == "" {
-		return ctrl.Result{}, nil
-	}
-
-	cq := &kueue.ClusterQueue{}
-	if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
-		if errors.IsNotFound(err) {
-			return ctrl.Result{}, nil // give up if slack quota is not defined
-		}
-		return ctrl.Result{}, err
-	}
-
-	r.updateNoScheduleNodes(ctx, cq, node)
-
-	return r.updateLendingLimits(ctx, cq)
+	if node.DeletionTimestamp.IsZero() {
+		r.updateNoExecuteNodes(ctx, node)
+		r.updateNoScheduleNodes(ctx, node)
+	} else {
+		r.updateForNodeDeletion(ctx, req.Name)
+	}
+
+	return ctrl.Result{}, nil
 }
+
+func (r *NodeHealthMonitor) triggerSlackCQMonitor() {
+	if r.Config.SlackQueueName != "" {
+		select {
+		case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Name: r.Config.SlackQueueName}}}:
+		default:
+			// do not block if event is already in channel
+		}
+	}
+}
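The GenericEvent sent above carries only the slack ClusterQueue's name, so the receiving controller can be driven by a channel source. A hedged sketch of that consumer-side wiring: the function name is hypothetical, `source` and `reconcile` are the corresponding controller-runtime packages, and the function form of source.Channel shown here exists in controller-runtime v0.17+ (older releases use the source.Channel struct instead):

```go
// Hypothetical wiring for the consumer of r.Events: each GenericEvent is
// turned into a reconcile.Request keyed by the object's name.
func setupSlackCQMonitor(mgr ctrl.Manager, monitor reconcile.Reconciler, events chan event.GenericEvent) error {
	return ctrl.NewControllerManagedBy(mgr).
		Named("SlackClusterQueueMonitor").
		WatchesRawSource(source.Channel(events, &handler.EnqueueRequestForObject{})).
		Complete(monitor)
}
```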
+// update noExecuteNodes and noScheduleNodes for the deletion of nodeName
+func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, nodeName string) {
+	if _, ok := noExecuteNodes[nodeName]; ok {
+		noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
+		delete(noExecuteNodes, nodeName)
+		noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
+		log.FromContext(ctx).Info("Updated NoExecute information due to Node deletion",
+			"Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+		r.triggerSlackCQMonitor()
+	}
+	if _, ok := noScheduleNodes[nodeName]; ok {
+		noScheduleNodesMutex.Lock() // BEGIN CRITICAL SECTION
+		delete(noScheduleNodes, nodeName)
+		noScheduleNodesMutex.Unlock() // END CRITICAL SECTION
+		log.FromContext(ctx).Info("Updated NoSchedule information due to Node deletion",
+			"Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
+		r.triggerSlackCQMonitor()
+	}
+}

+// update noExecuteNodes entry for node
 func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
 	noExecuteResources := make(sets.Set[string])
 	for key, value := range node.GetLabels() {

@@ -117,93 +144,54 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
 	}
 	noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
 
 	// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
 	// and the controller runtime is configured to not allow concurrent execution of this controller.
 	if noExecuteNodesChanged {
-		log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+		log.FromContext(ctx).Info("Updated NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
+		r.triggerSlackCQMonitor()
 	}
 }

-func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
-	// update unschedulable resource quantities for this node
-	noScheduleQuantities := make(map[string]*resource.Quantity)
+// update noScheduleNodes entry for node
+func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.Node) {
+	var noScheduleResources v1.ResourceList
 	if node.Spec.Unschedulable {
-		// add all non-pod resources covered by cq if the node is cordoned
-		for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
-			if string(resourceName.Name) != "pods" {
-				noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
-			}
-		}
+		noScheduleResources = node.Status.Capacity.DeepCopy()
+		delete(noScheduleResources, v1.ResourcePods)
 	} else {
+		noScheduleResources = make(v1.ResourceList)
 		for key, value := range node.GetLabels() {
 			for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
 				for _, taint := range taints {
 					if key == taint.Key && value == taint.Value {
-						noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
+						if !quantity.IsZero() {
+							noScheduleResources[v1.ResourceName(resourceName)] = *quantity
+						}
 					}
 				}
 			}
 		}
 	}
 
-	if len(noScheduleQuantities) > 0 {
-		noScheduleNodes[node.GetName()] = noScheduleQuantities
-	} else {
-		delete(noScheduleNodes, node.GetName())
-	}
-}
-
-func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {
-
-	// compute unschedulable resource totals
-	unschedulableQuantities := map[string]*resource.Quantity{}
-	for _, quantities := range noScheduleNodes {
-		for resourceName, quantity := range quantities {
-			if !quantity.IsZero() {
-				if unschedulableQuantities[resourceName] == nil {
-					unschedulableQuantities[resourceName] = ptr.To(*quantity)
-				} else {
-					unschedulableQuantities[resourceName].Add(*quantity)
-				}
-			}
-		}
-	}
-
-	// enforce lending limits on 1st flavor of 1st resource group
-	resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
-	limitsChanged := false
-	for i, quota := range resources {
-		var lendingLimit *resource.Quantity
-		if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
-			if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
-				lendingLimit = ptr.To(quota.NominalQuota)
-				lendingLimit.Sub(*unschedulableQuantity)
-			} else {
-				lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
-			}
-		}
-		if quota.LendingLimit == nil && lendingLimit != nil ||
-			quota.LendingLimit != nil && lendingLimit == nil ||
-			quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
-			limitsChanged = true
-			resources[i].LendingLimit = lendingLimit
-		}
-	}
-
-	// update lending limits
-	if limitsChanged {
-		err := r.Update(ctx, cq)
-		if err == nil {
-			log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
-			return ctrl.Result{}, nil
-		} else if errors.IsConflict(err) {
-			return ctrl.Result{Requeue: true}, nil
-		} else {
-			return ctrl.Result{}, err
-		}
-	}
-
-	return ctrl.Result{}, nil
-}
+	noScheduleNodesChanged := false
+	noScheduleNodesMutex.Lock() // BEGIN CRITICAL SECTION
+	if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
+		if len(noScheduleResources) == 0 {
+			delete(noScheduleNodes, node.GetName())
+			noScheduleNodesChanged = true
+		} else if !maps.Equal(priorEntry, noScheduleResources) {
+			noScheduleNodes[node.GetName()] = noScheduleResources
+			noScheduleNodesChanged = true
+		}
+	} else if len(noScheduleResources) > 0 {
+		noScheduleNodes[node.GetName()] = noScheduleResources
+		noScheduleNodesChanged = true
+	}
+	noScheduleNodesMutex.Unlock() // END CRITICAL SECTION
+
+	if noScheduleNodesChanged {
+		log.FromContext(ctx).Info("Updated NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
+		r.triggerSlackCQMonitor()
+	}
+}
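The lending-limit logic removed above is factored out rather than dropped; the SlackClusterQueueMonitor reached via the event channel now owns it. As a sketch of the aggregation half, here is a hypothetical helper that rebuilds cluster-wide unschedulable totals from noScheduleNodes, mirroring the arithmetic of the deleted updateLendingLimits (lending limit = nominal quota minus the total, floored at zero):

```go
// unschedulableTotals sums the per-Node ResourceLists under the read lock.
func unschedulableTotals() v1.ResourceList {
	noScheduleNodesMutex.RLock() // BEGIN READ-ONLY CRITICAL SECTION
	defer noScheduleNodesMutex.RUnlock()
	totals := make(v1.ResourceList)
	for _, resources := range noScheduleNodes {
		for name, quantity := range resources {
			total := totals[name] // zero Quantity if absent
			total.Add(quantity)   // Quantity.Add mutates the receiver
			totals[name] = total
		}
	}
	return totals
}
```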

// SetupWithManager sets up the controller with the Manager.