diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go
index bc8d3b3ab2798..f022827e02ece 100644
--- a/pkg/features/kube_features.go
+++ b/pkg/features/kube_features.go
@@ -702,6 +702,14 @@ const (
 	// Enables support for the StorageVersionMigrator controller.
 	StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"
 
+	// owner: @serathius
+	// Allow API server JSON encoder to encode collections item by item, instead of all at once.
+	StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
+
+	// owner: @serathius
+	// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
+	StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
+
 	// owner: @robscott
 	// kep: https://kep.k8s.io/2433
 	//
diff --git a/pkg/features/versioned_kube_features.go b/pkg/features/versioned_kube_features.go
index 348c04df2ba0d..af2eaafa9ce29 100644
--- a/pkg/features/versioned_kube_features.go
+++ b/pkg/features/versioned_kube_features.go
@@ -756,6 +756,14 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 		{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
 	},
 
+	StreamingCollectionEncodingToJSON: {
+		{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
+	},
+
+	StreamingCollectionEncodingToProtobuf: {
+		{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
+	},
+
 	SupplementalGroupsPolicy: {
 		{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
 	},
diff --git a/pkg/registry/core/rest/storage_core_generic.go b/pkg/registry/core/rest/storage_core_generic.go
index 664724f3bc021..a7e49dd526aca 100644
--- a/pkg/registry/core/rest/storage_core_generic.go
+++ b/pkg/registry/core/rest/storage_core_generic.go
@@ -73,8 +73,18 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
 		ParameterCodec:       legacyscheme.ParameterCodec,
 		NegotiatedSerializer: legacyscheme.Codecs,
 	}
+	opts := []serializer.CodecFactoryOptionsMutator{}
 	if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
-		apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
+		opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
+	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
+		opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
+	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
+		opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
+	}
+	if len(opts) != 0 {
+		apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
 	}
 
 	eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
index d5ecec4922c7b..7ecaf61b11405 100644
--- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
+++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler.go
@@ -854,6 +854,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 	clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped
 
 	// CRDs explicitly do not support protobuf, but some objects returned by the API server do
+	streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
 	negotiatedSerializer := unstructuredNegotiatedSerializer{
 		typer:   typer,
 		creator: creator,
@@ -867,10 +868,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 				MediaTypeType:    "application",
 				MediaTypeSubType: "json",
 				EncodesAsText:    true,
-				Serializer:       json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{}),
+				Serializer:       json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
 				PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
 				StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
-					Strict: true,
+					Strict:                       true,
+					StreamingCollectionsEncoding: streamingCollections,
 				}),
 				StreamSerializer: &runtime.StreamSerializerInfo{
 					EncodesAsText: true,
@@ -893,7 +895,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 				MediaType:        "application/vnd.kubernetes.protobuf",
 				MediaTypeType:    "application",
 				MediaTypeSubType: "vnd.kubernetes.protobuf",
-				Serializer:       protobuf.NewSerializer(creator, typer),
+				Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{
+					StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf),
+				}),
 				StreamSerializer: &runtime.StreamSerializerInfo{
 					Serializer: protobuf.NewRawSerializer(creator, typer),
 					Framer:     protobuf.LengthDelimitedFramer,
@@ -973,6 +977,12 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
 	if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
 		opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
 	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
+		opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
+	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
+		opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
+	}
 	scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
 	scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
 	scaleScope.Namer = handlers.ContextBasedNaming{
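Both call sites above assemble codec options through the same mutator pattern. A condensed usage sketch of that wiring (the `scheme` variable stands in for whichever scheme a call site owns):

	opts := []serializer.CodecFactoryOptionsMutator{}
	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
		opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
	}
	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
		opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
	}
	codecs := serializer.NewCodecFactory(scheme, opts...)
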
diff --git a/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go b/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go
index 1fdd32c4ba3e0..468afd0e9ee09 100644
--- a/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go
+++ b/staging/src/k8s.io/apimachinery/pkg/api/meta/help.go
@@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
 	if err != nil {
 		return nil, err
 	}
+	if items.IsNil() {
+		return nil, nil
+	}
 	list := make([]runtime.Object, items.Len())
 	if len(list) == 0 {
 		return list, nil
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go
index 77bb30745253e..81286fccb49f0 100644
--- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/codec_factory.go
@@ -28,7 +28,7 @@ import (
 func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo {
 	jsonSerializer := json.NewSerializerWithOptions(
 		mf, scheme, scheme,
-		json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
+		json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
 	)
 	jsonSerializerType := runtime.SerializerInfo{
 		MediaType: runtime.ContentTypeJSON,
@@ -38,7 +38,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
 		Serializer: jsonSerializer,
 		StrictSerializer: json.NewSerializerWithOptions(
 			mf, scheme, scheme,
-			json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
+			json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
 		),
 		StreamSerializer: &runtime.StreamSerializerInfo{
 			EncodesAsText: true,
@@ -61,7 +61,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
 		mf, scheme, scheme,
 		json.SerializerOptions{Yaml: true, Pretty: false, Strict: true},
 	)
-	protoSerializer := protobuf.NewSerializer(scheme, scheme)
+	protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{
+		StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf,
+	})
 	protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)
 
 	serializers := []runtime.SerializerInfo{
@@ -113,6 +115,9 @@ type CodecFactoryOptions struct {
 	// Pretty includes a pretty serializer along with the non-pretty one
 	Pretty bool
 
+	StreamingCollectionsEncodingToJSON     bool
+	StreamingCollectionsEncodingToProtobuf bool
+
 	serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
 }
 
@@ -147,6 +152,18 @@ func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.S
 	}
 }
 
+func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
+	return func(options *CodecFactoryOptions) {
+		options.StreamingCollectionsEncodingToJSON = true
+	}
+}
+
+func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator {
+	return func(options *CodecFactoryOptions) {
+		options.StreamingCollectionsEncodingToProtobuf = true
+	}
+}
+
 // NewCodecFactory provides methods for retrieving serializers for the supported wire formats
 // and conversion wrappers to define preferred internal and external versions.
 // In the future, as the internal version is used less, callers may instead use a defaulting serializer and
 // conversion wrappers to define preferred internal and external versions.
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go
new file mode 100644
index 0000000000000..075163ddd81e3
--- /dev/null
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections.go
@@ -0,0 +1,230 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package json
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"maps"
+	"slices"
+	"sort"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	"k8s.io/apimachinery/pkg/conversion"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+)
+
+func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) {
+	list, ok := obj.(*unstructured.UnstructuredList)
+	if ok {
+		return true, streamingEncodeUnstructuredList(w, list)
+	}
+	if _, ok := obj.(json.Marshaler); ok {
+		return false, nil
+	}
+	typeMeta, listMeta, items, err := getListMeta(obj)
+	if err == nil {
+		return true, streamingEncodeList(w, typeMeta, listMeta, items)
+	}
+	return false, nil
+}
+
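+// Only typed lists with the canonical {TypeMeta, ListMeta, Items} shape, and
+// *unstructured.UnstructuredList, take the streaming path; objects implementing
+// json.Marshaler and lists with any other field layout fall back to the regular
+// json.Encoder path in doEncode.
+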
+// getListMeta implements list extraction logic for json stream serialization.
+//
+// Reasons for custom logic instead of reusing accessors from the meta package:
+// * Validate json tags to prevent incompatibility with the standard json package.
+// * ListMetaAccessor doesn't distinguish an empty value from a nil one.
+// * TypeAccessor reparses "apiVersion" and serializes it as "{group}/{version}".
+func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) {
+	listValue, err := conversion.EnforcePtr(list)
+	if err != nil {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
+	}
+	listType := listValue.Type()
+	if listType.NumField() != 3 {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields")
+	}
+	// TypeMeta
+	typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
+	if !ok {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type")
+	}
+	if listType.Field(0).Tag.Get("json") != ",inline" {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`)
+	}
+	// ListMeta
+	listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
+	if !ok {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type")
+	}
+	if listType.Field(1).Tag.Get("json") != "metadata,omitempty" {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`)
+	}
+	// Items
+	items, err := meta.ExtractList(list)
+	if err != nil {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
+	}
+	if listType.Field(2).Tag.Get("json") != "items" {
+		return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`)
+	}
+	return typeMeta, listMeta, items, nil
+}
+
+func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error {
+	// Start
+	if _, err := w.Write([]byte(`{`)); err != nil {
+		return err
+	}
+
+	// TypeMeta
+	if typeMeta.Kind != "" {
+		if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil {
+			return err
+		}
+	}
+	if typeMeta.APIVersion != "" {
+		if err := encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil {
+			return err
+		}
+	}
+
+	// ListMeta
+	if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil {
+		return err
+	}
+
+	// Items
+	if err := encodeItemsObjectSlice(w, items); err != nil {
+		return err
+	}
+
+	// End
+	_, err := w.Write([]byte("}\n"))
+	return err
+}
+
+func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) {
+	if items == nil {
+		err := encodeKeyValuePair(w, "items", nil, nil)
+		return err
+	}
+	_, err = w.Write([]byte(`"items":[`))
+	if err != nil {
+		return err
+	}
+	suffix := []byte(",")
+	for i, item := range items {
+		if i == len(items)-1 {
+			suffix = nil
+		}
+		err := encodeValue(w, item, suffix)
+		if err != nil {
+			return err
+		}
+	}
+	_, err = w.Write([]byte("]"))
+	if err != nil {
+		return err
+	}
+	return err
+}
+
+func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error {
+	_, err := w.Write([]byte(`{`))
+	if err != nil {
+		return err
+	}
+	keys := slices.Collect(maps.Keys(list.Object))
+	if _, exists := list.Object["items"]; !exists {
+		keys = append(keys, "items")
+	}
+	sort.Strings(keys)
+
+	suffix := []byte(",")
+	for i, key := range keys {
+		if i == len(keys)-1 {
+			suffix = nil
+		}
+		if key == "items" {
+			err = encodeItemsUnstructuredSlice(w, list.Items, suffix)
+		} else {
+			err = encodeKeyValuePair(w, key, list.Object[key], suffix)
+		}
+		if err != nil {
+			return err
+		}
+	}
+	_, err = w.Write([]byte("}\n"))
+	return err
+}
+
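+// encodeItemsUnstructuredSlice writes the "items" field from list.Items; a
+// conflicting "items" key in list.Object is ignored in favor of list.Items
+// (see the key handling in streamingEncodeUnstructuredList above).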
+func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) {
+	_, err = w.Write([]byte(`"items":[`))
+	if err != nil {
+		return err
+	}
+	comma := []byte(",")
+	for i, item := range items {
+		if i == len(items)-1 {
+			comma = nil
+		}
+		err := encodeValue(w, item.Object, comma)
+		if err != nil {
+			return err
+		}
+	}
+	_, err = w.Write([]byte("]"))
+	if err != nil {
+		return err
+	}
+	if len(suffix) > 0 {
+		_, err = w.Write(suffix)
+	}
+	return err
+}
+
+func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) {
+	err = encodeValue(w, key, []byte(":"))
+	if err != nil {
+		return err
+	}
+	err = encodeValue(w, value, suffix)
+	if err != nil {
+		return err
+	}
+	return err
+}
+
+func encodeValue(w io.Writer, value any, suffix []byte) error {
+	data, err := json.Marshal(value)
+	if err != nil {
+		return err
+	}
+	_, err = w.Write(data)
+	if err != nil {
+		return err
+	}
+	if len(suffix) > 0 {
+		_, err = w.Write(suffix)
+	}
+	return err
+}
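
A minimal usage sketch, mirroring how the tests below construct the serializer (passing nil for creater and typer is sufficient for encoding):

	s := json.NewSerializerWithOptions(json.DefaultMetaFactory, nil, nil,
		json.SerializerOptions{StreamingCollectionsEncoding: true})
	var buf bytes.Buffer
	if err := s.Encode(list, &buf); err != nil {
		// handle the encoding error
	}
	// buf holds the same bytes the non-streaming encoder produces (the fuzz
	// tests below compare the two outputs), written incrementally.
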
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go
new file mode 100644
index 0000000000000..03ec4d88f2236
--- /dev/null
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/collections_test.go
@@ -0,0 +1,794 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package json
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	fuzz "github.com/google/gofuzz"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+)
+
+func TestCollectionsEncoding(t *testing.T) {
+	t.Run("Normal", func(t *testing.T) {
+		testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{}), false)
+	})
+	t.Run("Streaming", func(t *testing.T) {
+		testCollectionsEncoding(t, NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}), true)
+	})
+}
+
+// testCollectionsEncoding should provide comprehensive tests to validate the streaming implementation of the encoder.
+func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) {
+	var buf writeCountingBuffer
+	var remainingItems int64 = 1
+	// As defined in KEP-5116, it should include the following scenarios:
+	// Context: https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/5116-streaming-response-encoding#unit-tests
+	for _, tc := range []struct {
+		name         string
+		in           runtime.Object
+		cannotStream bool
+		expect       string
+	}{
+		// Preserving the distinction between integers and floating-point numbers
+		{
+			name: "Struct with floats",
+			in: &StructWithFloatsList{
+				Items: []StructWithFloats{
+					{
+						Int:     1,
+						Float32: float32(1),
+						Float64: 1.1,
+					},
+				},
+			},
+			expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"Int\":1,\"Float32\":1,\"Float64\":1.1}]}\n",
+		},
+		{
+			name: "Unstructured object float",
+			in: &unstructured.UnstructuredList{
+				Object: map[string]interface{}{
+					"int":     1,
+					"float32": float32(1),
+					"float64": 1.1,
+				},
+			},
+			expect: "{\"float32\":1,\"float64\":1.1,\"int\":1,\"items\":[]}\n",
+		},
+		{
+			name: "Unstructured items float",
+			in: &unstructured.UnstructuredList{
+				Items: []unstructured.Unstructured{
+					{
+						Object: map[string]interface{}{
+							"int":     1,
+							"float32": float32(1),
+							"float64": 1.1,
+						},
+					},
+				},
+			},
+			expect: "{\"items\":[{\"float32\":1,\"float64\":1.1,\"int\":1}]}\n",
+		},
+		// Handling structs with duplicate field names (JSON tag names) without producing duplicate keys in the encoded output
+		{
+			name: "StructWithDuplicatedTags",
+			in: &StructWithDuplicatedTagsList{
+				Items: []StructWithDuplicatedTags{
+					{
+						Key1: "key1",
+						Key2: "key2",
+					},
+				},
+			},
+			expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null}}]}\n",
+		},
+		// Encoding Go strings containing invalid UTF-8 sequences without error
+		{
+			name: "UnstructuredList object invalid UTF-8",
+			in: &unstructured.UnstructuredList{
+				Object: map[string]interface{}{
+					"key": "\x80", // first byte is a continuation byte
+				},
+			},
+			expect: "{\"items\":[],\"key\":\"\\ufffd\"}\n",
+		},
+		{
+			name: "UnstructuredList items invalid UTF-8",
+			in: &unstructured.UnstructuredList{
+				Items: []unstructured.Unstructured{
+					{
+						Object: map[string]interface{}{
+							"key": "\x80", // first byte is a continuation byte
+						},
+					},
+				},
+			},
+			expect: "{\"items\":[{\"key\":\"\\ufffd\"}]}\n",
+		},
+		// Preserving the distinction between absent, present-but-null, and present-and-empty states for slices and maps
+		{
+			name: "CarpList items nil",
+			in: &testapigroupv1.CarpList{
+				Items: nil,
+			},
+			expect: "{\"metadata\":{},\"items\":null}\n",
+		},
+		{
+			name: "CarpList slice nil",
+			in: &testapigroupv1.CarpList{
+				Items: []testapigroupv1.Carp{
+					{
+						Status: testapigroupv1.CarpStatus{
+							Conditions: nil,
+						},
+					},
+				},
+			},
+			expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n",
+		},
+		{
+			name: "CarpList map nil",
+			in: &testapigroupv1.CarpList{
+				Items: []testapigroupv1.Carp{
+					{
+						Spec: testapigroupv1.CarpSpec{
+							NodeSelector: nil,
+						},
+					},
+				},
+			},
+			expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n",
+		},
+		{
+			name: "UnstructuredList items nil",
+			in: &unstructured.UnstructuredList{
+				Items: nil,
+			},
+			expect: "{\"items\":[]}\n",
+		},
+		{
+			name: "UnstructuredList items slice nil",
+			in: &unstructured.UnstructuredList{
+				Items: []unstructured.Unstructured{
+					{
+						Object: map[string]interface{}{
+							"slice": ([]string)(nil),
+						},
+					},
+				},
+			},
+			expect: "{\"items\":[{\"slice\":null}]}\n",
+		},
+		{
+			name: "UnstructuredList items map nil",
+			in: &unstructured.UnstructuredList{
+				Items: []unstructured.Unstructured{
+					{
+						Object: map[string]interface{}{
+							"map": (map[string]string)(nil),
+						},
+					},
+				},
+			},
+			expect: "{\"items\":[{\"map\":null}]}\n",
+		},
+		{
+			name: "UnstructuredList object nil",
+			in: &unstructured.UnstructuredList{
+				Object: nil,
+			},
+			expect: "{\"items\":[]}\n",
+		},
+		{
+			name: "UnstructuredList object slice nil",
+			in: &unstructured.UnstructuredList{
+				Object: map[string]interface{}{
+					"slice": ([]string)(nil),
+				},
+			},
+			expect: "{\"items\":[],\"slice\":null}\n",
+		},
+		{
+			name: "UnstructuredList object map nil",
+			in: &unstructured.UnstructuredList{
+				Object: map[string]interface{}{
+					"map": (map[string]string)(nil),
+				},
+			},
+			expect: "{\"items\":[],\"map\":null}\n",
+		},
+		{
+			name: "CarpList items empty",
+			in: &testapigroupv1.CarpList{
+				Items: []testapigroupv1.Carp{},
+			},
+			expect: "{\"metadata\":{},\"items\":[]}\n",
+		},
+		{
+			name: "CarpList slice empty",
+			in: &testapigroupv1.CarpList{
+				Items: []testapigroupv1.Carp{
+					{
+						Status: testapigroupv1.CarpStatus{
+							Conditions: []testapigroupv1.CarpCondition{},
+						},
+					},
+				},
+			},
+			expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n",
+		},
+		{
+			name: "CarpList map empty",
+			in: &testapigroupv1.CarpList{
+				Items: []testapigroupv1.Carp{
+					{
+						Spec: testapigroupv1.CarpSpec{
+							NodeSelector: map[string]string{},
+						},
+					},
+				},
+			},
+			expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n",
+		},
+		{
+			name: "UnstructuredList items empty",
+			in: &unstructured.UnstructuredList{
+				Items: []unstructured.Unstructured{},
+			},
+			expect: "{\"items\":[]}\n",
+		},
+		{
+			name: "UnstructuredList items slice empty",
+			in: &unstructured.UnstructuredList{
+				Items: []unstructured.Unstructured{
+					{
+						Object: map[string]interface{}{
+							"slice": []string{},
+						},
+					},
+				},
+			},
+			expect: "{\"items\":[{\"slice\":[]}]}\n",
+		},
+		{
+			name: "UnstructuredList items map empty",
+			in: &unstructured.UnstructuredList{
+				Items: []unstructured.Unstructured{
+					{
+						Object: map[string]interface{}{
+							"map": map[string]string{},
+						},
+					},
+				},
+			},
+			expect: "{\"items\":[{\"map\":{}}]}\n",
+		},
+		{
+			name: "UnstructuredList object empty",
+			in: &unstructured.UnstructuredList{
+				Object: map[string]interface{}{},
+			},
+			expect: "{\"items\":[]}\n",
+		},
+		{
+			name: "UnstructuredList object slice empty",
+			in: &unstructured.UnstructuredList{
+				Object: map[string]interface{}{
+					"slice": []string{},
+				},
+			},
+			expect: "{\"items\":[],\"slice\":[]}\n",
+		},
+		{
+			name: "UnstructuredList object map empty",
+			in: &unstructured.UnstructuredList{
+				Object: map[string]interface{}{
+					"map": map[string]string{},
+				},
+			},
+			expect: "{\"items\":[],\"map\":{}}\n",
+		},
+		// Handling structs implementing the MarshalJSON method, especially built-in collection types.
+		{
+			name:         "List with MarshalJSON cannot be streamed",
+			in:           &ListWithMarshalJSONList{},
+			expect:       "\"marshallJSON\"\n",
+			cannotStream: true,
+		},
+		{
+			name: "Struct with MarshalJSON",
+			in: &StructWithMarshalJSONList{
+				Items: []StructWithMarshalJSON{
+					{},
+				},
+			},
+			expect: "{\"metadata\":{},\"items\":[\"marshallJSON\"]}\n",
+		},
+		// Handling raw bytes.
+ { + name: "Struct with raw bytes", + in: &StructWithRawBytesList{ + Items: []StructWithRawBytes{ + { + Slice: []byte{0x01, 0x02, 0x03}, + Array: [3]byte{0x01, 0x02, 0x03}, + }, + }, + }, + expect: "{\"metadata\":{},\"items\":[{\"metadata\":{\"creationTimestamp\":null},\"Slice\":\"AQID\",\"Array\":[1,2,3]}]}\n", + }, + { + name: "UnstructuredList object raw bytes", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{ + "slice": []byte{0x01, 0x02, 0x03}, + "array": [3]byte{0x01, 0x02, 0x03}, + }, + }, + expect: "{\"array\":[1,2,3],\"items\":[],\"slice\":\"AQID\"}\n", + }, + { + name: "UnstructuredList items raw bytes", + in: &unstructured.UnstructuredList{ + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "slice": []byte{0x01, 0x02, 0x03}, + "array": [3]byte{0x01, 0x02, 0x03}, + }, + }, + }, + }, + expect: "{\"items\":[{\"array\":[1,2,3],\"slice\":\"AQID\"}]}\n", + }, + // Other scenarios: + { + name: "List just kind", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + }, + }, + expect: "{\"kind\":\"List\",\"metadata\":{},\"items\":null}\n", + }, + { + name: "List just apiVersion", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + }, + }, + expect: "{\"apiVersion\":\"v1\",\"metadata\":{},\"items\":null}\n", + }, + { + name: "List no elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\"},\"items\":[]}\n", + }, + { + name: "List one element with continue", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + Continue: "abc", + RemainingItemCount: &remainingItems, + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + }, + }, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\",\"continue\":\"abc\",\"remainingItemCount\":1},\"items\":[{\"kind\":\"Carp\",\"apiVersion\":\"v1\",\"metadata\":{\"name\":\"pod\",\"namespace\":\"default\",\"creationTimestamp\":null},\"spec\":{},\"status\":{}}]}\n", + }, + { + name: "List two elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default2", + }}, + }, + }, + expect: `{"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"2345"},"items":[{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod","namespace":"default","creationTimestamp":null},"spec":{},"status":{}},{"kind":"Carp","apiVersion":"v1","metadata":{"name":"pod2","namespace":"default2","creationTimestamp":null},"spec":{},"status":{}}]} +`, + }, + { + name: "List with extra field cannot be streamed", + in: &ListWithAdditionalFields{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, 
+ Items: []testapigroupv1.Carp{}, + }, + cannotStream: true, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"resourceVersion\":\"2345\"},\"items\":[],\"AdditionalField\":0}\n", + }, + { + name: "Not a collection cannot be streamed", + in: &testapigroupv1.Carp{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + }, + cannotStream: true, + expect: "{\"kind\":\"List\",\"apiVersion\":\"v1\",\"metadata\":{\"creationTimestamp\":null},\"spec\":{},\"status\":{}}\n", + }, + { + name: "UnstructuredList empty", + in: &unstructured.UnstructuredList{}, + expect: "{\"items\":[]}\n", + }, + { + name: "UnstructuredList just kind", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List"}, + }, + expect: "{\"items\":[],\"kind\":\"List\"}\n", + }, + { + name: "UnstructuredList just apiVersion", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"apiVersion": "v1"}, + }, + expect: "{\"apiVersion\":\"v1\",\"items\":[]}\n", + }, + { + name: "UnstructuredList no elements", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{"resourceVersion": "2345"}}, + Items: []unstructured.Unstructured{}, + }, + expect: "{\"apiVersion\":\"v1\",\"items\":[],\"kind\":\"List\",\"metadata\":{\"resourceVersion\":\"2345\"}}\n", + }, + { + name: "UnstructuredList one element with continue", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + "continue": "abc", + "remainingItemCount": "1", + }}, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod", + "namespace": "default", + }, + }, + }, + }, + }, + expect: "{\"apiVersion\":\"v1\",\"items\":[{\"apiVersion\":\"v1\",\"kind\":\"Carp\",\"metadata\":{\"name\":\"pod\",\"namespace\":\"default\"}}],\"kind\":\"List\",\"metadata\":{\"continue\":\"abc\",\"remainingItemCount\":\"1\",\"resourceVersion\":\"2345\"}}\n", + }, + { + name: "UnstructuredList two elements", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"kind": "List", "apiVersion": "v1", "metadata": map[string]interface{}{ + "resourceVersion": "2345", + }}, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod", + "namespace": "default", + }, + }, + }, + { + Object: map[string]interface{}{ + "apiVersion": "v1", + "kind": "Carp", + "metadata": map[string]interface{}{ + "name": "pod2", + "namespace": "default", + }, + }, + }, + }, + }, + expect: "{\"apiVersion\":\"v1\",\"items\":[{\"apiVersion\":\"v1\",\"kind\":\"Carp\",\"metadata\":{\"name\":\"pod\",\"namespace\":\"default\"}},{\"apiVersion\":\"v1\",\"kind\":\"Carp\",\"metadata\":{\"name\":\"pod2\",\"namespace\":\"default\"}}],\"kind\":\"List\",\"metadata\":{\"resourceVersion\":\"2345\"}}\n", + }, + { + name: "UnstructuredList conflict on items", + in: &unstructured.UnstructuredList{ + Object: map[string]interface{}{"items": []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "name": "pod", + }, + }, + }, + }, + Items: []unstructured.Unstructured{ + { + Object: map[string]interface{}{ + "name": "pod2", + }, + }, + }, + }, + expect: "{\"items\":[{\"name\":\"pod2\"}]}\n", + }, + } { + t.Run(tc.name, func(t *testing.T) { + 
buf.Reset() + if err := s.Encode(tc.in, &buf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + t.Logf("encoded: %s", buf.String()) + if diff := cmp.Diff(buf.String(), tc.expect); diff != "" { + t.Errorf("not matching:\n%s", diff) + } + expectStreaming := !tc.cannotStream && streamingEnabled + if expectStreaming && buf.writeCount <= 1 { + t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) + } + if !expectStreaming && buf.writeCount > 1 { + t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount) + } + }) + } +} + +type StructWithFloatsList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithFloats `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *StructWithFloatsList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithFloats struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Int int + Float32 float32 + Float64 float64 +} + +func (s *StructWithFloats) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithDuplicatedTagsList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithDuplicatedTags `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *StructWithDuplicatedTagsList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithDuplicatedTags struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + + Key1 string `json:"key"` + Key2 string `json:"key"` //nolint:govet +} + +func (s *StructWithDuplicatedTags) DeepCopyObject() runtime.Object { + return nil +} + +type ListWithMarshalJSONList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []string `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (l *ListWithMarshalJSONList) DeepCopyObject() runtime.Object { + return nil +} + +func (l *ListWithMarshalJSONList) MarshalJSON() ([]byte, error) { + return []byte(`"marshallJSON"`), nil +} + +type StructWithMarshalJSONList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithMarshalJSON `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (s *StructWithMarshalJSONList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithMarshalJSON struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` +} + +func (l *StructWithMarshalJSON) DeepCopyObject() runtime.Object { + return nil +} + +func (l *StructWithMarshalJSON) MarshalJSON() ([]byte, error) { + return []byte(`"marshallJSON"`), nil +} + +type StructWithRawBytesList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []StructWithRawBytes `json:"items" protobuf:"bytes,2,rep,name=items"` +} + +func (s *StructWithRawBytesList) DeepCopyObject() runtime.Object { + return nil +} + +type StructWithRawBytes struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Slice []byte + Array [3]byte +} + +func (s *StructWithRawBytes) DeepCopyObject() runtime.Object { 
+ return nil +} + +type ListWithAdditionalFields struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"` + Items []testapigroupv1.Carp `json:"items" protobuf:"bytes,2,rep,name=items"` + AdditionalField int +} + +func (s *ListWithAdditionalFields) DeepCopyObject() runtime.Object { + return nil +} + +type writeCountingBuffer struct { + writeCount int + bytes.Buffer +} + +func (b *writeCountingBuffer) Write(data []byte) (int, error) { + b.writeCount++ + return b.Buffer.Write(data) +} + +func (b *writeCountingBuffer) Reset() { + b.writeCount = 0 + b.Buffer.Reset() +} + +func TestFuzzCollectionsEncoding(t *testing.T) { + disableFuzzFieldsV1 := func(field *metav1.FieldsV1, c fuzz.Continue) {} + fuzzUnstructuredList := func(list *unstructured.UnstructuredList, c fuzz.Continue) { + list.Object = map[string]interface{}{ + "kind": "List", + "apiVersion": "v1", + c.RandString(): c.RandString(), + c.RandString(): c.RandUint64(), + c.RandString(): c.RandBool(), + "metadata": map[string]interface{}{ + "resourceVersion": fmt.Sprintf("%d", c.RandUint64()), + "continue": c.RandString(), + "remainingItemCount": fmt.Sprintf("%d", c.RandUint64()), + c.RandString(): c.RandString(), + }} + c.Fuzz(&list.Items) + } + fuzzMap := func(kvs map[string]interface{}, c fuzz.Continue) { + kvs[c.RandString()] = c.RandBool() + kvs[c.RandString()] = c.RandUint64() + kvs[c.RandString()] = c.RandString() + } + f := fuzz.New().Funcs(disableFuzzFieldsV1, fuzzUnstructuredList, fuzzMap) + streamingBuffer := &bytes.Buffer{} + normalSerializer := NewSerializerWithOptions(DefaultMetaFactory, nil, nil, SerializerOptions{StreamingCollectionsEncoding: false}) + normalBuffer := &bytes.Buffer{} + t.Run("CarpList", func(t *testing.T) { + for i := 0; i < 1000; i++ { + list := &testapigroupv1.CarpList{} + f.Fuzz(list) + streamingBuffer.Reset() + normalBuffer.Reset() + ok, err := streamEncodeCollections(list, streamingBuffer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Errorf("not matching:\n%s", diff) + } + } + }) + t.Run("UnstructuredList", func(t *testing.T) { + for i := 0; i < 1000; i++ { + list := &unstructured.UnstructuredList{} + f.Fuzz(list) + streamingBuffer.Reset() + normalBuffer.Reset() + ok, err := streamEncodeCollections(list, streamingBuffer) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if !ok { + t.Fatalf("expected streaming encoder to encode %T", list) + } + if err := normalSerializer.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(normalBuffer.String(), streamingBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Errorf("not matching:\n%s", diff) + } + } + }) +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go index 1ae4a32eb720c..24f66a10174b0 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json/json.go @@ -36,7 +36,7 @@ import ( // is not nil, the object has the 
group, version, and kind fields set.
 // Deprecated: use NewSerializerWithOptions instead.
 func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper, pretty bool) *Serializer {
-	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false})
+	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{false, pretty, false, false})
 }
 
 // NewYAMLSerializer creates a YAML serializer that handles encoding versioned objects into the proper YAML form. If typer
@@ -44,7 +44,7 @@ func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtim
 // matches JSON, and will error if constructs are used that do not serialize to JSON.
 // Deprecated: use NewSerializerWithOptions instead.
 func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
-	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false})
+	return NewSerializerWithOptions(meta, creater, typer, SerializerOptions{true, false, false, false})
 }
 
 // NewSerializerWithOptions creates a JSON/YAML serializer that handles encoding versioned objects into the proper JSON/YAML
@@ -93,6 +93,9 @@ type SerializerOptions struct {
 	// Strict: configures the Serializer to return strictDecodingError's when duplicate fields are present decoding JSON or YAML.
 	// Note that enabling this option is not as performant as the non-strict variant, and should not be used in fast paths.
 	Strict bool
+
+	// StreamingCollectionsEncoding enables encoding collections one item at a time, drastically reducing the memory needed.
+	StreamingCollectionsEncoding bool
 }
 
 // Serializer handles encoding versioned objects into the proper JSON form
@@ -242,6 +245,15 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer) error {
 		_, err = w.Write(data)
 		return err
 	}
+	if s.options.StreamingCollectionsEncoding {
+		ok, err := streamEncodeCollections(obj, w)
+		if err != nil {
+			return err
+		}
+		if ok {
+			return nil
+		}
+	}
 	encoder := json.NewEncoder(w)
 	return encoder.Encode(obj)
 }
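The protobuf encoder gains the same opt-in below. A construction sketch using the constructor added later in this diff (creater and typer as in the existing NewSerializer call sites):

	s := protobuf.NewSerializerWithOptions(creater, typer, protobuf.SerializerOptions{
		StreamingCollectionsEncoding: true,
	})

When getStreamingListData cannot validate the list's shape or tags, Encode silently falls back to the buffered, non-streaming path.
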
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go
new file mode 100644
index 0000000000000..754a80820b06c
--- /dev/null
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections.go
@@ -0,0 +1,174 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package protobuf
+
+import (
+	"errors"
+	"io"
+	"math/bits"
+
+	"github.com/gogo/protobuf/proto"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/conversion"
+	"k8s.io/apimachinery/pkg/runtime"
+)
+
+var (
+	errFieldCount          = errors.New("expected ListType to have 3 fields")
+	errTypeMetaField       = errors.New("expected TypeMeta field to have TypeMeta type")
+	errTypeMetaProtobufTag = errors.New(`expected TypeMeta protobuf field tag to be ""`)
+	errListMetaField       = errors.New("expected ListMeta field to have ListMeta type")
+	errListMetaProtobufTag = errors.New(`expected ListMeta protobuf field tag to be "bytes,1,opt,name=metadata"`)
+	errItemsProtobufTag    = errors.New(`expected Items protobuf field tag to be "bytes,2,rep,name=items"`)
+	errItemsSizer          = errors.New(`expected Items elements to implement proto.Sizer`)
+)
+
+// getStreamingListData implements list extraction logic for protobuf stream serialization.
+//
+// Reasons for custom logic instead of reusing accessors from the meta package:
+// * Validate proto tags to prevent incompatibility with the standard proto package.
+// * ListMetaAccessor doesn't distinguish an empty value from a nil one.
+// * TypeAccessor reparses "apiVersion" and serializes it as "{group}/{version}".
+func getStreamingListData(list runtime.Object) (data streamingListData, err error) {
+	listValue, err := conversion.EnforcePtr(list)
+	if err != nil {
+		return data, err
+	}
+	listType := listValue.Type()
+	if listType.NumField() != 3 {
+		return data, errFieldCount
+	}
+	// TypeMeta: validated, but not returned, as it is not serialized.
+	_, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
+	if !ok {
+		return data, errTypeMetaField
+	}
+	if listType.Field(0).Tag.Get("protobuf") != "" {
+		return data, errTypeMetaProtobufTag
+	}
+	// ListMeta
+	listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
+	if !ok {
+		return data, errListMetaField
+	}
+	// if we were ever to relax the protobuf tag check we should update the hardcoded `0xa` below when writing ListMeta.
+	if listType.Field(1).Tag.Get("protobuf") != "bytes,1,opt,name=metadata" {
+		return data, errListMetaProtobufTag
+	}
+	data.listMeta = listMeta
+	// Items; if we were ever to relax the protobuf tag check we should update the hardcoded `0x12` below when writing Items.
+	if listType.Field(2).Tag.Get("protobuf") != "bytes,2,rep,name=items" {
+		return data, errItemsProtobufTag
+	}
+	items, err := meta.ExtractList(list)
+	if err != nil {
+		return data, err
+	}
+	data.items = items
+	data.totalSize, data.listMetaSize, data.itemsSizes, err = listSize(listMeta, items)
+	return data, err
+}
+
+type streamingListData struct {
+	// totalSize is the total size of the serialized List object, including the proto headers/size bytes
+	totalSize int
+
+	// listMetaSize caches results from the .Size() call to listMeta; doesn't include header bytes (field identifier, size)
+	listMetaSize int
+	listMeta     metav1.ListMeta
+
+	// itemsSizes caches results from the .Size() calls to items; doesn't include header bytes (field identifier, size)
+	itemsSizes []int
+	items      []runtime.Object
+}
+
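+// Worked example (illustrative): with listMetaSize = 10 and two items of sizes
+// 100 and 200, listSize below returns totalSize = (1+1+10) + (1+1+100) + (1+2+200)
+// = 317, where each term is one field-identifier byte, the varint-encoded size,
+// and the payload itself.
+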
+// listSize returns the size of ListMeta and items, to be used later for preallocations.
+// listMetaSize and itemSizes do not include header bytes (field identifier, size).
+func listSize(listMeta metav1.ListMeta, items []runtime.Object) (totalSize, listMetaSize int, itemSizes []int, err error) {
+	// ListMeta
+	listMetaSize = listMeta.Size()
+	totalSize += 1 + sovGenerated(uint64(listMetaSize)) + listMetaSize
+	// Items
+	itemSizes = make([]int, len(items))
+	for i, item := range items {
+		sizer, ok := item.(proto.Sizer)
+		if !ok {
+			return totalSize, listMetaSize, nil, errItemsSizer
+		}
+		n := sizer.Size()
+		itemSizes[i] = n
+		totalSize += 1 + sovGenerated(uint64(n)) + n
+	}
+	return totalSize, listMetaSize, itemSizes, nil
+}
+
+func streamingEncodeUnknownList(w io.Writer, unk runtime.Unknown, listData streamingListData, memAlloc runtime.MemoryAllocator) error {
+	_, err := w.Write(protoEncodingPrefix)
+	if err != nil {
+		return err
+	}
+	// encodeList is responsible for encoding the List into the unknown Raw.
+	encodeList := func(writer io.Writer) (int, error) {
+		return streamingEncodeList(writer, listData, memAlloc)
+	}
+	_, err = unk.MarshalToWriter(w, listData.totalSize, encodeList)
+	return err
+}
+
+func streamingEncodeList(w io.Writer, listData streamingListData, memAlloc runtime.MemoryAllocator) (size int, err error) {
+	// ListMeta; 0xa = (1 << 3) | 2; field number: 1, type: 2 (LEN). https://protobuf.dev/programming-guides/encoding/#structure
+	n, err := doEncodeWithHeader(&listData.listMeta, w, 0xa, listData.listMetaSize, memAlloc)
+	size += n
+	if err != nil {
+		return size, err
+	}
+	// Items; 0x12 = (2 << 3) | 2; field number: 2, type: 2 (LEN). https://protobuf.dev/programming-guides/encoding/#structure
+	for i, item := range listData.items {
+		n, err := doEncodeWithHeader(item, w, 0x12, listData.itemsSizes[i], memAlloc)
+		size += n
+		if err != nil {
+			return size, err
+		}
+	}
+	return size, nil
+}
+
+func writeVarintGenerated(w io.Writer, v int) (int, error) {
+	buf := make([]byte, sovGenerated(uint64(v)))
+	encodeVarintGenerated(buf, len(buf), uint64(v))
+	return w.Write(buf)
+}
+
+// sovGenerated, copied from `generated.pb.go`, returns the size of a varint.
+func sovGenerated(v uint64) int {
+	return (bits.Len64(v|1) + 6) / 7
+}
+
+// encodeVarintGenerated, copied from `generated.pb.go`, encodes a varint.
+func encodeVarintGenerated(dAtA []byte, offset int, v uint64) int {
+	offset -= sovGenerated(v)
+	base := offset
+	for v >= 1<<7 {
+		dAtA[offset] = uint8(v&0x7f | 0x80)
+		v >>= 7
+		offset++
+	}
+	dAtA[offset] = uint8(v)
+	return base
+}
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go
new file mode 100644
index 0000000000000..d6e3a6d444daf
--- /dev/null
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/collections_test.go
@@ -0,0 +1,333 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package protobuf + +import ( + "bytes" + "encoding/base64" + "io" + "os/exec" + "testing" + + "github.com/gogo/protobuf/proto" + "github.com/google/go-cmp/cmp" + fuzz "github.com/google/gofuzz" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + testapigroupv1 "k8s.io/apimachinery/pkg/apis/testapigroup/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestCollectionsEncoding(t *testing.T) { + t.Run("Normal", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializer(nil, nil), false) + }) + t.Run("Streaming", func(t *testing.T) { + testCollectionsEncoding(t, NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}), true) + }) +} + +func testCollectionsEncoding(t *testing.T, s *Serializer, streamingEnabled bool) { + var remainingItems int64 = 1 + testCases := []struct { + name string + in runtime.Object + // expect is base64 encoded protobuf bytes + expect string + }{ + { + name: "CarpList items nil", + in: &testapigroupv1.CarpList{ + Items: nil, + }, + expect: "azhzAAoECgASABIICgYKABIAGgAaACIA", + }, + { + name: "CarpList slice nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: nil, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: "CarpList map nil", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: nil, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: "CarpList items empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{}, + }, + expect: "azhzAAoECgASABIICgYKABIAGgAaACIA", + }, + { + name: "CarpList slice empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Status: testapigroupv1.CarpStatus{ + Conditions: []testapigroupv1.CarpCondition{}, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: "CarpList map empty", + in: &testapigroupv1.CarpList{ + Items: []testapigroupv1.Carp{ + { + Spec: testapigroupv1.CarpSpec{ + NodeSelector: map[string]string{}, + }, + }, + }, + }, + expect: "azhzAAoECgASABJBCgYKABIAGgASNwoQCgASABoAIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + { + name: "List just kind", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + }, + }, + expect: "azhzAAoICgASBExpc3QSCAoGCgASABoAGgAiAA==", + }, + { + name: "List just apiVersion", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + }, + }, + expect: "azhzAAoGCgJ2MRIAEggKBgoAEgAaABoAIgA=", + }, + { + name: "List no elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{}, + }, + expect: "azhzAAoKCgJ2MRIETGlzdBIMCgoKABIEMjM0NRoAGgAiAA==", + }, + { + name: "List one element with continue", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + Continue: "abc", + RemainingItemCount: &remainingItems, + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, 
ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + }, + }, + expect: "azhzAAoKCgJ2MRIETGlzdBJUCg8KABIEMjM0NRoDYWJjIAESQQoaCgNwb2QSABoHZGVmYXVsdCIAKgAyADgAQgASFxoAQgBKAFIAWABgAGgAggEAigEAmgEAGgoKABoAIgAqADIAGgAiAA==", + }, + { + name: "List two elements", + in: &testapigroupv1.CarpList{ + TypeMeta: metav1.TypeMeta{ + Kind: "List", + APIVersion: "v1", + }, + ListMeta: metav1.ListMeta{ + ResourceVersion: "2345", + }, + Items: []testapigroupv1.Carp{ + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + Namespace: "default", + }}, + {TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "Carp"}, ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default2", + }}, + }, + }, + expect: "azhzAAoKCgJ2MRIETGlzdBKUAQoKCgASBDIzNDUaABJBChoKA3BvZBIAGgdkZWZhdWx0IgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgASQwocCgRwb2QyEgAaCGRlZmF1bHQyIgAqADIAOABCABIXGgBCAEoAUgBYAGAAaACCAQCKAQCaAQAaCgoAGgAiACoAMgAaACIA", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + var buf writeCountingBuffer + if err := s.Encode(tc.in, &buf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + actualBytes := buf.Bytes() + expectBytes, err := io.ReadAll(base64.NewDecoder(base64.StdEncoding, bytes.NewBufferString(tc.expect))) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(expectBytes, actualBytes) { + expectedBytes, err := base64.StdEncoding.DecodeString(tc.expect) + if err == nil { + t.Errorf("expected:\n%v\ngot:\n%v", expectedBytes, actualBytes) + } else { + t.Errorf("expected:\n%v\ngot:\n%v", tc.expect, base64.StdEncoding.EncodeToString(actualBytes)) + } + actualProto := dumpProto(t, actualBytes[4:]) + expectedProto := dumpProto(t, expectBytes[4:]) + if actualProto != "" && expectedProto != "" { + t.Log(cmp.Diff(actualProto, expectedProto)) + } else { + t.Log(cmp.Diff(actualBytes, expectBytes)) + } + } + if streamingEnabled && buf.writeCount <= 1 { + t.Errorf("expected streaming but Write was called only: %d", buf.writeCount) + } + if !streamingEnabled && buf.writeCount > 1 { + t.Errorf("expected non-streaming but Write was called more than once: %d", buf.writeCount) + } + }) + } +} + +// dumpProto does a best-effort dump of the given proto bytes using protoc if it can be found in the path. +// This is only used when the test has already failed, to try to give more visibility into the diff of the failure. 
+func dumpProto(t *testing.T, data []byte) string { + t.Helper() + protoc, err := exec.LookPath("protoc") + if err != nil { + t.Logf("cannot find protoc in path to dump proto contents: %v", err) + return "" + } + cmd := exec.Command(protoc, "--decode_raw") + cmd.Stdin = bytes.NewBuffer(data) + d, err := cmd.CombinedOutput() + if err != nil { + t.Logf("protoc invocation failed: %v", err) + return "" + } + return string(d) +} + +type writeCountingBuffer struct { + writeCount int + bytes.Buffer +} + +func (b *writeCountingBuffer) Write(data []byte) (int, error) { + b.writeCount++ + return b.Buffer.Write(data) +} + +func (b *writeCountingBuffer) Reset() { + b.writeCount = 0 + b.Buffer.Reset() +} + +func TestFuzzCollection(t *testing.T) { + f := fuzz.New() + streamingEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: true}) + streamingBuffer := &bytes.Buffer{} + normalEncoder := NewSerializerWithOptions(nil, nil, SerializerOptions{StreamingCollectionsEncoding: false}) + normalBuffer := &bytes.Buffer{} + for i := 0; i < 1000; i++ { + list := &testapigroupv1.CarpList{} + f.FuzzNoCustom(list) + streamingBuffer.Reset() + normalBuffer.Reset() + if err := streamingEncoder.Encode(list, streamingBuffer); err != nil { + t.Fatal(err) + } + if err := normalEncoder.Encode(list, normalBuffer); err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(streamingBuffer.String(), normalBuffer.String()); diff != "" { + t.Logf("normal: %s", normalBuffer.String()) + t.Logf("streaming: %s", streamingBuffer.String()) + t.Fatalf("unexpected output:\n%s", diff) + } + } +} + +func TestCallsToSize(t *testing.T) { + counter := &countingSizer{data: []byte("abba")} + listMeta := metav1.ListMeta{} + listData := streamingListData{ + totalSize: 14, + listMeta: listMeta, + listMetaSize: listMeta.Size(), + itemsSizes: []int{counter.Size()}, + items: []runtime.Object{counter}, + } + err := streamingEncodeUnknownList(io.Discard, runtime.Unknown{}, listData, &runtime.Allocator{}) + if err != nil { + t.Fatal(err) + } + if counter.count != 1 { + t.Errorf("Expected only 1 call to sizer, got %d", counter.count) + } +} + +type countingSizer struct { + data []byte + count int +} + +var _ proto.Sizer = (*countingSizer)(nil) +var _ runtime.ProtobufMarshaller = (*countingSizer)(nil) + +func (s *countingSizer) MarshalTo(data []byte) (int, error) { + return copy(data, s.data), nil +} +func (s *countingSizer) Size() int { + s.count++ + return len(s.data) +} + +func (s *countingSizer) DeepCopyObject() runtime.Object { + return nil +} + +func (s *countingSizer) GetObjectKind() schema.ObjectKind { + return nil +} diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go index c63e6dc63f6bb..c66c49ac4c2d2 100644 --- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go +++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go @@ -72,10 +72,18 @@ func IsNotMarshalable(err error) bool { // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written // as-is (any type info passed with the object will be used). 
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go
index c63e6dc63f6bb..c66c49ac4c2d2 100644
--- a/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf/protobuf.go
@@ -72,10 +72,18 @@ func IsNotMarshalable(err error) bool {
 // is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
 // as-is (any type info passed with the object will be used).
 func NewSerializer(creater runtime.ObjectCreater, typer runtime.ObjectTyper) *Serializer {
+	return NewSerializerWithOptions(creater, typer, SerializerOptions{})
+}
+
+// NewSerializerWithOptions creates a Protobuf serializer that handles encoding versioned objects into the proper wire form. If a typer
+// is passed, the encoded object will have group, version, and kind fields set. If typer is nil, the objects will be written
+// as-is (any type info passed with the object will be used).
+func NewSerializerWithOptions(creater runtime.ObjectCreater, typer runtime.ObjectTyper, opts SerializerOptions) *Serializer {
 	return &Serializer{
 		prefix:  protoEncodingPrefix,
 		creater: creater,
 		typer:   typer,
+		options: opts,
 	}
 }
 
@@ -84,6 +92,14 @@ type Serializer struct {
 	prefix  []byte
 	creater runtime.ObjectCreater
 	typer   runtime.ObjectTyper
+
+	options SerializerOptions
+}
+
+// SerializerOptions holds the options which are used to configure a Proto serializer.
+type SerializerOptions struct {
+	// StreamingCollectionsEncoding enables encoding collections one item at a time, drastically reducing the memory needed.
+	StreamingCollectionsEncoding bool
 }
 
 var _ runtime.Serializer = &Serializer{}
 
@@ -209,6 +225,13 @@ func (s *Serializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.
 			},
 		}
 	}
+	if s.options.StreamingCollectionsEncoding {
+		listData, err := getStreamingListData(obj)
+		if err == nil {
+			// Unlike JSON streaming, this doesn't need to honor custom proto marshaling methods, because all proto objects implement the proto methods.
+			return streamingEncodeUnknownList(w, unk, listData, memAlloc)
+		}
+	}
 
 	switch t := obj.(type) {
 	case bufferedMarshaller:
@@ -428,6 +451,39 @@ func (s *RawSerializer) encode(obj runtime.Object, w io.Writer, memAlloc runtime
 }
 
 func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
+	_, err := doEncode(obj, w, nil, memAlloc)
+	return err
+}
+
+func doEncodeWithHeader(obj any, w io.Writer, field byte, precomputedSize int, memAlloc runtime.MemoryAllocator) (size int, err error) {
+	// Field identifier
+	n, err := w.Write([]byte{field})
+	size += n
+	if err != nil {
+		return size, err
+	}
+	// Size
+	n, err = writeVarintGenerated(w, precomputedSize)
+	size += n
+	if err != nil {
+		return size, err
+	}
+	// Obj
+	n, err = doEncode(obj, w, &precomputedSize, memAlloc)
+	size += n
+	if err != nil {
+		return size, err
+	}
+	if n != precomputedSize {
+		return size, fmt.Errorf("the size value was %d, but doEncode wrote %d bytes to data", precomputedSize, n)
+	}
+	return size, nil
+}
+
+// doEncode encodes the provided object into the writer, using an allocator if possible.
+// It avoids calling the object's Size method when precomputedObjSize is provided.
+// precomputedObjSize should not include the header bytes (field identifier, size).
+func doEncode(obj any, w io.Writer, precomputedObjSize *int, memAlloc runtime.MemoryAllocator) (int, error) {
 	if memAlloc == nil {
 		klog.Error("a mandatory memory allocator wasn't provided, this might have a negative impact on performance, check invocations of EncodeWithAllocator method, falling back on runtime.SimpleAllocator")
 		memAlloc = &runtime.SimpleAllocator{}
@@ -436,40 +492,43 @@ func (s *RawSerializer) doEncode(obj runtime.Object, w io.Writer, memAlloc runtime.MemoryAllocator) error {
 	case bufferedReverseMarshaller:
 		// this path performs a single allocation during write only when the Allocator wasn't provided
 		// it also requires the caller to implement the more efficient Size and MarshalToSizedBuffer methods
-		encodedSize := uint64(t.Size())
-		data := memAlloc.Allocate(encodedSize)
+		if precomputedObjSize == nil {
+			s := t.Size()
+			precomputedObjSize = &s
+		}
+		data := memAlloc.Allocate(uint64(*precomputedObjSize))
 		n, err := t.MarshalToSizedBuffer(data)
 		if err != nil {
-			return err
+			return 0, err
 		}
-		_, err = w.Write(data[:n])
-		return err
+		return w.Write(data[:n])
 
 	case bufferedMarshaller:
 		// this path performs a single allocation during write only when the Allocator wasn't provided
 		// it also requires the caller to implement the more efficient Size and MarshalTo methods
-		encodedSize := uint64(t.Size())
-		data := memAlloc.Allocate(encodedSize)
+		if precomputedObjSize == nil {
+			s := t.Size()
+			precomputedObjSize = &s
+		}
+		data := memAlloc.Allocate(uint64(*precomputedObjSize))
 		n, err := t.MarshalTo(data)
 		if err != nil {
-			return err
+			return 0, err
 		}
-		_, err = w.Write(data[:n])
-		return err
+		return w.Write(data[:n])
 
 	case proto.Marshaler:
 		// this path performs extra allocations
 		data, err := t.Marshal()
 		if err != nil {
-			return err
+			return 0, err
 		}
-		_, err = w.Write(data)
-		return err
+		return w.Write(data)
 
 	default:
-		return errNotMarshalable{reflect.TypeOf(obj)}
+		return 0, errNotMarshalable{reflect.TypeOf(obj)}
 	}
 }
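Reviewer note: the tag constants introduced in the next file follow the standard protobuf wire format, where a field's key byte is (field_number << 3) | wire_type and all four fields of Unknown are length-delimited (wire type 2). A standalone sketch (not part of the diff) verifying the values:

    package main

    import "fmt"

    // key computes a protobuf field key byte for single-byte keys.
    func key(fieldNumber, wireType int) byte { return byte(fieldNumber<<3 | wireType) }

    func main() {
        const lengthDelimited = 2
        fmt.Printf("typeMetaTag        %#x\n", key(1, lengthDelimited)) // 0xa
        fmt.Printf("rawTag             %#x\n", key(2, lengthDelimited)) // 0x12
        fmt.Printf("contentEncodingTag %#x\n", key(3, lengthDelimited)) // 0x1a
        fmt.Printf("contentTypeTag     %#x\n", key(4, lengthDelimited)) // 0x22
    }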
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go
index a82227b239af5..27a2064c4163e 100644
--- a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto.go
@@ -18,6 +18,7 @@ package runtime
 
 import (
 	"fmt"
+	"io"
 )
 
 type ProtobufMarshaller interface {
@@ -28,6 +29,124 @@ type ProtobufReverseMarshaller interface {
 	MarshalToSizedBuffer(data []byte) (int, error)
 }
 
+const (
+	typeMetaTag        = 0xa
+	rawTag             = 0x12
+	contentEncodingTag = 0x1a
+	contentTypeTag     = 0x22
+
+	// max length of a varint for a uint64
+	maxUint64VarIntLength = 10
+)
+
+// MarshalToWriter allows a caller to provide a streaming writer for raw bytes,
+// instead of populating them inside the Unknown struct.
+// rawSize is the number of bytes writeRaw will write in a success case.
+// writeRaw is called when it is time to write the raw bytes. It must return `rawSize, nil` or an error.
+func (m *Unknown) MarshalToWriter(w io.Writer, rawSize int, writeRaw func(io.Writer) (int, error)) (int, error) {
+	size := 0
+
+	// reuse the buffer for varint marshaling
+	varintBuffer := make([]byte, maxUint64VarIntLength)
+	writeVarint := func(i int) (int, error) {
+		offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), uint64(i))
+		return w.Write(varintBuffer[offset:])
+	}
+
+	// TypeMeta
+	{
+		n, err := w.Write([]byte{typeMetaTag})
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		typeMetaBytes, err := m.TypeMeta.Marshal()
+		if err != nil {
+			return size, err
+		}
+
+		n, err = writeVarint(len(typeMetaBytes))
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		n, err = w.Write(typeMetaBytes)
+		size += n
+		if err != nil {
+			return size, err
+		}
+	}
+
+	// Raw, delegating write to writeRaw()
+	{
+		n, err := w.Write([]byte{rawTag})
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		n, err = writeVarint(rawSize)
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		n, err = writeRaw(w)
+		size += n
+		if err != nil {
+			return size, err
+		}
+		if n != rawSize {
+			return size, fmt.Errorf("the size value was %d, but encoding wrote %d bytes to data", rawSize, n)
+		}
+	}
+
+	// ContentEncoding
+	{
+		n, err := w.Write([]byte{contentEncodingTag})
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		n, err = writeVarint(len(m.ContentEncoding))
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		n, err = w.Write([]byte(m.ContentEncoding))
+		size += n
+		if err != nil {
+			return size, err
+		}
+	}
+
+	// ContentType
+	{
+		n, err := w.Write([]byte{contentTypeTag})
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		n, err = writeVarint(len(m.ContentType))
+		size += n
+		if err != nil {
+			return size, err
+		}
+
+		n, err = w.Write([]byte(m.ContentType))
+		size += n
+		if err != nil {
+			return size, err
+		}
+	}
+	return size, nil
+}
+
 // NestedMarshalTo allows a caller to avoid extra allocations during serialization of an Unknown
 // that will contain an object that implements ProtobufMarshaller or ProtobufReverseMarshaller.
 func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64) (int, error) {
@@ -43,12 +162,12 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64
 	copy(data[i:], m.ContentType)
 	i = encodeVarintGenerated(data, i, uint64(len(m.ContentType)))
 	i--
-	data[i] = 0x22
+	data[i] = contentTypeTag
 	i -= len(m.ContentEncoding)
 	copy(data[i:], m.ContentEncoding)
 	i = encodeVarintGenerated(data, i, uint64(len(m.ContentEncoding)))
 	i--
-	data[i] = 0x1a
+	data[i] = contentEncodingTag
 	if b != nil {
 		if r, ok := b.(ProtobufReverseMarshaller); ok {
 			n1, err := r.MarshalToSizedBuffer(data[:i])
@@ -75,7 +194,7 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64
 	}
 	i = encodeVarintGenerated(data, i, size)
 	i--
-	data[i] = 0x12
+	data[i] = rawTag
 	}
 	n2, err := m.TypeMeta.MarshalToSizedBuffer(data[:i])
 	if err != nil {
@@ -84,6 +203,6 @@ func (m *Unknown) NestedMarshalTo(data []byte, b ProtobufMarshaller, size uint64
 	}
 	i -= n2
 	i = encodeVarintGenerated(data, i, uint64(n2))
 	i--
-	data[i] = 0xa
+	data[i] = typeMetaTag
 	return msgSize - i, nil
 }
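Reviewer note: a sketch of the writeRaw contract (not part of the diff): the callback must write exactly rawSize bytes to the writer it is handed, and MarshalToWriter fails if it does not. The raw payload here is a hypothetical stand-in for a precomputed item encoding.

    package main

    import (
        "bytes"
        "fmt"
        "io"

        "k8s.io/apimachinery/pkg/runtime"
    )

    func main() {
        u := &runtime.Unknown{ContentType: "application/vnd.kubernetes.protobuf"}
        raw := []byte{0x0a, 0x00} // hypothetical precomputed payload
        var buf bytes.Buffer
        n, err := u.MarshalToWriter(&buf, len(raw), func(w io.Writer) (int, error) {
            // Must return exactly len(raw) on success; MarshalToWriter verifies this.
            return w.Write(raw)
        })
        fmt.Println(n, err) // n covers TypeMeta, Raw, ContentEncoding, and ContentType
    }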
diff --git a/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go
new file mode 100644
index 0000000000000..39fb709426dcc
--- /dev/null
+++ b/staging/src/k8s.io/apimachinery/pkg/runtime/types_proto_test.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package runtime
+
+import (
+	"bytes"
+	"io"
+	"math"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+)
+
+func TestVarint(t *testing.T) {
+	varintBuffer := make([]byte, maxUint64VarIntLength)
+	offset := encodeVarintGenerated(varintBuffer, len(varintBuffer), math.MaxUint64)
+	used := len(varintBuffer) - offset
+	if used != maxUint64VarIntLength {
+		t.Fatalf("expected encodeVarintGenerated to use %d bytes to encode MaxUint64, got %d", maxUint64VarIntLength, used)
+	}
+}
+
+func TestNestedMarshalToWriter(t *testing.T) {
+	testcases := []struct {
+		name string
+		raw  []byte
+	}{
+		{
+			name: "zero-length",
+			raw:  []byte{},
+		},
+		{
+			name: "simple",
+			raw:  []byte{0x00, 0x01, 0x02, 0x03},
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			u := &Unknown{
+				ContentType:     "ct",
+				ContentEncoding: "ce",
+				TypeMeta: TypeMeta{
+					APIVersion: "v1",
+					Kind:       "k",
+				},
+			}
+
+			// Marshal normally with Raw inlined
+			u.Raw = tc.raw
+			marshalData, err := u.Marshal()
+			if err != nil {
+				t.Fatal(err)
+			}
+			u.Raw = nil
+
+			// Marshal with NestedMarshalTo
+			nestedMarshalData := make([]byte, len(marshalData))
+			n, err := u.NestedMarshalTo(nestedMarshalData, copyMarshaler(tc.raw), uint64(len(tc.raw)))
+			if err != nil {
+				t.Fatal(err)
+			}
+			if n != len(marshalData) {
+				t.Errorf("NestedMarshalTo returned %d, expected %d", n, len(marshalData))
+			}
+			if e, a := marshalData, nestedMarshalData; !bytes.Equal(e, a) {
+				t.Errorf("NestedMarshalTo and Marshal differ:\n%s", cmp.Diff(e, a))
+			}
+
+			// Streaming marshal with MarshalToWriter
+			buf := bytes.NewBuffer(nil)
+			n, err = u.MarshalToWriter(buf, len(tc.raw), func(w io.Writer) (int, error) {
+				return w.Write(tc.raw)
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			if n != len(marshalData) {
+				t.Errorf("MarshalToWriter returned %d, expected %d", n, len(marshalData))
+			}
+			if e, a := marshalData, buf.Bytes(); !bytes.Equal(e, a) {
+				t.Errorf("MarshalToWriter and Marshal differ:\n%s", cmp.Diff(e, a))
+			}
+		})
+	}
+}
+
+type copyMarshaler []byte
+
+func (c copyMarshaler) MarshalTo(dest []byte) (int, error) {
+	n := copy(dest, []byte(c))
+	return n, nil
+}
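Reviewer note: the 10-byte bound checked by TestVarint above is the general protobuf varint bound: each encoded byte carries 7 payload bits (the high bit is a continuation flag), so a 64-bit value needs at most ceil(64/7) = 10 bytes. As a quick sketch, not part of the diff:

    package main

    import "fmt"

    func main() {
        // Worst case for a uint64 such as math.MaxUint64: ceil(64/7) bytes.
        fmt.Println((64 + 6) / 7) // 10, i.e. maxUint64VarIntLength
    }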
diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go
index 3ca5cba8cb633..e1d440f4a6346 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/writers.go
@@ -157,6 +157,9 @@ const (
 	// (usually the entire object), and if the size is smaller no gzipping will be performed
 	// if the client requests it.
 	defaultGzipThresholdBytes = 128 * 1024
+	// Use the length of the first write to recognize streaming implementations:
+	// when streaming, JSON's first write is "{", while Kubernetes protobuf starts with a unique 4-byte header.
+	firstWriteStreamingThresholdBytes = 4
 )
 
 // negotiateContentEncoding returns a supported client-requested content encoding for the
@@ -192,14 +195,53 @@ type deferredResponseWriter struct {
 	statusCode      int
 	contentEncoding string
 
-	hasWritten bool
-	hw         http.ResponseWriter
-	w          io.Writer
+	hasBuffered bool
+	buffer      []byte
+	hasWritten  bool
+	hw          http.ResponseWriter
+	w           io.Writer
 
 	ctx context.Context
 }
 
 func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
+	switch {
+	case w.hasWritten:
+		// already written, cannot buffer
+		return w.unbufferedWrite(p)
+
+	case w.contentEncoding != "gzip":
+		// non-gzip, no need to buffer
+		return w.unbufferedWrite(p)
+
+	case !w.hasBuffered && len(p) > defaultGzipThresholdBytes:
+		// not yet buffered, first write is long enough to trigger gzip, no need to buffer
+		return w.unbufferedWrite(p)
+
+	case !w.hasBuffered && len(p) > firstWriteStreamingThresholdBytes:
+		// not yet buffered, first write is longer than expected for streaming scenarios that would require buffering, no need to buffer
+		return w.unbufferedWrite(p)
+
+	default:
+		if !w.hasBuffered {
+			w.hasBuffered = true
+			// Start at 80 bytes to avoid rapid reallocation of the buffer.
+			// The minimum size of a 0-item serialized list object is 80 bytes:
+			// {"kind":"List","apiVersion":"v1","metadata":{"resourceVersion":"1"},"items":[]}\n
+			w.buffer = make([]byte, 0, max(80, len(p)))
+		}
+		w.buffer = append(w.buffer, p...)
+		var err error
+		if len(w.buffer) > defaultGzipThresholdBytes {
+			// we've accumulated enough to trigger gzip, write and clear buffer
+			_, err = w.unbufferedWrite(w.buffer)
+			w.buffer = nil
+		}
+		return len(p), err
+	}
+}
+
+func (w *deferredResponseWriter) unbufferedWrite(p []byte) (n int, err error) {
 	ctx := w.ctx
 	span := tracing.SpanFromContext(ctx)
 	// This Step usually wraps in-memory object serialization.
@@ -245,11 +287,17 @@ func (w *deferredResponseWriter) Write(p []byte) (n int, err error) {
 	return w.w.Write(p)
 }
 
-func (w *deferredResponseWriter) Close() error {
+func (w *deferredResponseWriter) Close() (err error) {
 	if !w.hasWritten {
-		return nil
+		if !w.hasBuffered {
+			return nil
+		}
+		// never reached defaultGzipThresholdBytes, no need to do the gzip writer cleanup
+		_, err := w.unbufferedWrite(w.buffer)
+		w.buffer = nil
+		return err
 	}
-	var err error
+
 	switch t := w.w.(type) {
 	case *gzip.Writer:
 		err = t.Close()
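Reviewer note: the new Write/Close paths above are easiest to review as a decision function. A sketch (not part of the diff) restating the switch over the writer's state and the incoming chunk; shouldBuffer is a hypothetical helper assumed to live in the same package as the two thresholds:

    // shouldBuffer restates the buffering heuristic of deferredResponseWriter.Write:
    // buffer only small first writes of would-be gzipped responses, so that a
    // streaming encoder's tiny opening write does not defeat gzip negotiation.
    func shouldBuffer(hasWritten, hasBuffered bool, contentEncoding string, chunkLen int) bool {
        switch {
        case hasWritten:
            return false // headers already sent, must write through
        case contentEncoding != "gzip":
            return false // nothing to negotiate
        case !hasBuffered && chunkLen > defaultGzipThresholdBytes:
            return false // first write alone already triggers gzip
        case !hasBuffered && chunkLen > firstWriteStreamingThresholdBytes:
            return false // first write is too long to be a streaming encoder's opening bytes
        default:
            return true // buffer until defaultGzipThresholdBytes is crossed or Close flushes
        }
    }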
"two large chunk writes", + chunks: [][]byte{largeChunk, largeChunk}, + expectGzip: true, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + "Content-Encoding": []string{"gzip"}, + "Vary": []string{"Accept-Encoding"}, + }, + }, + { + name: "one small chunk and one large chunk write", + chunks: [][]byte{smallChunk, largeChunk}, + expectGzip: false, + expectHeaders: http.Header{ + "Content-Type": []string{"text/plain"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockResponseWriter := httptest.NewRecorder() + + drw := &deferredResponseWriter{ + mediaType: "text/plain", + statusCode: 200, + contentEncoding: "gzip", + hw: mockResponseWriter, + ctx: context.Background(), + } + + fullPayload := []byte{} + + for _, chunk := range tt.chunks { + n, err := drw.Write(chunk) + + if err != nil { + t.Fatalf("unexpected error while writing chunk: %v", err) + } + if n != len(chunk) { + t.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n) + } + + fullPayload = append(fullPayload, chunk...) + } + + err := drw.Close() + if err != nil { + t.Fatalf("unexpected error when closing deferredResponseWriter: %v", err) + } + + res := mockResponseWriter.Result() + + if res.StatusCode != http.StatusOK { + t.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode) + } + if !reflect.DeepEqual(res.Header, tt.expectHeaders) { + t.Fatal(cmp.Diff(tt.expectHeaders, res.Header)) + } + + resBytes, err := io.ReadAll(res.Body) + if err != nil { + t.Fatalf("unexpected error occurred while reading response body: %v", err) + } + + if tt.expectGzip { + gr, err := gzip.NewReader(bytes.NewReader(resBytes)) + if err != nil { + t.Fatalf("failed to create gzip reader: %v", err) + } + + decompressed, err := io.ReadAll(gr) + if err != nil { + t.Fatalf("failed to decompress: %v", err) + } + + if !bytes.Equal(fullPayload, decompressed) { + t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, decompressed) + } + } else { + if !bytes.Equal(fullPayload, resBytes) { + t.Errorf("payload mismatch, expected: %s, got: %s", fullPayload, resBytes) + } + } + }) + } +} + +func benchmarkChunkingGzip(b *testing.B, count int, chunk []byte) { + mockResponseWriter := httptest.NewRecorder() + mockResponseWriter.Body = nil + + drw := &deferredResponseWriter{ + mediaType: "text/plain", + statusCode: 200, + contentEncoding: "gzip", + hw: mockResponseWriter, + ctx: context.Background(), + } + b.ResetTimer() + for i := 0; i < count; i++ { + n, err := drw.Write(chunk) + if err != nil { + b.Fatalf("unexpected error while writing chunk: %v", err) + } + if n != len(chunk) { + b.Errorf("write is not complete, expected: %d bytes, written: %d bytes", len(chunk), n) + } + } + err := drw.Close() + if err != nil { + b.Fatalf("unexpected error when closing deferredResponseWriter: %v", err) + } + res := mockResponseWriter.Result() + if res.StatusCode != http.StatusOK { + b.Fatalf("status code is not writtend properly, expected: 200, got: %d", res.StatusCode) + } +} + +func BenchmarkChunkingGzip(b *testing.B) { + tests := []struct { + count int + size int + }{ + { + count: 100, + size: 1_000, + }, + { + count: 100, + size: 100_000, + }, + { + count: 1_000, + size: 100_000, + }, + { + count: 1_000, + size: 1_000_000, + }, + { + count: 10_000, + size: 100_000, + }, + { + count: 100_000, + size: 10_000, + }, + { + count: 1, + size: 100_000, + }, + { + count: 1, + size: 1_000_000, + }, + { + count: 1, + size: 10_000_000, + }, + { + count: 1, + 
+			size:  100_000_000,
+		},
+		{
+			count: 1,
+			size:  1_000_000_000,
+		},
+	}
+
+	for _, t := range tests {
+		b.Run(fmt.Sprintf("Count=%d/Size=%d", t.count, t.size), func(b *testing.B) {
+			chunk := []byte(rand2.String(t.size))
+			benchmarkChunkingGzip(b, t.count, chunk)
+		})
+	}
+}
+
 func randTime(t *time.Time, r *rand.Rand) {
 	*t = time.Unix(r.Int63n(1000*365*24*60*60), r.Int63())
 }
@@ -547,3 +808,108 @@ func gzipContent(data []byte, level int) []byte {
 	}
 	return buf.Bytes()
 }
+
+func TestStreamingGzipIntegration(t *testing.T) {
+	largeChunk := bytes.Repeat([]byte("b"), defaultGzipThresholdBytes+1)
+	tcs := []struct {
+		name            string
+		serializer      runtime.Encoder
+		object          runtime.Object
+		expectGzip      bool
+		expectStreaming bool
+	}{
+		{
+			name:            "JSON, small object, default -> no gzip",
+			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
+			object:          &testapigroupv1.CarpList{},
+			expectGzip:      false,
+			expectStreaming: false,
+		},
+		{
+			name:            "JSON, small object, streaming -> no gzip",
+			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
+			object:          &testapigroupv1.CarpList{},
+			expectGzip:      false,
+			expectStreaming: true,
+		},
+		{
+			name:            "JSON, large object, default -> gzip",
+			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{}),
+			object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
+			expectGzip:      true,
+			expectStreaming: false,
+		},
+		{
+			name:            "JSON, large object, streaming -> gzip",
+			serializer:      jsonserializer.NewSerializerWithOptions(jsonserializer.DefaultMetaFactory, nil, nil, jsonserializer.SerializerOptions{StreamingCollectionsEncoding: true}),
+			object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
+			expectGzip:      true,
+			expectStreaming: true,
+		},
+		{
+			name:            "Protobuf, small object, default -> no gzip",
+			serializer:      protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}),
+			object:          &testapigroupv1.CarpList{},
+			expectGzip:      false,
+			expectStreaming: false,
+		},
+		{
+			name:            "Protobuf, small object, streaming -> no gzip",
+			serializer:      protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}),
+			object:          &testapigroupv1.CarpList{},
+			expectGzip:      false,
+			expectStreaming: true,
+		},
+		{
+			name:            "Protobuf, large object, default -> gzip",
+			serializer:      protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{}),
+			object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
+			expectGzip:      true,
+			expectStreaming: false,
+		},
+		{
+			name:            "Protobuf, large object, streaming -> gzip",
+			serializer:      protobuf.NewSerializerWithOptions(nil, nil, protobuf.SerializerOptions{StreamingCollectionsEncoding: true}),
+			object:          &testapigroupv1.CarpList{TypeMeta: metav1.TypeMeta{Kind: string(largeChunk)}},
+			expectGzip:      true,
+			expectStreaming: true,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			mockResponseWriter := httptest.NewRecorder()
+			drw := &deferredResponseWriter{
+				mediaType:       "text/plain",
+				statusCode:      200,
+				contentEncoding: "gzip",
+				hw:              mockResponseWriter,
+				ctx:             context.Background(),
+			}
+			counter := &writeCounter{Writer: drw}
+			err := tc.serializer.Encode(tc.object, counter)
+			if err != nil {
+				t.Fatal(err)
+			}
+			encoding := mockResponseWriter.Header().Get("Content-Encoding")
+			if (encoding == "gzip") != tc.expectGzip {
+				t.Errorf("Expect gzip: %v, got: %q", tc.expectGzip, encoding)
+			}
+			if counter.writeCount < 1 {
+				t.Fatalf("Expect at least 1 write")
+			}
+			if (counter.writeCount > 1) != tc.expectStreaming {
+				t.Errorf("Expect streaming: %v, got write count: %d", tc.expectStreaming, counter.writeCount)
+			}
+		})
+	}
+}
+
+type writeCounter struct {
+	writeCount int
+	io.Writer
+}
+
+func (b *writeCounter) Write(data []byte) (int, error) {
+	b.writeCount++
+	return b.Writer.Write(data)
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
index c23343346e461..ad11b5b8a15ce 100644
--- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
+++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go
@@ -217,6 +217,14 @@ const (
 	// document.
 	StorageVersionHash featuregate.Feature = "StorageVersionHash"
 
+	// owner: @serathius
+	// Allow API server JSON encoder to encode collections item by item, instead of all at once.
+	StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"
+
+	// owner: @serathius
+	// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
+	StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"
+
 	// owner: @aramase, @enj, @nabokihms
 	// kep: https://kep.k8s.io/3331
 	//
@@ -393,6 +401,14 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
 		{Version: version.MustParse("1.15"), Default: true, PreRelease: featuregate.Beta},
 	},
 
+	StreamingCollectionEncodingToJSON: {
+		{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
+	},
+
+	StreamingCollectionEncodingToProtobuf: {
+		{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
+	},
+
 	StrictCostEnforcementForVAP: {
 		{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Beta},
 		{Version: version.MustParse("1.32"), Default: true, PreRelease: featuregate.GA, LockToDefault: true},
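Reviewer note: both gates ship as Beta but default-off in 1.32 (see the versioned specs above and the feature list at the end of the diff), so operators opt in explicitly via the standard kube-apiserver feature-gates flag, e.g.:

    --feature-gates=StreamingCollectionEncodingToJSON=true,StreamingCollectionEncodingToProtobuf=true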
diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
index 8a98cbf020182..90d38b428a008 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
@@ -1023,8 +1023,18 @@ func (s *GenericAPIServer) newAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
 // NewDefaultAPIGroupInfo returns an APIGroupInfo stubbed with "normal" values
 // exposed for easier composition from other packages
 func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
+	opts := []serializer.CodecFactoryOptionsMutator{}
 	if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
-		codecs = serializer.NewCodecFactory(scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
+		opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
+	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
+		opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
+	}
+	if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
+		opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
+	}
+	if len(opts) != 0 {
+		codecs = serializer.NewCodecFactory(scheme, opts...)
 	}
 	return APIGroupInfo{
 		PrioritizedVersions: scheme.PrioritizedVersionsForGroup(group),
diff --git a/test/featuregates_linter/test_data/versioned_feature_list.yaml b/test/featuregates_linter/test_data/versioned_feature_list.yaml
index 64733dbbe7b21..cb282738a3e3b 100644
--- a/test/featuregates_linter/test_data/versioned_feature_list.yaml
+++ b/test/featuregates_linter/test_data/versioned_feature_list.yaml
@@ -1314,6 +1314,18 @@
     lockToDefault: false
     preRelease: Alpha
     version: "1.30"
+- name: StreamingCollectionEncodingToJSON
+  versionedSpecs:
+  - default: false
+    lockToDefault: false
+    preRelease: Beta
+    version: "1.32"
+- name: StreamingCollectionEncodingToProtobuf
+  versionedSpecs:
+  - default: false
+    lockToDefault: false
+    preRelease: Beta
+    version: "1.32"
 - name: StrictCostEnforcementForVAP
   versionedSpecs:
   - default: false