Skip to content

Append autopilot anti-affinities to existing matchExpressions array #260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 87 additions & 1 deletion internal/controller/appwrapper/appwrapper_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -63,6 +64,8 @@ var _ = Describe("AppWrapper Controller", func() {
awConfig.FaultTolerance.RetryPausePeriod = 0 * time.Second
awConfig.FaultTolerance.RetryLimit = 0
awConfig.FaultTolerance.SuccessTTL = 0 * time.Second
awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"] = append(awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"], v1.Taint{Key: "extra", Value: "test", Effect: v1.TaintEffectNoExecute})

awReconciler = &AppWrapperReconciler{
Client: k8sClient,
Recorder: &record.FakeRecorder{},
Expand Down Expand Up @@ -156,6 +159,42 @@ var _ = Describe("AppWrapper Controller", func() {
Expect(finished).Should(BeFalse())
}

validateMarkers := func(p *v1.Pod) {
for k, v := range markerPodSet.Annotations {
Expect(p.Annotations).Should(HaveKeyWithValue(k, v))
}
for k, v := range markerPodSet.Labels {
Expect(p.Labels).Should(HaveKeyWithValue(k, v))
}
for _, v := range markerPodSet.Tolerations {
Expect(p.Spec.Tolerations).Should(ContainElement(v))
}
for k, v := range markerPodSet.NodeSelector {
Expect(p.Spec.NodeSelector).Should(HaveKeyWithValue(k, v))
}
}

validateAutopilot := func(p *v1.Pod) {
if p.Spec.Containers[0].Resources.Requests.Name("nvidia.com/gpu", resource.DecimalSI).IsZero() {
Expect(p.Spec.Affinity).Should(BeNil())
} else {
Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(BeNil())
Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms).Should(HaveLen(1))
mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
for _, taint := range awReconciler.Config.Autopilot.ResourceTaints["nvidia.com/gpu"] {
found := false
for _, me := range mes {
if me.Key == taint.Key {
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
Expect(me.Values).Should(ContainElement(taint.Value))
found = true
}
}
Expect(found).Should(BeTrue())
}
}
}

AfterEach(func() {
By("Cleanup the AppWrapper and ensure no Pods remain")
aw := &workloadv1beta2.AppWrapper{}
Expand Down Expand Up @@ -318,6 +357,54 @@ var _ = Describe("AppWrapper Controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(podStatus.pending).Should(Equal(int32(1)))
})

It("Validating PodSet Injection invariants on minimal pods", func() {
advanceToResuming(pod(100, 0, false), pod(100, 1, true))
beginRunning()
aw := getAppWrapper(awName)
pods := getPods(aw)
Expect(pods).Should(HaveLen(2))

By("Validate expected markers and Autopilot anti-affinities were injected")
for _, p := range pods {
Expect(p.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
validateMarkers(&p)
validateAutopilot(&p)
}
})

It("Validating PodSet Injection invariants on complex pods", func() {
advanceToResuming(complexPodYaml(), complexPodYaml())
beginRunning()
aw := getAppWrapper(awName)
pods := getPods(aw)
Expect(pods).Should(HaveLen(2))

By("Validate expected markers and Autopilot anti-affinities were injected")
for _, p := range pods {
Expect(p.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
validateMarkers(&p)
validateAutopilot(&p)
}

By("Validate complex pod elements were not removed")
for _, p := range pods {
Expect(p.Labels).Should(HaveKeyWithValue("myComplexLabel", "myComplexValue"))
Expect(p.Annotations).Should(HaveKeyWithValue("myComplexAnnotation", "myComplexValue"))
Expect(p.Spec.NodeSelector).Should(HaveKeyWithValue("myComplexSelector", "myComplexValue"))
Expect(p.Spec.Tolerations).Should(ContainElement(v1.Toleration{Key: "myComplexKey", Value: "myComplexValue", Operator: v1.TolerationOpEqual, Effect: v1.TaintEffectNoSchedule}))
mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
found := false
for _, me := range mes {
if me.Key == "kubernetes.io/hostname" {
Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
Expect(me.Values).Should(ContainElement("badHost1"))
found = true
}
}
Expect(found).Should(BeTrue())
}
})
})

var _ = Describe("AppWrapper Annotations", func() {
Expand Down Expand Up @@ -433,5 +520,4 @@ var _ = Describe("AppWrapper Annotations", func() {
Expect(awReconciler.terminalExitCodes(ctx, aw)).Should(Equal([]int{3, 10, 42}))
Expect(awReconciler.retryableExitCodes(ctx, aw)).Should(Equal([]int{10, 20}))
})

})
61 changes: 61 additions & 0 deletions internal/controller/appwrapper/fixtures_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,19 @@ func getNode(name string) *v1.Node {
return node
}

func getPods(aw *workloadv1beta2.AppWrapper) []v1.Pod {
result := []v1.Pod{}
podList := &v1.PodList{}
err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
Expect(err).NotTo(HaveOccurred())
for _, pod := range podList.Items {
if awn, found := pod.Labels[AppWrapperLabel]; found && awn == aw.Name {
result = append(result, pod)
}
}
return result
}

// envTest doesn't have a Pod controller; so simulate it
func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
podList := &v1.PodList{}
Expand Down Expand Up @@ -128,6 +141,54 @@ func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppW
return *awc
}

const complexPodYAML = `
apiVersion: v1
kind: Pod
metadata:
name: %v
labels:
myComplexLabel: myComplexValue
annotations:
myComplexAnnotation: myComplexValue
spec:
restartPolicy: Never
nodeSelector:
myComplexSelector: myComplexValue
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: NotIn
values:
- badHost1
tolerations:
- key: myComplexKey
value: myComplexValue
operator: Equal
effect: NoSchedule
containers:
- name: busybox
image: quay.io/project-codeflare/busybox:1.36
command: ["sh", "-c", "sleep 10"]
resources:
requests:
cpu: 100m
nvidia.com/gpu: 1
limits:
nvidia.com/gpu: 1`

func complexPodYaml() workloadv1beta2.AppWrapperComponent {
yamlString := fmt.Sprintf(complexPodYAML, randName("pod"))
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
Expect(err).NotTo(HaveOccurred())
awc := &workloadv1beta2.AppWrapperComponent{
Template: runtime.RawExtension{Raw: jsonBytes},
}
return *awc
}

const malformedPodYAML = `
apiVersion: v1
kind: Pod
Expand Down
43 changes: 27 additions & 16 deletions internal/controller/appwrapper/resource_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func hasResourceRequest(spec map[string]interface{}, resource string) bool {
return false
}

func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.NodeSelectorTerm) error {
func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.NodeSelectorRequirement) error {
if _, ok := spec["affinity"]; !ok {
spec["affinity"] = map[string]interface{}{}
}
Expand All @@ -131,24 +131,37 @@ func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution is not a map")
}
if _, ok := nodeSelector["nodeSelectorTerms"]; !ok {
nodeSelector["nodeSelectorTerms"] = []interface{}{}
nodeSelector["nodeSelectorTerms"] = []interface{}{map[string]interface{}{}}
}
existingTerms, ok := nodeSelector["nodeSelectorTerms"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms is not an array")
}
for _, termToAdd := range selectorTerms {
bytes, err := json.Marshal(termToAdd)
if err != nil {
return fmt.Errorf("marshalling selectorTerm %v: %w", termToAdd, err)
for idx, term := range existingTerms {
selTerm, ok := term.(map[string]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v] is not an map", idx)
}
var obj interface{}
if err = json.Unmarshal(bytes, &obj); err != nil {
return fmt.Errorf("unmarshalling selectorTerm %v: %w", termToAdd, err)
if _, ok := selTerm["matchExpressions"]; !ok {
selTerm["matchExpressions"] = []interface{}{}
}
matchExpressions, ok := selTerm["matchExpressions"].([]interface{})
if !ok {
return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v].matchExpressions is not an map", idx)
}
for _, expr := range exprsToAdd {
bytes, err := json.Marshal(expr)
if err != nil {
return fmt.Errorf("marshalling selectorTerm %v: %w", expr, err)
}
var obj interface{}
if err = json.Unmarshal(bytes, &obj); err != nil {
return fmt.Errorf("unmarshalling selectorTerm %v: %w", expr, err)
}
matchExpressions = append(matchExpressions, obj)
}
existingTerms = append(existingTerms, obj)
selTerm["matchExpressions"] = matchExpressions
}
nodeSelector["nodeSelectorTerms"] = existingTerms

return nil
}
Expand Down Expand Up @@ -262,13 +275,11 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
}
}
if len(toAdd) > 0 {
nodeSelectors := []v1.NodeSelectorTerm{}
matchExpressions := []v1.NodeSelectorRequirement{}
for k, v := range toAdd {
nodeSelectors = append(nodeSelectors, v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v}},
})
matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v})
}
if err := addNodeSelectorsToAffinity(spec, nodeSelectors); err != nil {
if err := addNodeSelectorsToAffinity(spec, matchExpressions); err != nil {
log.FromContext(ctx).Error(err, "failed to inject Autopilot affinities")
}
}
Expand Down