reorganize NodeMonitor code; no semantic change #254

Merged
merged 1 commit into from Oct 15, 2024
34 changes: 17 additions & 17 deletions internal/controller/appwrapper/appwrapper_controller.go
@@ -70,7 +70,7 @@ type podStatusSummary struct {
succeeded int32
failed int32
terminalFailure bool
unhealthyNodes sets.Set[string]
noExecuteNodes sets.Set[string]
}

type componentStatusSummary struct {
@@ -334,13 +334,13 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}

// Initiate migration of workloads that are using resources that Autopilot has flagged as unhealthy
detailMsg = fmt.Sprintf("Workload contains pods using unhealthy resources on Nodes: %v", podStatus.unhealthyNodes)
if len(podStatus.unhealthyNodes) > 0 {
// Initiate migration of workloads that are using resources that Autopilot has flagged as NoExecute
detailMsg = fmt.Sprintf("Workload contains pods using NoExecute resources on Nodes: %v", podStatus.noExecuteNodes)
if len(podStatus.noExecuteNodes) > 0 {
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
Type: string(workloadv1beta2.Unhealthy),
Status: metav1.ConditionTrue,
Reason: "AutopilotUnhealthy",
Reason: "AutopilotNoExecute",
Message: detailMsg,
})
r.Recorder.Event(aw, v1.EventTypeNormal, string(workloadv1beta2.Unhealthy), detailMsg)
@@ -549,7 +549,7 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
return nil, err
}
summary := &podStatusSummary{expected: pc}
checkUnhealthyNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes
checkNoExecuteNodes := r.Config.Autopilot != nil && r.Config.Autopilot.MonitorNodes

for _, pod := range pods.Items {
switch pod.Status.Phase {
@@ -558,33 +558,33 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
case v1.PodRunning:
if pod.DeletionTimestamp.IsZero() {
summary.running += 1
if checkUnhealthyNodes {
unhealthyNodesMutex.RLock() // BEGIN CRITICAL SECTION
if len(unhealthyNodes) > 0 {
if resources, ok := unhealthyNodes[pod.Spec.NodeName]; ok {
if checkNoExecuteNodes {
noExecuteNodesMutex.RLock() // BEGIN CRITICAL SECTION
if len(noExecuteNodes) > 0 {
if resources, ok := noExecuteNodes[pod.Spec.NodeName]; ok {
for badResource := range resources {
for _, container := range pod.Spec.Containers {
if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok {
if !limit.IsZero() {
if summary.unhealthyNodes == nil {
summary.unhealthyNodes = make(sets.Set[string])
if summary.noExecuteNodes == nil {
summary.noExecuteNodes = make(sets.Set[string])
}
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
summary.noExecuteNodes.Insert(pod.Spec.NodeName)
}
}
if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok {
if !request.IsZero() {
if summary.unhealthyNodes == nil {
summary.unhealthyNodes = make(sets.Set[string])
if summary.noExecuteNodes == nil {
summary.noExecuteNodes = make(sets.Set[string])
}
summary.unhealthyNodes.Insert(pod.Spec.NodeName)
summary.noExecuteNodes.Insert(pod.Spec.NodeName)
}
}
}
}
}
}
unhealthyNodesMutex.RUnlock() // END CRITICAL SECTION
noExecuteNodesMutex.RUnlock() // END CRITICAL SECTION
}
}
case v1.PodSucceeded:
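
Reviewer note (not part of the diff): the per-pod check that getPodStatus performs is easier to see outside the diff. The sketch below is a minimal, self-contained restatement of that logic; the helper name podUsesNoExecuteResource and the main function are illustrative, while the map, mutex, and limit/request checks mirror the code above.

```go
package main

import (
	"fmt"
	"sync"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// Shared state mirroring node_health_monitor.go: node name -> resources carrying an
// Autopilot NoExecute taint on that node.
var (
	noExecuteNodes      = make(map[string]sets.Set[string])
	noExecuteNodesMutex sync.RWMutex
)

// podUsesNoExecuteResource is an illustrative helper (not in the PR) expressing the test
// getPodStatus applies to each running pod: does any container request or limit a resource
// that Autopilot has flagged NoExecute on the node the pod is scheduled to?
func podUsesNoExecuteResource(pod *v1.Pod) bool {
	noExecuteNodesMutex.RLock() // the real code brackets this read the same way
	defer noExecuteNodesMutex.RUnlock()

	resources, ok := noExecuteNodes[pod.Spec.NodeName]
	if !ok {
		return false
	}
	for badResource := range resources {
		for _, container := range pod.Spec.Containers {
			if limit, ok := container.Resources.Limits[v1.ResourceName(badResource)]; ok && !limit.IsZero() {
				return true
			}
			if request, ok := container.Resources.Requests[v1.ResourceName(badResource)]; ok && !request.IsZero() {
				return true
			}
		}
	}
	return false
}

func main() {
	noExecuteNodes["node1"] = sets.New("nvidia.com/gpu")
	pod := &v1.Pod{Spec: v1.PodSpec{NodeName: "node1"}}
	fmt.Println(podUsesNoExecuteResource(pod)) // false: this pod has no containers using the flagged resource
}
```
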
120 changes: 68 additions & 52 deletions internal/controller/appwrapper/node_health_monitor.go
@@ -36,74 +36,46 @@ import (
"github.com/project-codeflare/appwrapper/pkg/config"
)

// NodeHealthMonitor maintains the set of nodes that Autopilot has labelled as unhealthy
// NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
// been marked as Unschedulable or that have been labeled to indicate that
// they have resources that Autopilot has tainted as NoSchedule or NoExecute.
// This information is used to automate the maintenance of the lendingLimit of
// a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
type NodeHealthMonitor struct {
client.Client
Config *config.AppWrapperConfig
}

var (
// unhealthyNodes is a mapping from Node names to a set of resources that Autopilot has labeled as unhealthy on that Node
unhealthyNodes = make(map[string]sets.Set[string])
unhealthyNodesMutex sync.RWMutex

// unschedulableNodes is a mapping from Node names to resource quantities than Autopilot has labeled as unschedulable on that Node
unschedulableNodes = make(map[string]map[string]*resource.Quantity)
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
noExecuteNodes = make(map[string]sets.Set[string])
noExecuteNodesMutex sync.RWMutex

// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
// A resource may be unschedulable either because:
// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
// (b) Autopilot has labeled the Node with either a NoExecute or NoSchedule taint.
noScheduleNodes = make(map[string]map[string]*resource.Quantity)
)

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

//gocyclo:ignore
func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &v1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
return ctrl.Result{}, nil
}

flaggedResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
flaggedResources.Insert(resourceName)
}
}
}
}

nodeChanged := false
unhealthyNodesMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := unhealthyNodes[node.GetName()]; ok {
if len(flaggedResources) == 0 {
delete(unhealthyNodes, node.GetName())
nodeChanged = true
} else if !priorEntry.Equal(flaggedResources) {
unhealthyNodes[node.GetName()] = flaggedResources
nodeChanged = true
}
} else if len(flaggedResources) > 0 {
unhealthyNodes[node.GetName()] = flaggedResources
nodeChanged = true
}
unhealthyNodesMutex.Unlock() // END CRITICAL SECTION
r.updateNoExecuteNodes(ctx, node)

// Unsynchronized reads of unhealthyNodes below are safe because this method
// is the only writer to the map and the controller runtime is configured to
// not allow concurrent execution of this method.

if nodeChanged {
log.FromContext(ctx).Info("Updated node health information", "Number Unhealthy Nodes", len(unhealthyNodes), "Unhealthy Resource Details", unhealthyNodes)
}

// update lending limits on slack quota if configured
// If there is a slack ClusterQueue, update its lending limits

if r.Config.SlackQueueName == "" {
return ctrl.Result{}, nil
}

// get slack quota
cq := &kueue.ClusterQueue{}
if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
if errors.IsNotFound(err) {
@@ -112,36 +84,80 @@ func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}

r.updateNoScheduleNodes(ctx, cq, node)

return r.updateLendingLimits(ctx, cq)
}

func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
noExecuteResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value && taint.Effect == v1.TaintEffectNoExecute {
noExecuteResources.Insert(resourceName)
}
}
}
}

noExecuteNodesChanged := false
noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := noExecuteNodes[node.GetName()]; ok {
if len(noExecuteResources) == 0 {
delete(noExecuteNodes, node.GetName())
noExecuteNodesChanged = true
} else if !priorEntry.Equal(noExecuteResources) {
noExecuteNodes[node.GetName()] = noExecuteResources
noExecuteNodesChanged = true
}
} else if len(noExecuteResources) > 0 {
noExecuteNodes[node.GetName()] = noExecuteResources
noExecuteNodesChanged = true
}
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION

// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
// and the controller runtime is configured to not allow concurrent execution of this controller.
if noExecuteNodesChanged {
log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
}
}

func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
// update unschedulable resource quantities for this node
flaggedQuantities := make(map[string]*resource.Quantity)
noScheduleQuantities := make(map[string]*resource.Quantity)
if node.Spec.Unschedulable {
// flag all non-pod resources covered by cq if the node is cordoned
// add all non-pod resources covered by cq if the node is cordoned
for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
if string(resourceName.Name) != "pods" {
flaggedQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
}
}
} else {
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value {
flaggedQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
}
}
}
}
}

if len(flaggedQuantities) > 0 {
unschedulableNodes[node.GetName()] = flaggedQuantities
if len(noScheduleQuantities) > 0 {
noScheduleNodes[node.GetName()] = noScheduleQuantities
} else {
delete(unschedulableNodes, node.GetName())
delete(noScheduleNodes, node.GetName())
}
}

func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {

// compute unschedulable resource totals
unschedulableQuantities := map[string]*resource.Quantity{}
for _, quantities := range unschedulableNodes {
for _, quantities := range noScheduleNodes {
for resourceName, quantity := range quantities {
if !quantity.IsZero() {
if unschedulableQuantities[resourceName] == nil {
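
Reviewer note (not part of the diff): updateNoExecuteNodes and updateNoScheduleNodes share the same label-matching core, a nested walk of the Node's labels against the configured Autopilot resource taints. The sketch below restates that step under the assumption that the config's taint entries have the shape of corev1.Taint; the helper name and the example mapping in main are illustrative.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
)

// resourcesFlaggedByLabels is an illustrative helper (not in the PR). A resource is flagged
// when one of its configured Autopilot taints matches a label on the Node. With
// requireNoExecute set, only NoExecute taints count (as in updateNoExecuteNodes);
// otherwise any matching taint counts (as in updateNoScheduleNodes).
func resourcesFlaggedByLabels(nodeLabels map[string]string, resourceTaints map[string][]v1.Taint, requireNoExecute bool) sets.Set[string] {
	flagged := make(sets.Set[string])
	for key, value := range nodeLabels {
		for resourceName, taints := range resourceTaints {
			for _, taint := range taints {
				if key == taint.Key && value == taint.Value &&
					(!requireNoExecute || taint.Effect == v1.TaintEffectNoExecute) {
					flagged.Insert(resourceName)
				}
			}
		}
	}
	return flagged
}

func main() {
	// Hypothetical configuration in the spirit of the tests below: the label
	// autopilot.ibm.com/gpuhealth=EVICT marks nvidia.com/gpu as NoExecute.
	taints := map[string][]v1.Taint{
		"nvidia.com/gpu": {{Key: "autopilot.ibm.com/gpuhealth", Value: "EVICT", Effect: v1.TaintEffectNoExecute}},
	}
	labels := map[string]string{"autopilot.ibm.com/gpuhealth": "EVICT"}
	fmt.Println(resourcesFlaggedByLabels(labels, taints, true)) // map[nvidia.com/gpu:{}]
}
```
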
16 changes: 8 additions & 8 deletions internal/controller/appwrapper/node_health_monitor_test.go
@@ -74,7 +74,7 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(err).NotTo(HaveOccurred())

By("Healthy cluster has no unhealthy nodes")
Expect(len(unhealthyNodes)).Should(Equal(0))
Expect(len(noExecuteNodes)).Should(Equal(0))

By("A node labeled EVICT is detected as unhealthy")
node := getNode(node1Name.Name)
@@ -84,25 +84,25 @@ var _ = Describe("NodeMonitor Controller", func() {
Expect(err).NotTo(HaveOccurred())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
Expect(len(unhealthyNodes)).Should(Equal(1))
Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
Expect(len(noExecuteNodes)).Should(Equal(1))
Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))

By("Repeated reconcile does not change map")
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node2Name})
Expect(err).NotTo(HaveOccurred())
Expect(len(unhealthyNodes)).Should(Equal(1))
Expect(unhealthyNodes).Should(HaveKey(node1Name.Name))
Expect(unhealthyNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))
Expect(len(noExecuteNodes)).Should(Equal(1))
Expect(noExecuteNodes).Should(HaveKey(node1Name.Name))
Expect(noExecuteNodes[node1Name.Name]).Should(HaveKey("nvidia.com/gpu"))

By("Removing the EVICT label updates unhealthyNodes")
node.Labels["autopilot.ibm.com/gpuhealth"] = "ERR"
Expect(k8sClient.Update(ctx, node)).Should(Succeed())
_, err = nodeMonitor.Reconcile(ctx, reconcile.Request{NamespacedName: node1Name})
Expect(err).NotTo(HaveOccurred())
Expect(len(unhealthyNodes)).Should(Equal(0))
Expect(len(noExecuteNodes)).Should(Equal(0))
})

It("ClusterQueue Lending Adjustment", func() {
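
Reviewer note (not part of the diff): returning to node_health_monitor.go, the visible head of updateLendingLimits totals the unschedulable quantity of each resource across noScheduleNodes. The sketch below restates just that aggregation; how the totals are then applied to the slack ClusterQueue's lending limits is truncated in the diff above and is not reproduced here.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// noScheduleNodes mirrors the map maintained by NodeHealthMonitor:
// node name -> per-resource quantities that cannot currently be scheduled on that node.
var noScheduleNodes = map[string]map[string]*resource.Quantity{}

// unschedulableTotals restates the aggregation visible at the top of updateLendingLimits:
// sum the unschedulable quantity of each resource across all affected nodes.
func unschedulableTotals() map[string]*resource.Quantity {
	totals := map[string]*resource.Quantity{}
	for _, quantities := range noScheduleNodes {
		for resourceName, quantity := range quantities {
			if quantity.IsZero() {
				continue
			}
			if totals[resourceName] == nil {
				totals[resourceName] = resource.NewQuantity(0, resource.DecimalSI)
			}
			totals[resourceName].Add(*quantity)
		}
	}
	return totals
}

func main() {
	gpus1 := resource.MustParse("8")
	gpus2 := resource.MustParse("4")
	noScheduleNodes["node1"] = map[string]*resource.Quantity{"nvidia.com/gpu": &gpus1}
	noScheduleNodes["node2"] = map[string]*resource.Quantity{"nvidia.com/gpu": &gpus2}
	fmt.Println(unschedulableTotals()["nvidia.com/gpu"].String()) // 12
}
```
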