Skip to content
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,14 @@ const (
// Enables support for the StorageVersionMigrator controller.
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"

// owner: @serathius
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"

// owner: serathius
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"

// owner: @robscott
// kep: https://kep.k8s.io/2433
//
Expand Down
8 changes: 8 additions & 0 deletions pkg/features/versioned_kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,14 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
},

StreamingCollectionEncodingToJSON: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
},

StreamingCollectionEncodingToProtobuf: {
{Version: version.MustParse("1.32"), Default: false, PreRelease: featuregate.Beta},
},

SupplementalGroupsPolicy: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Alpha},
},
Expand Down
12 changes: 11 additions & 1 deletion pkg/registry/core/rest/storage_core_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,18 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
ParameterCodec: legacyscheme.ParameterCodec,
NegotiatedSerializer: legacyscheme.Codecs,
}
opts := []serializer.CodecFactoryOptionsMutator{}
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, serializer.WithSerializer(cbor.NewSerializerInfo))
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
if len(opts) != 0 {
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
}

eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped

// CRDs explicitly do not support protobuf, but some objects returned by the API server do
streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
negotiatedSerializer := unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
Expand All @@ -867,10 +868,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
MediaTypeType: "application",
MediaTypeSubType: "json",
EncodesAsText: true,
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{}),
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
Strict: true,
Strict: true,
StreamingCollectionsEncoding: streamingCollections,
}),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Expand All @@ -893,7 +895,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
MediaType: "application/vnd.kubernetes.protobuf",
MediaTypeType: "application",
MediaTypeSubType: "vnd.kubernetes.protobuf",
Serializer: protobuf.NewSerializer(creator, typer),
Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{
StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf),
}),
StreamSerializer: &runtime.StreamSerializerInfo{
Serializer: protobuf.NewRawSerializer(creator, typer),
Framer: protobuf.LengthDelimitedFramer,
Expand Down Expand Up @@ -973,6 +977,12 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
if utilfeature.DefaultFeatureGate.Enabled(features.CBORServingAndStorage) {
opts = append(opts, serializer.WithSerializer(cbor.NewSerializerInfo))
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/api/meta/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
if err != nil {
return nil, err
}
if items.IsNil() {
return nil, nil
}
list := make([]runtime.Object, items.Len())
if len(list) == 0 {
return list, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []runtime.SerializerInfo {
jsonSerializer := json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
)
jsonSerializerType := runtime.SerializerInfo{
MediaType: runtime.ContentTypeJSON,
Expand All @@ -38,7 +38,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
Serializer: jsonSerializer,
StrictSerializer: json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Expand All @@ -61,7 +61,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
mf, scheme, scheme,
json.SerializerOptions{Yaml: true, Pretty: false, Strict: true},
)
protoSerializer := protobuf.NewSerializer(scheme, scheme)
protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{
StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf,
})
protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)

serializers := []runtime.SerializerInfo{
Expand Down Expand Up @@ -113,6 +115,9 @@ type CodecFactoryOptions struct {
// Pretty includes a pretty serializer along with the non-pretty one
Pretty bool

StreamingCollectionsEncodingToJSON bool
StreamingCollectionsEncodingToProtobuf bool

serializers []func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.SerializerInfo
}

Expand Down Expand Up @@ -147,6 +152,18 @@ func WithSerializer(f func(runtime.ObjectCreater, runtime.ObjectTyper) runtime.S
}
}

func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
return func(options *CodecFactoryOptions) {
options.StreamingCollectionsEncodingToJSON = true
}
}

func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator {
return func(options *CodecFactoryOptions) {
options.StreamingCollectionsEncodingToProtobuf = true
}
}

// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package json

import (
"encoding/json"
"fmt"
"io"
"maps"
"slices"
"sort"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)

func streamEncodeCollections(obj runtime.Object, w io.Writer) (bool, error) {
list, ok := obj.(*unstructured.UnstructuredList)
if ok {
return true, streamingEncodeUnstructuredList(w, list)
}
if _, ok := obj.(json.Marshaler); ok {
return false, nil
}
typeMeta, listMeta, items, err := getListMeta(obj)
if err == nil {
return true, streamingEncodeList(w, typeMeta, listMeta, items)
}
return false, nil
}

// getListMeta implements list extraction logic for json stream serialization.
//
// Reason for a custom logic instead of reusing accessors from meta package:
// * Validate json tags to prevent incompatibility with json standard package.
// * ListMetaAccessor doesn't distinguish empty from nil value.
// * TypeAccessort reparsing "apiVersion" and serializing it with "{group}/{version}"
func getListMeta(list runtime.Object) (metav1.TypeMeta, metav1.ListMeta, []runtime.Object, error) {
listValue, err := conversion.EnforcePtr(list)
if err != nil {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
}
listType := listValue.Type()
if listType.NumField() != 3 {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListType to have 3 fields")
}
// TypeMeta
typeMeta, ok := listValue.Field(0).Interface().(metav1.TypeMeta)
if !ok {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected TypeMeta field to have TypeMeta type")
}
if listType.Field(0).Tag.Get("json") != ",inline" {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected TypeMeta json field tag to be ",inline"`)
}
// ListMeta
listMeta, ok := listValue.Field(1).Interface().(metav1.ListMeta)
if !ok {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf("expected ListMeta field to have ListMeta type")
}
if listType.Field(1).Tag.Get("json") != "metadata,omitempty" {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected ListMeta json field tag to be "metadata,omitempty"`)
}
// Items
items, err := meta.ExtractList(list)
if err != nil {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, err
}
if listType.Field(2).Tag.Get("json") != "items" {
return metav1.TypeMeta{}, metav1.ListMeta{}, nil, fmt.Errorf(`expected Items json field tag to be "items"`)
}
return typeMeta, listMeta, items, nil
}

func streamingEncodeList(w io.Writer, typeMeta metav1.TypeMeta, listMeta metav1.ListMeta, items []runtime.Object) error {
// Start
if _, err := w.Write([]byte(`{`)); err != nil {
return err
}

// TypeMeta
if typeMeta.Kind != "" {
if err := encodeKeyValuePair(w, "kind", typeMeta.Kind, []byte(",")); err != nil {
return err
}
}
if typeMeta.APIVersion != "" {
if err := encodeKeyValuePair(w, "apiVersion", typeMeta.APIVersion, []byte(",")); err != nil {
return err
}
}

// ListMeta
if err := encodeKeyValuePair(w, "metadata", listMeta, []byte(",")); err != nil {
return err
}

// Items
if err := encodeItemsObjectSlice(w, items); err != nil {
return err
}

// End
_, err := w.Write([]byte("}\n"))
return err
}

func encodeItemsObjectSlice(w io.Writer, items []runtime.Object) (err error) {
if items == nil {
err := encodeKeyValuePair(w, "items", nil, nil)
return err
}
_, err = w.Write([]byte(`"items":[`))
if err != nil {
return err
}
suffix := []byte(",")
for i, item := range items {
if i == len(items)-1 {
suffix = nil
}
err := encodeValue(w, item, suffix)
if err != nil {
return err
}
}
_, err = w.Write([]byte("]"))
if err != nil {
return err
}
return err
}

func streamingEncodeUnstructuredList(w io.Writer, list *unstructured.UnstructuredList) error {
_, err := w.Write([]byte(`{`))
if err != nil {
return err
}
keys := slices.Collect(maps.Keys(list.Object))
if _, exists := list.Object["items"]; !exists {
keys = append(keys, "items")
}
sort.Strings(keys)

suffix := []byte(",")
for i, key := range keys {
if i == len(keys)-1 {
suffix = nil
}
if key == "items" {
err = encodeItemsUnstructuredSlice(w, list.Items, suffix)
} else {
err = encodeKeyValuePair(w, key, list.Object[key], suffix)
}
if err != nil {
return err
}
}
_, err = w.Write([]byte("}\n"))
return err
}

func encodeItemsUnstructuredSlice(w io.Writer, items []unstructured.Unstructured, suffix []byte) (err error) {
_, err = w.Write([]byte(`"items":[`))
if err != nil {
return err
}
comma := []byte(",")
for i, item := range items {
if i == len(items)-1 {
comma = nil
}
err := encodeValue(w, item.Object, comma)
if err != nil {
return err
}
}
_, err = w.Write([]byte("]"))
if err != nil {
return err
}
if len(suffix) > 0 {
_, err = w.Write(suffix)
}
return err
}

func encodeKeyValuePair(w io.Writer, key string, value any, suffix []byte) (err error) {
err = encodeValue(w, key, []byte(":"))
if err != nil {
return err
}
err = encodeValue(w, value, suffix)
if err != nil {
return err
}
return err
}

func encodeValue(w io.Writer, value any, suffix []byte) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
_, err = w.Write(data)
if err != nil {
return err
}
if len(suffix) > 0 {
_, err = w.Write(suffix)
}
return err
}
Loading