diff --git a/internal/controller/appwrapper/appwrapper_controller_test.go b/internal/controller/appwrapper/appwrapper_controller_test.go
index 4114321..3cc6314 100644
--- a/internal/controller/appwrapper/appwrapper_controller_test.go
+++ b/internal/controller/appwrapper/appwrapper_controller_test.go
@@ -23,6 +23,7 @@ import (
 	. "github.com/onsi/gomega"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
@@ -63,6 +64,8 @@ var _ = Describe("AppWrapper Controller", func() {
 		awConfig.FaultTolerance.RetryPausePeriod = 0 * time.Second
 		awConfig.FaultTolerance.RetryLimit = 0
 		awConfig.FaultTolerance.SuccessTTL = 0 * time.Second
+		awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"] = append(awConfig.Autopilot.ResourceTaints["nvidia.com/gpu"], v1.Taint{Key: "extra", Value: "test", Effect: v1.TaintEffectNoExecute})
+
 		awReconciler = &AppWrapperReconciler{
 			Client:   k8sClient,
 			Recorder: &record.FakeRecorder{},
@@ -156,6 +159,42 @@ var _ = Describe("AppWrapper Controller", func() {
 		Expect(finished).Should(BeFalse())
 	}
 
+	validateMarkers := func(p *v1.Pod) {
+		for k, v := range markerPodSet.Annotations {
+			Expect(p.Annotations).Should(HaveKeyWithValue(k, v))
+		}
+		for k, v := range markerPodSet.Labels {
+			Expect(p.Labels).Should(HaveKeyWithValue(k, v))
+		}
+		for _, v := range markerPodSet.Tolerations {
+			Expect(p.Spec.Tolerations).Should(ContainElement(v))
+		}
+		for k, v := range markerPodSet.NodeSelector {
+			Expect(p.Spec.NodeSelector).Should(HaveKeyWithValue(k, v))
+		}
+	}
+
+	validateAutopilot := func(p *v1.Pod) {
+		if p.Spec.Containers[0].Resources.Requests.Name("nvidia.com/gpu", resource.DecimalSI).IsZero() {
+			Expect(p.Spec.Affinity).Should(BeNil())
+		} else {
+			Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).ShouldNot(BeNil())
+			Expect(p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms).Should(HaveLen(1))
+			mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
+			for _, taint := range awReconciler.Config.Autopilot.ResourceTaints["nvidia.com/gpu"] {
+				found := false
+				for _, me := range mes {
+					if me.Key == taint.Key {
+						Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
+						Expect(me.Values).Should(ContainElement(taint.Value))
+						found = true
+					}
+				}
+				Expect(found).Should(BeTrue())
+			}
+		}
+	}
+
 	AfterEach(func() {
 		By("Cleanup the AppWrapper and ensure no Pods remain")
 		aw := &workloadv1beta2.AppWrapper{}
@@ -318,6 +357,54 @@ var _ = Describe("AppWrapper Controller", func() {
 		Expect(err).NotTo(HaveOccurred())
 		Expect(podStatus.pending).Should(Equal(int32(1)))
 	})
+
+	It("Validating PodSet Injection invariants on minimal pods", func() {
+		advanceToResuming(pod(100, 0, false), pod(100, 1, true))
+		beginRunning()
+		aw := getAppWrapper(awName)
+		pods := getPods(aw)
+		Expect(pods).Should(HaveLen(2))
+
+		By("Validate expected markers and Autopilot anti-affinities were injected")
+		for _, p := range pods {
+			Expect(p.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
+			validateMarkers(&p)
+			validateAutopilot(&p)
+		}
+	})
+
+	It("Validating PodSet Injection invariants on complex pods", func() {
+		advanceToResuming(complexPodYaml(), complexPodYaml())
+		beginRunning()
+		aw := getAppWrapper(awName)
+		pods := getPods(aw)
+		Expect(pods).Should(HaveLen(2))
+
+		By("Validate expected markers and Autopilot anti-affinities were injected")
+		for _, p := range pods {
+			Expect(p.Labels).Should(HaveKeyWithValue(AppWrapperLabel, awName.Name))
+			validateMarkers(&p)
+			validateAutopilot(&p)
+		}
+
+		By("Validate complex pod elements were not removed")
+		for _, p := range pods {
+			Expect(p.Labels).Should(HaveKeyWithValue("myComplexLabel", "myComplexValue"))
+			Expect(p.Annotations).Should(HaveKeyWithValue("myComplexAnnotation", "myComplexValue"))
+			Expect(p.Spec.NodeSelector).Should(HaveKeyWithValue("myComplexSelector", "myComplexValue"))
+			Expect(p.Spec.Tolerations).Should(ContainElement(v1.Toleration{Key: "myComplexKey", Value: "myComplexValue", Operator: v1.TolerationOpEqual, Effect: v1.TaintEffectNoSchedule}))
+			mes := p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions
+			found := false
+			for _, me := range mes {
+				if me.Key == "kubernetes.io/hostname" {
+					Expect(me.Operator).Should(Equal(v1.NodeSelectorOpNotIn))
+					Expect(me.Values).Should(ContainElement("badHost1"))
+					found = true
+				}
+			}
+			Expect(found).Should(BeTrue())
+		}
+	})
 })
 
 var _ = Describe("AppWrapper Annotations", func() {
@@ -433,5 +520,4 @@ var _ = Describe("AppWrapper Annotations", func() {
 		Expect(awReconciler.terminalExitCodes(ctx, aw)).Should(Equal([]int{3, 10, 42}))
 		Expect(awReconciler.retryableExitCodes(ctx, aw)).Should(Equal([]int{10, 20}))
 	})
-
 })
diff --git a/internal/controller/appwrapper/fixtures_test.go b/internal/controller/appwrapper/fixtures_test.go
index dbbdaf5..9fb4c47 100644
--- a/internal/controller/appwrapper/fixtures_test.go
+++ b/internal/controller/appwrapper/fixtures_test.go
@@ -69,6 +69,20 @@ func getNode(name string) *v1.Node {
 	return node
 }
 
+// getPods returns the Pods in aw's namespace whose AppWrapperLabel value is aw's name.
+func getPods(aw *workloadv1beta2.AppWrapper) []v1.Pod {
+	result := []v1.Pod{}
+	podList := &v1.PodList{}
+	err := k8sClient.List(ctx, podList, &client.ListOptions{Namespace: aw.Namespace})
+	Expect(err).NotTo(HaveOccurred())
+	for _, pod := range podList.Items {
+		if awn, found := pod.Labels[AppWrapperLabel]; found && awn == aw.Name {
+			result = append(result, pod)
+		}
+	}
+	return result
+}
+
 // envTest doesn't have a Pod controller; so simulate it
 func setPodStatus(aw *workloadv1beta2.AppWrapper, phase v1.PodPhase, numToChange int32) error {
 	podList := &v1.PodList{}
@@ -128,6 +142,55 @@ func pod(milliCPU int64, numGPU int64, declarePodSets bool) workloadv1beta2.AppW
 	return *awc
 }
 
+const complexPodYAML = `
+apiVersion: v1
+kind: Pod
+metadata:
+  name: %v
+  labels:
+    myComplexLabel: myComplexValue
+  annotations:
+    myComplexAnnotation: myComplexValue
+spec:
+  restartPolicy: Never
+  nodeSelector:
+    myComplexSelector: myComplexValue
+  affinity:
+    nodeAffinity:
+      requiredDuringSchedulingIgnoredDuringExecution:
+        nodeSelectorTerms:
+        - matchExpressions:
+          - key: kubernetes.io/hostname
+            operator: NotIn
+            values:
+            - badHost1
+  tolerations:
+  - key: myComplexKey
+    value: myComplexValue
+    operator: Equal
+    effect: NoSchedule
+  containers:
+  - name: busybox
+    image: quay.io/project-codeflare/busybox:1.36
+    command: ["sh", "-c", "sleep 10"]
+    resources:
+      requests:
+        cpu: 100m
+        nvidia.com/gpu: 1
+      limits:
+        nvidia.com/gpu: 1`
+
+// complexPodYaml wraps complexPodYAML, instantiated with a random pod name, as an AppWrapperComponent.
+func complexPodYaml() workloadv1beta2.AppWrapperComponent {
+	yamlString := fmt.Sprintf(complexPodYAML, randName("pod"))
+	jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
+	Expect(err).NotTo(HaveOccurred())
+	awc := &workloadv1beta2.AppWrapperComponent{
+		Template: runtime.RawExtension{Raw: jsonBytes},
+	}
+	return *awc
+}
+
 const malformedPodYAML = `
 apiVersion: v1
 kind: Pod
diff --git a/internal/controller/appwrapper/resource_management.go b/internal/controller/appwrapper/resource_management.go
index f6e596a..eff0a7f 100644
--- a/internal/controller/appwrapper/resource_management.go
+++ b/internal/controller/appwrapper/resource_management.go
@@ -108,7 +108,9 @@ func hasResourceRequest(spec map[string]interface{}, resource string) bool {
 	return false
 }
 
-func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.NodeSelectorTerm) error {
+// addNodeSelectorsToAffinity appends exprsToAdd to the matchExpressions of every required nodeSelectorTerm in spec,
+// creating missing affinity structure first. Every term is extended because Kubernetes ORs terms and ANDs expressions.
+func addNodeSelectorsToAffinity(spec map[string]interface{}, exprsToAdd []v1.NodeSelectorRequirement) error {
 	if _, ok := spec["affinity"]; !ok {
 		spec["affinity"] = map[string]interface{}{}
 	}
@@ -131,24 +133,37 @@ func addNodeSelectorsToAffinity(spec map[string]interface{}, selectorTerms []v1.
 		return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution is not a map")
 	}
 	if _, ok := nodeSelector["nodeSelectorTerms"]; !ok {
-		nodeSelector["nodeSelectorTerms"] = []interface{}{}
+		nodeSelector["nodeSelectorTerms"] = []interface{}{map[string]interface{}{}}
 	}
 	existingTerms, ok := nodeSelector["nodeSelectorTerms"].([]interface{})
 	if !ok {
 		return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms is not an array")
 	}
-	for _, termToAdd := range selectorTerms {
-		bytes, err := json.Marshal(termToAdd)
-		if err != nil {
-			return fmt.Errorf("marshalling selectorTerm %v: %w", termToAdd, err)
+	for idx, term := range existingTerms {
+		selTerm, ok := term.(map[string]interface{})
+		if !ok {
+			return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v] is not a map", idx)
 		}
-		var obj interface{}
-		if err = json.Unmarshal(bytes, &obj); err != nil {
-			return fmt.Errorf("unmarshalling selectorTerm %v: %w", termToAdd, err)
+		if _, ok := selTerm["matchExpressions"]; !ok {
+			selTerm["matchExpressions"] = []interface{}{}
+		}
+		matchExpressions, ok := selTerm["matchExpressions"].([]interface{})
+		if !ok {
+			return fmt.Errorf("spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms[%v].matchExpressions is not an array", idx)
+		}
+		for _, expr := range exprsToAdd {
+			bytes, err := json.Marshal(expr)
+			if err != nil {
+				return fmt.Errorf("marshalling matchExpression %v: %w", expr, err)
+			}
+			var obj interface{}
+			if err = json.Unmarshal(bytes, &obj); err != nil {
+				return fmt.Errorf("unmarshalling matchExpression %v: %w", expr, err)
+			}
+			matchExpressions = append(matchExpressions, obj)
 		}
-		existingTerms = append(existingTerms, obj)
+		selTerm["matchExpressions"] = matchExpressions
 	}
-	nodeSelector["nodeSelectorTerms"] = existingTerms
 
 	return nil
 }
@@ -262,13 +277,11 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
 				}
 			}
 			if len(toAdd) > 0 {
-				nodeSelectors := []v1.NodeSelectorTerm{}
+				matchExpressions := []v1.NodeSelectorRequirement{}
 				for k, v := range toAdd {
-					nodeSelectors = append(nodeSelectors, v1.NodeSelectorTerm{
-						MatchExpressions: []v1.NodeSelectorRequirement{{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v}},
-					})
+					matchExpressions = append(matchExpressions, v1.NodeSelectorRequirement{Operator: v1.NodeSelectorOpNotIn, Key: k, Values: v})
 				}
-				if err := addNodeSelectorsToAffinity(spec, nodeSelectors); err != nil {
+				if err := addNodeSelectorsToAffinity(spec, matchExpressions); err != nil {
 					log.FromContext(ctx).Error(err, "failed to inject Autopilot affinities")
 				}
 			}