Skip to content

Commit 5bee10e

Browse files
authored
Avoid storing PodSet inference results in Spec (#147)
Store the results of PodSet inference in the Status, not the Spec. This means we end up doing the inference twice for each AppWrapper (once in the ValidatingWebHook for error checking and again later in the Controller when we have access to the Status), but it avoids mutating the user-provided Spec. Avoiding Spec mutations avoids spurious rejections when the user does multiple kubectl apply operations that do not change the Component itself.
1 parent bd05385 commit 5bee10e

17 files changed

+290
-173
lines changed

api/v1beta2/appwrapper_types.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ type AppWrapperComponent struct {
3939
//+optional
4040
Annotations map[string]string `json:"annotations,omitempty"`
4141

42-
// PodSets contained in the Component
42+
// DeclaredPodSets for the Component (optional for known PodCreating GVKs)
4343
//+optional
44-
PodSets []AppWrapperPodSet `json:"podSets,omitempty"`
44+
DeclaredPodSets []AppWrapperPodSet `json:"podSets,omitempty"`
4545

4646
// PodSetInfos assigned to the Component's PodSets by Kueue
4747
//+optional
@@ -121,6 +121,9 @@ type AppWrapperComponentStatus struct {
121121
// APIVersion is the APIVersion of the Component
122122
APIVersion string `json:"apiVersion"`
123123

124+
// PodSets is the validated PodSets for the Component (either from AppWrapperComponent.DeclaredPodSets or inferred by the controller)
125+
PodSets []AppWrapperPodSet `json:"podSets"`
126+
124127
// Conditions hold the latest available observations of the Component's current state.
125128
//
126129
// The type of the condition could be:

api/v1beta2/zz_generated.deepcopy.go

Lines changed: 9 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/crd/bases/workload.codeflare.dev_appwrappers.yaml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ spec:
129129
type: object
130130
type: array
131131
podSets:
132-
description: PodSets contained in the Component
132+
description: DeclaredPodSets for the Component (optional for
133+
known PodCreating GVKs)
133134
items:
134135
description: AppWrapperPodSet describes an homogeneous set
135136
of pods
@@ -264,10 +265,31 @@ spec:
264265
name:
265266
description: Name is the name of the Component
266267
type: string
268+
podSets:
269+
description: PodSets is the validated PodSets for the Component
270+
(either from AppWrapperComponent.DeclaredPodSets or inferred
271+
by the controller)
272+
items:
273+
description: AppWrapperPodSet describes an homogeneous set
274+
of pods
275+
properties:
276+
path:
277+
description: Path is the path Component.Template to the
278+
PodTemplateSpec for this PodSet
279+
type: string
280+
replicas:
281+
description: Replicas is the number of pods in this PodSet
282+
format: int32
283+
type: integer
284+
required:
285+
- path
286+
type: object
287+
type: array
267288
required:
268289
- apiVersion
269290
- kind
270291
- name
292+
- podSets
271293
type: object
272294
type: array
273295
conditions:

internal/controller/appwrapper/appwrapper_controller.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3838

3939
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
40+
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
4041
"github.com/project-codeflare/appwrapper/pkg/config"
4142
"github.com/project-codeflare/appwrapper/pkg/utils"
4243
)
@@ -150,13 +151,19 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
150151
return ctrl.Result{}, err
151152
}
152153
}
153-
aw.Status.ComponentStatus = make([]workloadv1beta2.AppWrapperComponentStatus, len(aw.Spec.Components))
154+
154155
return r.updateStatus(ctx, aw, workloadv1beta2.AppWrapperSuspended)
155156

156157
case workloadv1beta2.AppWrapperSuspended: // no components deployed
157158
if aw.Spec.Suspend {
158159
return ctrl.Result{}, nil // remain suspended
159160
}
161+
162+
// Normally already done as a side-effect of Kueue calling PodSets(), but be absolutely certain before we start using it.
163+
if err := awstatus.EnsureComponentStatusInitialized(ctx, aw); err != nil {
164+
return ctrl.Result{}, err
165+
}
166+
160167
// begin deployment
161168
meta.SetStatusCondition(&aw.Status.Conditions, metav1.Condition{
162169
Type: string(workloadv1beta2.QuotaReserved),

internal/controller/appwrapper/fixtures_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ func pod(milliCPU int64) workloadv1beta2.AppWrapperComponent {
107107
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
108108
Expect(err).NotTo(HaveOccurred())
109109
return workloadv1beta2.AppWrapperComponent{
110-
PodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
111-
Template: runtime.RawExtension{Raw: jsonBytes},
110+
DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
111+
Template: runtime.RawExtension{Raw: jsonBytes},
112112
}
113113
}
114114

@@ -134,7 +134,7 @@ func malformedPod(milliCPU int64) workloadv1beta2.AppWrapperComponent {
134134
jsonBytes, err := yaml.YAMLToJSON([]byte(yamlString))
135135
Expect(err).NotTo(HaveOccurred())
136136
return workloadv1beta2.AppWrapperComponent{
137-
PodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
138-
Template: runtime.RawExtension{Raw: jsonBytes},
137+
DeclaredPodSets: []workloadv1beta2.AppWrapperPodSet{{Replicas: ptr.To(int32(1)), Path: "template"}},
138+
Template: runtime.RawExtension{Raw: jsonBytes},
139139
}
140140
}

internal/controller/appwrapper/resource_management.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,25 @@ import (
3636
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
3737
)
3838

39-
func parseComponent(aw *workloadv1beta2.AppWrapper, raw []byte) (*unstructured.Unstructured, error) {
39+
func parseComponent(raw []byte, expectedNamespace string) (*unstructured.Unstructured, error) {
4040
obj := &unstructured.Unstructured{}
4141
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(raw, nil, obj); err != nil {
4242
return nil, err
4343
}
4444
namespace := obj.GetNamespace()
4545
if namespace == "" {
46-
obj.SetNamespace(aw.Namespace)
47-
} else if namespace != aw.Namespace {
46+
obj.SetNamespace(expectedNamespace)
47+
} else if namespace != expectedNamespace {
4848
// Should not happen, namespace equality checked by validateAppWrapperInvariants
49-
return nil, fmt.Errorf("component namespace \"%s\" is different from appwrapper namespace \"%s\"", namespace, aw.Namespace)
49+
return nil, fmt.Errorf("component namespace \"%s\" is different from appwrapper namespace \"%s\"", namespace, expectedNamespace)
5050
}
5151
return obj, nil
5252
}
5353

5454
//gocyclo:ignore
5555
func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workloadv1beta2.AppWrapper, componentIdx int) (*unstructured.Unstructured, error, bool) {
5656
component := aw.Spec.Components[componentIdx]
57+
componentStatus := aw.Status.ComponentStatus[componentIdx]
5758
toMap := func(x interface{}) map[string]string {
5859
if x == nil {
5960
return nil
@@ -77,7 +78,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
7778
}
7879
}
7980

80-
obj, err := parseComponent(aw, component.Template.Raw)
81+
obj, err := parseComponent(component.Template.Raw, aw.Namespace)
8182
if err != nil {
8283
return nil, err, true
8384
}
@@ -88,7 +89,7 @@ func (r *AppWrapperReconciler) createComponent(ctx context.Context, aw *workload
8889
}
8990

9091
awLabels := map[string]string{AppWrapperLabel: aw.Name}
91-
for podSetsIdx, podSet := range component.PodSets {
92+
for podSetsIdx, podSet := range componentStatus.PodSets {
9293
toInject := &workloadv1beta2.AppWrapperPodSetInfo{}
9394
if r.Config.EnableKueueIntegrations {
9495
if podSetsIdx < len(component.PodSetInfos) {

internal/controller/appwrapper/suite_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"sigs.k8s.io/controller-runtime/pkg/log/zap"
4040

4141
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
42+
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
4243
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
4344
)
4445

@@ -100,6 +101,7 @@ var _ = BeforeSuite(func() {
100101
k8sClient, err = client.New(cfg, client.Options{Scheme: scheme})
101102
Expect(err).NotTo(HaveOccurred())
102103
Expect(k8sClient).NotTo(BeNil())
104+
awstatus.CacheClient(k8sClient)
103105
})
104106

105107
var _ = AfterSuite(func() {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
Copyright 2024 IBM Corporation.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package awstatus
18+
19+
import (
20+
"context"
21+
22+
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
23+
"github.com/project-codeflare/appwrapper/pkg/utils"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
26+
"sigs.k8s.io/controller-runtime/pkg/client"
27+
)
28+
29+
var (
30+
cachedClient client.Client
31+
)
32+
33+
const controllerName = "workload.codeflare.dev-appwrapper"
34+
35+
// CacheClient initializes cachedClient; must be called during startup
36+
func CacheClient(k8sclient client.Client) {
37+
cachedClient = k8sclient
38+
}
39+
40+
// BaseSSAAppWrapper creates a new object based on the input AppWrapper that
41+
// only contains the fields necessary to identify the original object.
42+
// The object can be used as a base for Server-Side-Apply.
43+
func BaseSSAAppWrapper(aw *workloadv1beta2.AppWrapper) *workloadv1beta2.AppWrapper {
44+
patch := &workloadv1beta2.AppWrapper{
45+
ObjectMeta: metav1.ObjectMeta{
46+
UID: aw.UID,
47+
Name: aw.Name,
48+
Namespace: aw.Namespace,
49+
},
50+
TypeMeta: metav1.TypeMeta{
51+
APIVersion: workloadv1beta2.GroupVersion.String(),
52+
Kind: "AppWrapper",
53+
},
54+
}
55+
return patch
56+
}
57+
58+
// EnsureComponentStatusInitialized initializes aw.Status.ComponentStatus, including performing PodSet inference for known GVKs
59+
func EnsureComponentStatusInitialized(ctx context.Context, aw *workloadv1beta2.AppWrapper) error {
60+
if len(aw.Status.ComponentStatus) == len(aw.Spec.Components) {
61+
return nil
62+
}
63+
64+
// Construct definitive PodSets from the Spec + InferPodSets and cache in the Status (to avoid clashing with user updates to the Spec via apply)
65+
compStatus := make([]workloadv1beta2.AppWrapperComponentStatus, len(aw.Spec.Components))
66+
for idx := range aw.Spec.Components {
67+
if len(aw.Spec.Components[idx].DeclaredPodSets) > 0 {
68+
compStatus[idx].PodSets = aw.Spec.Components[idx].DeclaredPodSets
69+
} else {
70+
obj := &unstructured.Unstructured{}
71+
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(aw.Spec.Components[idx].Template.Raw, nil, obj); err != nil {
72+
// Transient error; Template.Raw was validated by our AdmissionController
73+
return err
74+
}
75+
podSets, err := utils.InferPodSets(obj)
76+
if err != nil {
77+
// Transient error; InferPodSets was validated by our AdmissionController
78+
return err
79+
}
80+
compStatus[idx].PodSets = podSets
81+
}
82+
}
83+
aw.Status.ComponentStatus = compStatus
84+
85+
patch := BaseSSAAppWrapper(aw)
86+
patch.Status.ComponentStatus = compStatus
87+
return cachedClient.Status().Patch(ctx, patch, client.Apply, client.FieldOwner(controllerName), client.ForceOwnership)
88+
}

internal/controller/workload/child_admission_controller.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"time"
2222

2323
"k8s.io/apimachinery/pkg/api/meta"
24-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2524
"k8s.io/apimachinery/pkg/runtime"
25+
"k8s.io/apimachinery/pkg/runtime/schema"
2626

2727
ctrl "sigs.k8s.io/controller-runtime"
2828
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -70,20 +70,19 @@ func (r *ChildWorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Reques
7070
!meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.Unhealthy)) {
7171
admittedChildren := 0
7272
childrenWithPods := 0
73-
for componentIdx, component := range aw.Spec.Components {
74-
if len(component.PodSets) > 0 {
73+
for componentIdx, componentStatus := range aw.Status.ComponentStatus {
74+
if len(componentStatus.PodSets) > 0 {
7575
childrenWithPods += 1
76-
unstruct := &unstructured.Unstructured{}
77-
if _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(component.Template.Raw, nil, unstruct); err == nil {
78-
wlName := jobframework.GetWorkloadNameForOwnerWithGVK(unstruct.GetName(), *gvk)
76+
if gv, err := schema.ParseGroupVersion(componentStatus.APIVersion); err == nil {
77+
wlName := jobframework.GetWorkloadNameForOwnerWithGVK(componentStatus.Name, gv.WithKind(componentStatus.Kind))
7978
wl := &kueue.Workload{}
8079
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: aw.Namespace, Name: wlName}, wl); err == nil {
8180
if workload.IsAdmitted(wl) {
8281
admittedChildren += 1
8382
} else {
8483
admission := kueue.Admission{
8584
ClusterQueue: childJobQueueName,
86-
PodSetAssignments: make([]kueue.PodSetAssignment, len(aw.Spec.Components[componentIdx].PodSets)),
85+
PodSetAssignments: make([]kueue.PodSetAssignment, len(componentStatus.PodSets)),
8786
}
8887
for i := range admission.PodSetAssignments {
8988
admission.PodSetAssignments[i].Name = wl.Spec.PodSets[i].Name

internal/controller/workload/workload_controller.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package workload
1818

1919
import (
20+
"context"
2021
"fmt"
2122

2223
"k8s.io/apimachinery/pkg/api/meta"
@@ -32,6 +33,7 @@ import (
3233
"sigs.k8s.io/kueue/pkg/podset"
3334

3435
workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
36+
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
3537
"github.com/project-codeflare/appwrapper/pkg/utils"
3638
)
3739

@@ -77,17 +79,21 @@ func (aw *AppWrapper) GVK() schema.GroupVersionKind {
7779

7880
func (aw *AppWrapper) PodSets() []kueue.PodSet {
7981
podSets := []kueue.PodSet{}
80-
for componentIdx, component := range aw.Spec.Components {
81-
if len(component.PodSets) > 0 {
82+
if err := awstatus.EnsureComponentStatusInitialized(context.Background(), (*workloadv1beta2.AppWrapper)(aw)); err != nil {
83+
// Kueue will raise an error on zero length PodSet. Unfortunately, the Kueue API prevents propagating the actual error
84+
return podSets
85+
}
86+
for idx := range aw.Status.ComponentStatus {
87+
if len(aw.Status.ComponentStatus[idx].PodSets) > 0 {
8288
obj := &unstructured.Unstructured{}
83-
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(component.Template.Raw, nil, obj); err != nil {
89+
if _, _, err := unstructured.UnstructuredJSONScheme.Decode(aw.Spec.Components[idx].Template.Raw, nil, obj); err != nil {
8490
continue // Should be unreachable; Template.Raw validated by our AdmissionController
8591
}
86-
for psIdx, podSet := range component.PodSets {
92+
for psIdx, podSet := range aw.Status.ComponentStatus[idx].PodSets {
8793
replicas := utils.Replicas(podSet)
8894
if template, err := utils.GetPodTemplateSpec(obj, podSet.Path); err == nil {
8995
podSets = append(podSets, kueue.PodSet{
90-
Name: fmt.Sprintf("%s-%v-%v", aw.Name, componentIdx, psIdx),
96+
Name: fmt.Sprintf("%s-%v-%v", aw.Name, idx, psIdx),
9197
Template: *template,
9298
Count: replicas,
9399
})
@@ -101,17 +107,16 @@ func (aw *AppWrapper) PodSets() []kueue.PodSet {
101107
// RunWithPodSetsInfo records the assigned PodSetInfos for each component and sets aw.spec.Suspend to false
102108
func (aw *AppWrapper) RunWithPodSetsInfo(podSetsInfo []podset.PodSetInfo) error {
103109
podSetsInfoIndex := 0
104-
for componentIdx := range aw.Spec.Components {
105-
component := &aw.Spec.Components[componentIdx]
106-
if len(component.PodSetInfos) != len(component.PodSets) {
107-
component.PodSetInfos = make([]workloadv1beta2.AppWrapperPodSetInfo, len(component.PodSets))
110+
for idx := range aw.Spec.Components {
111+
if len(aw.Spec.Components[idx].PodSetInfos) != len(aw.Status.ComponentStatus[idx].PodSets) {
112+
aw.Spec.Components[idx].PodSetInfos = make([]workloadv1beta2.AppWrapperPodSetInfo, len(aw.Status.ComponentStatus[idx].PodSets))
108113
}
109-
for podSetIdx := range component.PodSets {
114+
for podSetIdx := range aw.Status.ComponentStatus[idx].PodSets {
110115
podSetsInfoIndex += 1
111116
if podSetsInfoIndex > len(podSetsInfo) {
112117
continue // we will return an error below...continuing to get an accurate count for the error message
113118
}
114-
component.PodSetInfos[podSetIdx] = workloadv1beta2.AppWrapperPodSetInfo{
119+
aw.Spec.Components[idx].PodSetInfos[podSetIdx] = workloadv1beta2.AppWrapperPodSetInfo{
115120
Annotations: podSetsInfo[podSetIdx].Annotations,
116121
Labels: podSetsInfo[podSetIdx].Labels,
117122
NodeSelector: podSetsInfo[podSetIdx].NodeSelector,

0 commit comments

Comments
 (0)