Skip to content
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,14 @@ const (
// Enables support for the StorageVersionMigrator controller.
StorageVersionMigrator featuregate.Feature = "StorageVersionMigrator"

// owner: @serathius
// Allow API server JSON encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToJSON featuregate.Feature = "StreamingCollectionEncodingToJSON"

// owner: serathius
// Allow API server Protobuf encoder to encode collections item by item, instead of all at once.
StreamingCollectionEncodingToProtobuf featuregate.Feature = "StreamingCollectionEncodingToProtobuf"

// owner: @robscott
// kep: https://kep.k8s.io/2433
// alpha: v1.21
Expand Down
8 changes: 8 additions & 0 deletions pkg/features/versioned_kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package features

import (
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/component-base/featuregate"
)

Expand All @@ -31,4 +32,11 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
// genericfeatures.EmulationVersion: {
// {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
// },
StreamingCollectionEncodingToJSON: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta},
},

StreamingCollectionEncodingToProtobuf: {
{Version: version.MustParse("1.31"), Default: false, PreRelease: featuregate.Beta},
},
}
14 changes: 14 additions & 0 deletions pkg/registry/core/rest/storage_core_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"k8s.io/client-go/informers"
restclient "k8s.io/client-go/rest"

"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
configmapstore "k8s.io/kubernetes/pkg/registry/core/configmap/storage"
Expand Down Expand Up @@ -69,6 +72,17 @@ func (c *GenericConfig) NewRESTStorage(apiResourceConfigSource serverstorage.API
NegotiatedSerializer: legacyscheme.Codecs,
}

opts := []serializer.CodecFactoryOptionsMutator{}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
if len(opts) != 0 {
apiGroupInfo.NegotiatedSerializer = serializer.NewCodecFactory(legacyscheme.Scheme, opts...)
}

eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
if err != nil {
return genericapiserver.APIGroupInfo{}, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ import (
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/generic"
genericfilters "k8s.io/apiserver/pkg/server/filters"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/apiserver/pkg/warning"
"k8s.io/client-go/scale"
Expand Down Expand Up @@ -848,6 +850,7 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
clusterScoped := crd.Spec.Scope == apiextensionsv1.ClusterScoped

// CRDs explicitly do not support protobuf, but some objects returned by the API server do
streamingCollections := utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON)
negotiatedSerializer := unstructuredNegotiatedSerializer{
typer: typer,
creator: creator,
Expand All @@ -861,10 +864,11 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
MediaTypeType: "application",
MediaTypeSubType: "json",
EncodesAsText: true,
Serializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, false),
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, creator, typer, true),
Serializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{StreamingCollectionsEncoding: streamingCollections}),
PrettySerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{Pretty: true}),
StrictSerializer: json.NewSerializerWithOptions(json.DefaultMetaFactory, creator, typer, json.SerializerOptions{
Strict: true,
Strict: true,
StreamingCollectionsEncoding: streamingCollections,
}),
StreamSerializer: &runtime.StreamSerializerInfo{
EncodesAsText: true,
Expand All @@ -887,7 +891,9 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
MediaType: "application/vnd.kubernetes.protobuf",
MediaTypeType: "application",
MediaTypeSubType: "vnd.kubernetes.protobuf",
Serializer: protobuf.NewSerializer(creator, typer),
Serializer: protobuf.NewSerializerWithOptions(creator, typer, protobuf.SerializerOptions{
StreamingCollectionsEncoding: utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf),
}),
StreamSerializer: &runtime.StreamSerializerInfo{
Serializer: protobuf.NewRawSerializer(creator, typer),
Framer: protobuf.LengthDelimitedFramer,
Expand Down Expand Up @@ -958,7 +964,14 @@ func (r *crdHandler) getOrCreateServingInfoFor(uid types.UID, name string) (*crd
scaleScope := *requestScopes[v.Name]
scaleConverter := scale.NewScaleConverter()
scaleScope.Subresource = "scale"
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme())
var opts []serializer.CodecFactoryOptionsMutator
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToJSON) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToJSON())
}
if utilfeature.DefaultFeatureGate.Enabled(features.StreamingCollectionEncodingToProtobuf) {
opts = append(opts, serializer.WithStreamingCollectionEncodingToProtobuf())
}
scaleScope.Serializer = serializer.NewCodecFactory(scaleConverter.Scheme(), opts...)
scaleScope.Kind = autoscalingv1.SchemeGroupVersion.WithKind("Scale")
scaleScope.Namer = handlers.ContextBasedNaming{
Namer: meta.NewAccessor(),
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/api/meta/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ func extractList(obj runtime.Object, allocNew bool) ([]runtime.Object, error) {
if err != nil {
return nil, err
}
if items.IsNil() {
return nil, nil
}
list := make([]runtime.Object, items.Len())
if len(list) == 0 {
return list, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type serializerType struct {
func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, options CodecFactoryOptions) []serializerType {
jsonSerializer := json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict},
json.SerializerOptions{Yaml: false, Pretty: false, Strict: options.Strict, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
)
jsonSerializerType := serializerType{
AcceptContentTypes: []string{runtime.ContentTypeJSON},
Expand All @@ -73,7 +73,7 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option

strictJSONSerializer := json.NewSerializerWithOptions(
mf, scheme, scheme,
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true},
json.SerializerOptions{Yaml: false, Pretty: false, Strict: true, StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToJSON},
)
jsonSerializerType.StrictSerializer = strictJSONSerializer

Expand All @@ -85,7 +85,9 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory, option
mf, scheme, scheme,
json.SerializerOptions{Yaml: true, Pretty: false, Strict: true},
)
protoSerializer := protobuf.NewSerializer(scheme, scheme)
protoSerializer := protobuf.NewSerializerWithOptions(scheme, scheme, protobuf.SerializerOptions{
StreamingCollectionsEncoding: options.StreamingCollectionsEncodingToProtobuf,
})
protoRawSerializer := protobuf.NewRawSerializer(scheme, scheme)

serializers := []serializerType{
Expand Down Expand Up @@ -136,6 +138,9 @@ type CodecFactoryOptions struct {
Strict bool
// Pretty includes a pretty serializer along with the non-pretty one
Pretty bool

StreamingCollectionsEncodingToJSON bool
StreamingCollectionsEncodingToProtobuf bool
}

// CodecFactoryOptionsMutator takes a pointer to an options struct and then modifies it.
Expand All @@ -162,6 +167,18 @@ func DisableStrict(options *CodecFactoryOptions) {
options.Strict = false
}

func WithStreamingCollectionEncodingToJSON() CodecFactoryOptionsMutator {
return func(options *CodecFactoryOptions) {
options.StreamingCollectionsEncodingToJSON = true
}
}

func WithStreamingCollectionEncodingToProtobuf() CodecFactoryOptionsMutator {
return func(options *CodecFactoryOptions) {
options.StreamingCollectionsEncodingToProtobuf = true
}
}

// NewCodecFactory provides methods for retrieving serializers for the supported wire formats
// and conversion wrappers to define preferred internal and external versions. In the future,
// as the internal version is used less, callers may instead use a defaulting serializer and
Expand Down
Loading