From afecf50bb84375948970d42eb72dcc22203ed581 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 16 Jun 2021 15:04:01 -0700 Subject: [PATCH 1/7] Add proxy/dequeuer resource requests --- pkg/config/config.go | 7 +- pkg/consts/consts.go | 7 + pkg/lib/k8s/quantity.go | 23 +++ pkg/operator/endpoints/info.go | 5 +- pkg/operator/operator/memory_capacity.go | 8 +- pkg/operator/resources/errors.go | 8 +- pkg/operator/resources/validations.go | 189 +++++++++++++++------- pkg/types/clusterconfig/cluster_config.go | 11 -- pkg/types/spec/errors.go | 8 - pkg/types/spec/validations.go | 14 +- pkg/types/userconfig/api.go | 57 +++---- pkg/workloads/k8s.go | 29 +++- 12 files changed, 228 insertions(+), 138 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index e347a8acac..c442ec6f19 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -40,8 +40,7 @@ import ( var ( OperatorMetadata *clusterconfig.OperatorMetadata - ClusterConfig *clusterconfig.Config - InstancesMetadata []aws.InstanceMetadata + ClusterConfig *clusterconfig.Config AWS *aws.Client K8s *k8s.Client @@ -92,10 +91,6 @@ func Init() error { ClusterConfig = clusterConfig - for _, instanceType := range clusterConfig.GetAllInstanceTypes() { - InstancesMetadata = append(InstancesMetadata, aws.InstanceMetadatas[clusterConfig.Region][instanceType]) - } - AWS, err = aws.NewForRegion(clusterConfig.Region) if err != nil { return err diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index 1d1471fe76..7700318dff 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -18,6 +18,8 @@ package consts import ( "os" + + kresource "k8s.io/apimachinery/pkg/api/resource" ) var ( @@ -38,6 +40,11 @@ var ( StatsDPortStr = "9125" + CortexProxyCPU = kresource.MustParse("100m") + CortexProxyMem = kresource.MustParse("100Mi") + CortexDequeuerCPU = kresource.MustParse("100m") + CortexDequeuerMem = kresource.MustParse("100Mi") + AuthHeader = "X-Cortex-Authorization" DefaultInClusterConfigPath = "/configs/cluster/cluster.yaml" diff --git a/pkg/lib/k8s/quantity.go b/pkg/lib/k8s/quantity.go index 72285ff951..8805543a86 100644 --- a/pkg/lib/k8s/quantity.go +++ b/pkg/lib/k8s/quantity.go @@ -95,6 +95,22 @@ func NewMilliQuantity(milliValue int64) *Quantity { } } +// Returns nil if no quantities are passed in +func NewSummed(quantities ...kresource.Quantity) *Quantity { + if len(quantities) == 0 { + return nil + } + + k8sQuantity := kresource.Quantity{} + for _, q := range quantities { + k8sQuantity.Add(q) + } + + return &Quantity{ + Quantity: k8sQuantity, + } +} + func (quantity *Quantity) MilliString() string { return s.Int64(quantity.Quantity.MilliValue()) + "m" } @@ -169,6 +185,13 @@ func (quantity *Quantity) ID() string { return s.Int64(quantity.MilliValue()) } +func (quantity *Quantity) DeepCopy() Quantity { + return Quantity{ + Quantity: quantity.Quantity.DeepCopy(), + UserString: quantity.UserString, + } +} + func QuantityPtr(k8sQuantity kresource.Quantity) *kresource.Quantity { return &k8sQuantity } diff --git a/pkg/operator/endpoints/info.go b/pkg/operator/endpoints/info.go index c0c04e0ce0..56e8ea1617 100644 --- a/pkg/operator/endpoints/info.go +++ b/pkg/operator/endpoints/info.go @@ -38,9 +38,8 @@ func Info(w http.ResponseWriter, r *http.Request) { } fullClusterConfig := clusterconfig.InternalConfig{ - Config: *config.ClusterConfig, - OperatorMetadata: *config.OperatorMetadata, - InstancesMetadata: config.InstancesMetadata, + Config: *config.ClusterConfig, + OperatorMetadata: *config.OperatorMetadata, } response := schema.InfoResponse{ diff --git a/pkg/operator/operator/memory_capacity.go b/pkg/operator/operator/memory_capacity.go index 18881f34f8..57f0bf64f0 100644 --- a/pkg/operator/operator/memory_capacity.go +++ b/pkg/operator/operator/memory_capacity.go @@ -18,6 +18,7 @@ package operator import ( "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/lib/aws" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/slices" kresource "k8s.io/apimachinery/pkg/api/resource" @@ -104,9 +105,10 @@ func UpdateMemoryCapacityConfigMap() (map[string]kresource.Quantity, error) { primaryInstances := []string{} minMemMap := map[string]kresource.Quantity{} - for _, instanceMetadata := range config.InstancesMetadata { - minMemMap[instanceMetadata.Type] = instanceMetadata.Memory - primaryInstances = append(primaryInstances, instanceMetadata.Type) + for _, ng := range config.ClusterConfig.NodeGroups { + instanceMetadata := aws.InstanceMetadatas[config.ClusterConfig.Region][ng.InstanceType] + minMemMap[ng.InstanceType] = instanceMetadata.Memory + primaryInstances = append(primaryInstances, ng.InstanceType) } nodeMemCapacityMap, err := getMemoryCapacityFromNodes(primaryInstances) diff --git a/pkg/operator/resources/errors.go b/pkg/operator/resources/errors.go index 6ad110ff44..0c5b324cd4 100644 --- a/pkg/operator/resources/errors.go +++ b/pkg/operator/resources/errors.go @@ -73,14 +73,10 @@ func ErrorCannotChangeKindOfDeployedAPI(name string, newKind, prevKind userconfi }) } -func ErrorNoAvailableNodeComputeLimit(resource string, reqStr string, maxStr string) error { - message := fmt.Sprintf("no instances can satisfy the requested %s quantity - requested %s %s but instances only have %s %s available", resource, reqStr, resource, maxStr, resource) - if maxStr == "0" { - message = fmt.Sprintf("no instances can satisfy the requested %s quantity - requested %s %s but instances don't have any %s", resource, reqStr, resource, resource) - } +func ErrorNoAvailableNodeComputeLimit(msg string) error { return errors.WithStack(&errors.Error{ Kind: ErrNoAvailableNodeComputeLimit, - Message: message, + Message: msg, }) } diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index 3a01f3b7e8..55f245b172 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -18,15 +18,22 @@ package resources import ( "fmt" + "strings" "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/consts" + "github.com/cortexlabs/cortex/pkg/lib/aws" + "github.com/cortexlabs/cortex/pkg/lib/console" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" + "github.com/cortexlabs/cortex/pkg/lib/slices" s "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/lib/table" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/cortexlabs/cortex/pkg/workloads" istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" kresource "k8s.io/apimachinery/pkg/api/resource" ) @@ -134,83 +141,149 @@ var _inferentiaCPUReserve = kresource.MustParse("100m") var _inferentiaMemReserve = kresource.MustParse("100Mi") func validateK8sCompute(api *userconfig.API, maxMemMap map[string]kresource.Quantity) error { - allErrors := []error{} - successfulLoops := 0 - clusterNodeGroupNames := strset.New(config.ClusterConfig.GetNodeGroupNames()...) - apiNodeGroupNames := api.NodeGroups - - if apiNodeGroupNames != nil { - for _, ngName := range apiNodeGroupNames { - if !clusterNodeGroupNames.Has(ngName) { - return ErrorInvalidNodeGroupSelector(ngName, config.ClusterConfig.GetNodeGroupNames()) - } + for _, ngName := range api.NodeGroups { + if !clusterNodeGroupNames.Has(ngName) { + return ErrorInvalidNodeGroupSelector(ngName, config.ClusterConfig.GetNodeGroupNames()) } } - compute := userconfig.GetTotalComputeFromContainers(api.Pod.Containers) + compute := userconfig.GetPodComputeRequest(api) - for _, instanceMetadata := range config.InstancesMetadata { - if apiNodeGroupNames != nil { - matchedNodeGroups := 0 - for _, ngName := range apiNodeGroupNames { - if config.ClusterConfig.GetNodeGroupByName(ngName).InstanceType == instanceMetadata.Type { - matchedNodeGroups++ - } - } - if matchedNodeGroups == 0 { - continue - } + for _, ng := range config.ClusterConfig.NodeGroups { + if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) { + continue } - maxMemLoop := maxMemMap[instanceMetadata.Type] - maxMemLoop.Sub(_cortexMemReserve) + nodeCPU, nodeMem, nodeGPU, nodeInf := getNodeCapacity(ng.InstanceType, maxMemMap) - maxCPU := instanceMetadata.CPU - maxCPU.Sub(_cortexCPUReserve) - - maxGPU := instanceMetadata.GPU - if maxGPU > 0 { - // Reserve resources for nvidia device plugin daemonset - maxCPU.Sub(_nvidiaCPUReserve) - maxMemLoop.Sub(_nvidiaMemReserve) - // Reserve resources for nvidia dcgm prometheus exporter - maxCPU.Sub(_nvidiaDCGMExporterCPUReserve) - maxMemLoop.Sub(_nvidiaDCGMExporterMemReserve) + if compute.CPU != nil && nodeCPU.Cmp(compute.CPU.Quantity) < 0 { + continue + } else if compute.Mem != nil && nodeMem.Cmp(compute.Mem.Quantity) < 0 { + continue + } else if compute.GPU > nodeGPU { + continue + } else if compute.Inf > nodeInf { + continue } - maxInf := instanceMetadata.Inf - if maxInf > 0 { - // Reserve resources for inferentia device plugin daemonset - maxCPU.Sub(_inferentiaCPUReserve) - maxMemLoop.Sub(_inferentiaMemReserve) - } + // we found a node group that has capacity + return nil + } - loopErrors := []error{} - if compute.CPU != nil && maxCPU.Cmp(compute.CPU.Quantity) < 0 { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("CPU", compute.CPU.String(), maxCPU.String())) - } - if compute.Mem != nil && maxMemLoop.Cmp(compute.Mem.Quantity) < 0 { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("memory", compute.Mem.String(), maxMemLoop.String())) - } - if compute.GPU > maxGPU { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("GPU", fmt.Sprintf("%d", compute.GPU), fmt.Sprintf("%d", maxGPU))) + // no nodegroups have capacity + errMsg := "no instance types in your cluster are large enough to satisfy the requested resources for your pod\n\n" + errMsg += console.Bold("requested pod resources") + errMsg += podResourceRequestsTable(api, compute) + errMsg += console.Bold("\nnodegroup resources") + errMsg += nodeGroupResourcesTable(api, compute, maxMemMap) + return ErrorNoAvailableNodeComputeLimit(errMsg) +} + +func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) string { + sidecarCPUNote := "" + sidecarMemNote := "" + if api.Kind == userconfig.RealtimeAPIKind { + sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexProxyCPU, workloads.ProxyContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexProxyMem, workloads.ProxyContainerName) + } else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind { + sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexDequeuerCPU, workloads.DequeuerContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexDequeuerMem, workloads.DequeuerContainerName) + } + + var items table.KeyValuePairs + if compute.CPU != nil { + items.Add("CPU", compute.CPU.String()+sidecarCPUNote) + } + if compute.Mem != nil { + items.Add("memory", compute.Mem.String()+sidecarMemNote) + } + if compute.GPU > 0 { + items.Add("GPU", compute.GPU) + } + if compute.Inf > 0 { + items.Add("Inf", compute.Inf) + } + + return items.String() +} + +func nodeGroupResourcesTable(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) string { + var skippedNodeGroups []string + var nodeGroupResourceRows [][]interface{} + + showGPU := false + showInf := false + if compute.GPU > 0 { + showGPU = true + } + if compute.Inf > 0 { + showInf = true + } + + for _, ng := range config.ClusterConfig.NodeGroups { + nodeCPU, nodeMem, nodeGPU, nodeInf := getNodeCapacity(ng.InstanceType, maxMemMap) + if nodeGPU > 0 { + showGPU = true } - if compute.Inf > maxInf { - loopErrors = append(loopErrors, ErrorNoAvailableNodeComputeLimit("Inf", fmt.Sprintf("%d", compute.Inf), fmt.Sprintf("%d", maxInf))) + if nodeInf > 0 { + showInf = true } - if errors.HasError(loopErrors) { - allErrors = append(allErrors, errors.FirstError(loopErrors...)) + + if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) { + skippedNodeGroups = append(skippedNodeGroups, ng.Name) } else { - successfulLoops++ + nodeGroupResourceRows = append(nodeGroupResourceRows, []interface{}{ng.Name, ng.InstanceType, nodeCPU, nodeMem, nodeGPU, nodeInf}) } } - if successfulLoops == 0 { - return errors.FirstError(allErrors...) + nodeGroupResourceRowsTable := table.Table{ + Headers: []table.Header{ + {Title: "node group"}, + {Title: "instance type"}, + {Title: "CPU"}, + {Title: "memory"}, + {Title: "GPU", Hidden: !showGPU}, + {Title: "Inf", Hidden: !showInf}, + }, + Rows: nodeGroupResourceRows, } - return nil + out := nodeGroupResourceRowsTable.MustFormat() + if len(skippedNodeGroups) > 0 { + out += fmt.Sprintf("\nthe following node groups were skipped (they are not listed in the api configuration's %s field): %s", userconfig.NodeGroupsKey, strings.Join(skippedNodeGroups, ", ")) + } + + return out +} + +func getNodeCapacity(instanceType string, maxMemMap map[string]kresource.Quantity) (kresource.Quantity, kresource.Quantity, int64, int64) { + instanceMetadata := aws.InstanceMetadatas[config.ClusterConfig.Region][instanceType] + + cpu := instanceMetadata.CPU.DeepCopy() + cpu.Sub(_cortexCPUReserve) + + mem := maxMemMap[instanceType].DeepCopy() + mem.Sub(_cortexMemReserve) + + gpu := instanceMetadata.GPU + if gpu > 0 { + // Reserve resources for nvidia device plugin daemonset + cpu.Sub(_nvidiaCPUReserve) + mem.Sub(_nvidiaMemReserve) + // Reserve resources for nvidia dcgm prometheus exporter + cpu.Sub(_nvidiaDCGMExporterCPUReserve) + mem.Sub(_nvidiaDCGMExporterMemReserve) + } + + inf := instanceMetadata.Inf + if inf > 0 { + // Reserve resources for inferentia device plugin daemonset + cpu.Sub(_inferentiaCPUReserve) + mem.Sub(_inferentiaMemReserve) + } + + return cpu, mem, gpu, inf } func validateEndpointCollisions(api *userconfig.API, virtualServices []istioclientnetworking.VirtualService) error { diff --git a/pkg/types/clusterconfig/cluster_config.go b/pkg/types/clusterconfig/cluster_config.go index d82d2fa627..57215680a0 100644 --- a/pkg/types/clusterconfig/cluster_config.go +++ b/pkg/types/clusterconfig/cluster_config.go @@ -173,8 +173,6 @@ type InternalConfig struct { // Populated by operator OperatorMetadata - - InstancesMetadata []aws.InstanceMetadata `json:"instance_metadata"` } // The bare minimum to identify a cluster @@ -1490,15 +1488,6 @@ func (mc *ManagedConfig) TelemetryEvent() map[string]interface{} { return event } -func (mc *ManagedConfig) GetAllInstanceTypes() []string { - allInstanceTypes := strset.New() - for _, ng := range mc.NodeGroups { - allInstanceTypes.Add(ng.InstanceType) - } - - return allInstanceTypes.Slice() -} - func (mc *ManagedConfig) GetNodeGroupByName(name string) *NodeGroup { for _, ng := range mc.NodeGroups { if ng.Name == name { diff --git a/pkg/types/spec/errors.go b/pkg/types/spec/errors.go index fcb6643fa9..24d3a7af5e 100644 --- a/pkg/types/spec/errors.go +++ b/pkg/types/spec/errors.go @@ -55,7 +55,6 @@ const ( ErrCortexPrefixedEnvVarNotAllowed = "spec.cortex_prefixed_env_var_not_allowed" ErrDisallowedEnvVars = "spec.disallowed_env_vars" ErrComputeResourceConflict = "spec.compute_resource_conflict" - ErrInvalidNumberOfInfs = "spec.invalid_number_of_infs" ErrIncorrectTrafficSplitterWeight = "spec.incorrect_traffic_splitter_weight" ErrTrafficSplitterAPIsNotUnique = "spec.traffic_splitter_apis_not_unique" ErrOneShadowPerTrafficSplitter = "spec.one_shadow_per_traffic_splitter" @@ -250,13 +249,6 @@ func ErrorComputeResourceConflict(resourceA, resourceB string) error { }) } -func ErrorInvalidNumberOfInfs(requestedInfs int64) error { - return errors.WithStack(&errors.Error{ - Kind: ErrInvalidNumberOfInfs, - Message: fmt.Sprintf("cannot request %d Infs (currently only 1 Inf can be used per API replica, due to AWS's bug: https://github.com/aws/aws-neuron-sdk/issues/110)", requestedInfs), - }) -} - func ErrorIncorrectTrafficSplitterWeightTotal(totalWeight int32) error { return errors.WithStack(&errors.Error{ Kind: ErrIncorrectTrafficSplitterWeight, diff --git a/pkg/types/spec/validations.go b/pkg/types/spec/validations.go index 3cc2385287..d203db77c0 100644 --- a/pkg/types/spec/validations.go +++ b/pkg/types/spec/validations.go @@ -711,8 +711,6 @@ func validatePod( awsClient *aws.Client, k8sClient *k8s.Client, ) error { - containers := api.Pod.Containers - totalCompute := userconfig.GetTotalComputeFromContainers(containers) if api.Pod.Port != nil && api.Kind == userconfig.TaskAPIKind { return ErrorFieldIsNotSupportedForKind(userconfig.PortKey, api.Kind) @@ -721,11 +719,11 @@ func validatePod( api.Pod.Port = pointer.Int32(consts.DefaultUserPodPortInt32) } - if err := validateCompute(totalCompute); err != nil { + if err := validateCompute(api); err != nil { return errors.Wrap(err, userconfig.ComputeKey) } - if err := validateContainers(containers, api.Kind, awsClient, k8sClient); err != nil { + if err := validateContainers(api.Pod.Containers, api.Kind, awsClient, k8sClient); err != nil { return errors.Wrap(err, userconfig.ContainersKey) } @@ -844,15 +842,13 @@ func validateAutoscaling(api *userconfig.API) error { return nil } -func validateCompute(compute userconfig.Compute) error { +func validateCompute(api *userconfig.API) error { + compute := userconfig.GetPodComputeRequest(api) + if compute.GPU > 0 && compute.Inf > 0 { return ErrorComputeResourceConflict(userconfig.GPUKey, userconfig.InfKey) } - if compute.Inf > 1 { - return ErrorInvalidNumberOfInfs(compute.Inf) - } - return nil } diff --git a/pkg/types/userconfig/api.go b/pkg/types/userconfig/api.go index d601112e20..c2f8585941 100644 --- a/pkg/types/userconfig/api.go +++ b/pkg/types/userconfig/api.go @@ -21,12 +21,14 @@ import ( "strings" "time" + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/pointer" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" s "github.com/cortexlabs/cortex/pkg/lib/strings" "github.com/cortexlabs/cortex/pkg/lib/urls" "github.com/cortexlabs/yaml" + kresource "k8s.io/apimachinery/pkg/api/resource" kmeta "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -463,46 +465,45 @@ func ZeroCompute() Compute { } } -func GetTotalComputeFromContainers(containers []*Container) Compute { - compute := Compute{} +func GetPodComputeRequest(api *API) Compute { + var cpuQtys []kresource.Quantity + var memQtys []kresource.Quantity + var shmQtys []kresource.Quantity + var totalGPU int64 + var totalInf int64 - for _, container := range containers { + for _, container := range api.Pod.Containers { if container == nil || container.Compute == nil { continue } - if container.Compute.CPU != nil { - newCPUQuantity := k8s.NewMilliQuantity(container.Compute.CPU.ToDec().MilliValue()) - if compute.CPU == nil { - compute.CPU = newCPUQuantity - } else if newCPUQuantity != nil { - compute.CPU.AddQty(*newCPUQuantity) - } + cpuQtys = append(cpuQtys, container.Compute.CPU.Quantity) } - if container.Compute.Mem != nil { - newMemQuantity := k8s.NewMilliQuantity(container.Compute.Mem.ToDec().MilliValue()) - if compute.Mem == nil { - compute.Mem = newMemQuantity - } else if newMemQuantity != nil { - compute.Mem.AddQty(*newMemQuantity) - } + memQtys = append(memQtys, container.Compute.Mem.Quantity) } - if container.Compute.Shm != nil { - newShmQuantity := k8s.NewMilliQuantity(container.Compute.Shm.ToDec().MilliValue()) - if compute.Shm == nil { - compute.Shm = newShmQuantity - } else if newShmQuantity != nil { - compute.Shm.AddQty(*newShmQuantity) - } + shmQtys = append(shmQtys, container.Compute.Shm.Quantity) } + totalGPU += container.Compute.GPU + totalInf += container.Compute.Inf + } - compute.GPU += container.Compute.GPU - compute.Inf += container.Compute.Inf + if api.Kind == RealtimeAPIKind { + cpuQtys = append(cpuQtys, consts.CortexProxyCPU) + memQtys = append(memQtys, consts.CortexProxyMem) + } else if api.Kind == AsyncAPIKind || api.Kind == BatchAPIKind { + cpuQtys = append(cpuQtys, consts.CortexDequeuerCPU) + memQtys = append(memQtys, consts.CortexDequeuerMem) } - return compute + return Compute{ + CPU: k8s.NewSummed(cpuQtys...), + Mem: k8s.NewSummed(memQtys...), + Shm: k8s.NewSummed(shmQtys...), + GPU: totalGPU, + Inf: totalInf, + } } func GetContainerNames(containers []*Container) strset.Set { @@ -558,7 +559,7 @@ func (api *API) TelemetryEvent() map[string]interface{} { event["pod.containers._num_readiness_probes"] = numReadinessProbes event["pod.containers._num_liveness_probes"] = numLivenessProbes - totalCompute := GetTotalComputeFromContainers(api.Pod.Containers) + totalCompute := GetPodComputeRequest(api) if totalCompute.CPU != nil { event["pod.containers.compute.cpu._is_defined"] = true event["pod.containers.compute.cpu"] = float64(totalCompute.CPU.MilliValue()) / 1000 diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index 6519d29012..c4ebaa2e60 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -47,9 +47,8 @@ const ( _gatewayContainerName = "gateway" - _proxyContainerName = "proxy" - - _dequeuerContainerName = "dequeuer" + ProxyContainerName = "proxy" + DequeuerContainerName = "dequeuer" _kubexitGraveyardName = "graveyard" _kubexitGraveyardMountPath = "/graveyard" @@ -115,7 +114,7 @@ func AsyncGatewayContainer(api spec.API, queueURL string, volumeMounts []kcore.V func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container, kcore.Volume) { return kcore.Container{ - Name: _dequeuerContainerName, + Name: DequeuerContainerName, Image: config.ClusterConfig.ImageDequeuer, ImagePullPolicy: kcore.PullAlways, Command: []string{ @@ -146,6 +145,12 @@ func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container ContainerPort: consts.AdminPortInt32, }, }, + Resources: kcore.ResourceRequirements{ + Requests: kcore.ResourceList{ + kcore.ResourceCPU: consts.CortexDequeuerCPU, + kcore.ResourceMemory: consts.CortexDequeuerMem, + }, + }, ReadinessProbe: &kcore.Probe{ Handler: kcore.Handler{ HTTPGet: &kcore.HTTPGetAction{ @@ -167,7 +172,7 @@ func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Container, kcore.Volume) { return kcore.Container{ - Name: _dequeuerContainerName, + Name: DequeuerContainerName, Image: config.ClusterConfig.ImageDequeuer, ImagePullPolicy: kcore.PullAlways, Command: []string{ @@ -193,6 +198,12 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co }, }, }), + Resources: kcore.ResourceRequirements{ + Requests: kcore.ResourceList{ + kcore.ResourceCPU: consts.CortexDequeuerCPU, + kcore.ResourceMemory: consts.CortexDequeuerMem, + }, + }, ReadinessProbe: &kcore.Probe{ Handler: kcore.Handler{ HTTPGet: &kcore.HTTPGetAction{ @@ -215,7 +226,7 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) { return kcore.Container{ - Name: _proxyContainerName, + Name: ProxyContainerName, Image: config.ClusterConfig.ImageProxy, ImagePullPolicy: kcore.PullAlways, Args: []string{ @@ -241,6 +252,12 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) { VolumeMounts: []kcore.VolumeMount{ ClusterConfigMount(), }, + Resources: kcore.ResourceRequirements{ + Requests: kcore.ResourceList{ + kcore.ResourceCPU: consts.CortexProxyCPU, + kcore.ResourceMemory: consts.CortexProxyMem, + }, + }, ReadinessProbe: &kcore.Probe{ Handler: kcore.Handler{ HTTPGet: &kcore.HTTPGetAction{ From c5ebb86cb91a4d1ede288df15777b595291fbabe Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 16 Jun 2021 15:12:27 -0700 Subject: [PATCH 2/7] Update docs --- docs/workloads/async/containers.md | 4 ++++ docs/workloads/batch/containers.md | 4 ++++ docs/workloads/realtime/containers.md | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/docs/workloads/async/containers.md b/docs/workloads/async/containers.md index d61d1ced57..8f557ad11c 100644 --- a/docs/workloads/async/containers.md +++ b/docs/workloads/async/containers.md @@ -27,6 +27,10 @@ Your API pod can contain multiple containers, only one of which can be listening The `/mnt` directory is mounted to each container's filesystem, and is shared across all containers. +## Resource requests + +Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's dequeuer sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory. + ## Observability See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md). diff --git a/docs/workloads/batch/containers.md b/docs/workloads/batch/containers.md index eb3a971e79..67263f4b33 100644 --- a/docs/workloads/batch/containers.md +++ b/docs/workloads/batch/containers.md @@ -33,6 +33,10 @@ Your API pod can contain multiple containers, only one of which can be listening The `/mnt` directory is mounted to each container's filesystem, and is shared across all containers. +## Resource requests + +Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's dequeuer sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory. + ## Observability See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md). diff --git a/docs/workloads/realtime/containers.md b/docs/workloads/realtime/containers.md index b3b502a135..f700b54cee 100644 --- a/docs/workloads/realtime/containers.md +++ b/docs/workloads/realtime/containers.md @@ -25,6 +25,10 @@ Your API pod can contain multiple containers, only one of which can be listening The `/mnt` directory is mounted to each container's file system, and is shared across all containers. +## Resource requests + +Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's proxy sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory. + ## Observability See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md). From 0ba24c42f1c803dd389537f26e8ebdf9ea0b5c9e Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 16 Jun 2021 16:38:58 -0700 Subject: [PATCH 3/7] Fix lint --- pkg/operator/resources/validations.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index 55f245b172..3242ef8ce0 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -184,11 +184,11 @@ func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) s sidecarCPUNote := "" sidecarMemNote := "" if api.Kind == userconfig.RealtimeAPIKind { - sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexProxyCPU, workloads.ProxyContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexProxyMem, workloads.ProxyContainerName) + sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexProxyMem.String(), workloads.ProxyContainerName) } else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind { - sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexDequeuerCPU, workloads.DequeuerContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexDequeuerMem, workloads.DequeuerContainerName) + sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexDequeuerMem.String(), workloads.DequeuerContainerName) } var items table.KeyValuePairs From 954535a9a9e533540329c550e9026259fb293cde Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 16 Jun 2021 17:03:03 -0700 Subject: [PATCH 4/7] Improve formatting --- pkg/lib/k8s/quantity.go | 16 ++++++++++++++-- pkg/operator/resources/validations.go | 13 ++++++------- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/lib/k8s/quantity.go b/pkg/lib/k8s/quantity.go index 8805543a86..68de2ebc66 100644 --- a/pkg/lib/k8s/quantity.go +++ b/pkg/lib/k8s/quantity.go @@ -119,11 +119,23 @@ func (quantity *Quantity) ToFloat32() float32 { return float32(quantity.Quantity.MilliValue()) / float32(1000) } -func (quantity *Quantity) ToKi() int64 { - kiFloat := float64(quantity.Quantity.Value()) / float64(1024) +func ToKi(k8sQuantity kresource.Quantity) int64 { + kiFloat := float64(k8sQuantity.Value()) / float64(1024) return int64(math.Round(kiFloat)) } +func ToKiStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToKi(k8sQuantity)) + "Ki" +} + +func (quantity *Quantity) ToKi() int64 { + return ToKi(quantity.Quantity) +} + +func (quantity *Quantity) ToKiStr() string { + return ToKiStr(quantity.Quantity) +} + // SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value) func (quantity *Quantity) SplitInTwo() (*kresource.Quantity, *kresource.Quantity) { return SplitInTwo(&quantity.Quantity) diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index 3242ef8ce0..be0817f4e1 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -173,10 +173,9 @@ func validateK8sCompute(api *userconfig.API, maxMemMap map[string]kresource.Quan // no nodegroups have capacity errMsg := "no instance types in your cluster are large enough to satisfy the requested resources for your pod\n\n" - errMsg += console.Bold("requested pod resources") + errMsg += console.Bold("requested pod resources\n") errMsg += podResourceRequestsTable(api, compute) - errMsg += console.Bold("\nnodegroup resources") - errMsg += nodeGroupResourcesTable(api, compute, maxMemMap) + errMsg += "\n" + s.TrimTrailingNewLines(nodeGroupResourcesTable(api, compute, maxMemMap)) return ErrorNoAvailableNodeComputeLimit(errMsg) } @@ -185,10 +184,10 @@ func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) s sidecarMemNote := "" if api.Kind == userconfig.RealtimeAPIKind { sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexProxyMem.String(), workloads.ProxyContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", k8s.ToKiStr(consts.CortexProxyMem), workloads.ProxyContainerName) } else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind { sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", consts.CortexDequeuerMem.String(), workloads.DequeuerContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", k8s.ToKiStr(consts.CortexDequeuerMem), workloads.DequeuerContainerName) } var items table.KeyValuePairs @@ -196,7 +195,7 @@ func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) s items.Add("CPU", compute.CPU.String()+sidecarCPUNote) } if compute.Mem != nil { - items.Add("memory", compute.Mem.String()+sidecarMemNote) + items.Add("memory", compute.Mem.ToKiStr()+sidecarMemNote) } if compute.GPU > 0 { items.Add("GPU", compute.GPU) @@ -251,7 +250,7 @@ func nodeGroupResourcesTable(api *userconfig.API, compute userconfig.Compute, ma out := nodeGroupResourceRowsTable.MustFormat() if len(skippedNodeGroups) > 0 { - out += fmt.Sprintf("\nthe following node groups were skipped (they are not listed in the api configuration's %s field): %s", userconfig.NodeGroupsKey, strings.Join(skippedNodeGroups, ", ")) + out += fmt.Sprintf("\nthe following %s skipped (based on the api configuration's %s field): %s", s.PluralCustom("node group was", "node groups were", len(skippedNodeGroups)), userconfig.NodeGroupsKey, strings.Join(skippedNodeGroups, ", ")) } return out From 030efb22eeceff81f310472a24604347ea9b3a40 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 16 Jun 2021 17:28:33 -0700 Subject: [PATCH 5/7] Improve formatting --- pkg/lib/k8s/quantity.go | 81 ++++++++++++++++++++++++--- pkg/operator/resources/errors.go | 2 +- pkg/operator/resources/validations.go | 16 +++--- 3 files changed, 83 insertions(+), 16 deletions(-) diff --git a/pkg/lib/k8s/quantity.go b/pkg/lib/k8s/quantity.go index 68de2ebc66..90e3dba3b5 100644 --- a/pkg/lib/k8s/quantity.go +++ b/pkg/lib/k8s/quantity.go @@ -119,21 +119,88 @@ func (quantity *Quantity) ToFloat32() float32 { return float32(quantity.Quantity.MilliValue()) / float32(1000) } -func ToKi(k8sQuantity kresource.Quantity) int64 { +func ToKiRounded(k8sQuantity kresource.Quantity) int64 { kiFloat := float64(k8sQuantity.Value()) / float64(1024) return int64(math.Round(kiFloat)) } +func ToKiRoundedStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToKiRounded(k8sQuantity)) + "Ki" +} +func (quantity *Quantity) ToKiRounded() int64 { + return ToKiRounded(quantity.Quantity) +} +func (quantity *Quantity) ToKiRoundedStr() string { + return ToKiRoundedStr(quantity.Quantity) +} + +func ToKiCeil(k8sQuantity kresource.Quantity) int64 { + kiFloat := float64(k8sQuantity.Value()) / float64(1024) + return int64(math.Ceil(kiFloat)) +} +func ToKiCeilStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToKiCeil(k8sQuantity)) + "Ki" +} +func (quantity *Quantity) ToKiCeil() int64 { + return ToKiCeil(quantity.Quantity) +} +func (quantity *Quantity) ToKiCeilStr() string { + return ToKiCeilStr(quantity.Quantity) +} + +func ToKiFloor(k8sQuantity kresource.Quantity) int64 { + kiFloat := float64(k8sQuantity.Value()) / float64(1024) + return int64(math.Floor(kiFloat)) +} +func ToKiFloorStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToKiFloor(k8sQuantity)) + "Ki" +} +func (quantity *Quantity) ToKiFloor() int64 { + return ToKiFloor(quantity.Quantity) +} +func (quantity *Quantity) ToKiFloorStr() string { + return ToKiFloorStr(quantity.Quantity) +} -func ToKiStr(k8sQuantity kresource.Quantity) string { - return s.Int64(ToKi(k8sQuantity)) + "Ki" +func ToMiRounded(k8sQuantity kresource.Quantity) int64 { + miFloat := float64(k8sQuantity.Value()) / float64(1024*1024) + return int64(math.Round(miFloat)) +} +func ToMiRoundedStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToMiRounded(k8sQuantity)) + "Mi" +} +func (quantity *Quantity) ToMiRounded() int64 { + return ToMiRounded(quantity.Quantity) +} +func (quantity *Quantity) ToMiRoundedStr() string { + return ToMiRoundedStr(quantity.Quantity) } -func (quantity *Quantity) ToKi() int64 { - return ToKi(quantity.Quantity) +func ToMiCeil(k8sQuantity kresource.Quantity) int64 { + miFloat := float64(k8sQuantity.Value()) / float64(1024*1024) + return int64(math.Ceil(miFloat)) +} +func ToMiCeilStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToMiCeil(k8sQuantity)) + "Mi" +} +func (quantity *Quantity) ToMiCeil() int64 { + return ToMiCeil(quantity.Quantity) +} +func (quantity *Quantity) ToMiCeilStr() string { + return ToMiCeilStr(quantity.Quantity) } -func (quantity *Quantity) ToKiStr() string { - return ToKiStr(quantity.Quantity) +func ToMiFloor(k8sQuantity kresource.Quantity) int64 { + miFloat := float64(k8sQuantity.Value()) / float64(1024*1024) + return int64(math.Floor(miFloat)) +} +func ToMiFloorStr(k8sQuantity kresource.Quantity) string { + return s.Int64(ToMiFloor(k8sQuantity)) + "Mi" +} +func (quantity *Quantity) ToMiFloor() int64 { + return ToMiFloor(quantity.Quantity) +} +func (quantity *Quantity) ToMiFloorStr() string { + return ToMiFloorStr(quantity.Quantity) } // SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value) diff --git a/pkg/operator/resources/errors.go b/pkg/operator/resources/errors.go index 0c5b324cd4..0379b267b8 100644 --- a/pkg/operator/resources/errors.go +++ b/pkg/operator/resources/errors.go @@ -101,6 +101,6 @@ func ErrorAPIsNotDeployed(notDeployedAPIs []string) error { func ErrorInvalidNodeGroupSelector(selected string, availableNodeGroups []string) error { return errors.WithStack(&errors.Error{ Kind: ErrInvalidNodeGroupSelector, - Message: fmt.Sprintf("node group %s doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)), + Message: fmt.Sprintf("node group \"%s\" doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API, or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)), }) } diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index be0817f4e1..3621bad931 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -92,7 +92,7 @@ func ValidateClusterAPIs(apis []userconfig.API) error { api := &apis[i] if api.Kind != userconfig.TrafficSplitterKind { if err := validateK8sCompute(api, maxMemMap); err != nil { - return err + return errors.Wrap(err, api.Identify()) } } } @@ -144,7 +144,7 @@ func validateK8sCompute(api *userconfig.API, maxMemMap map[string]kresource.Quan clusterNodeGroupNames := strset.New(config.ClusterConfig.GetNodeGroupNames()...) for _, ngName := range api.NodeGroups { if !clusterNodeGroupNames.Has(ngName) { - return ErrorInvalidNodeGroupSelector(ngName, config.ClusterConfig.GetNodeGroupNames()) + return errors.Wrap(ErrorInvalidNodeGroupSelector(ngName, config.ClusterConfig.GetNodeGroupNames()), userconfig.NodeGroupsKey) } } @@ -183,11 +183,11 @@ func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) s sidecarCPUNote := "" sidecarMemNote := "" if api.Kind == userconfig.RealtimeAPIKind { - sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", k8s.ToKiStr(consts.CortexProxyMem), workloads.ProxyContainerName) + sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexProxyMem), workloads.ProxyContainerName) } else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind { - sidecarCPUNote = fmt.Sprintf(" (including %s CPU for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s memory for the %s sidecar container)", k8s.ToKiStr(consts.CortexDequeuerMem), workloads.DequeuerContainerName) + sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexDequeuerMem), workloads.DequeuerContainerName) } var items table.KeyValuePairs @@ -195,7 +195,7 @@ func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) s items.Add("CPU", compute.CPU.String()+sidecarCPUNote) } if compute.Mem != nil { - items.Add("memory", compute.Mem.ToKiStr()+sidecarMemNote) + items.Add("memory", compute.Mem.ToMiCeilStr()+sidecarMemNote) } if compute.GPU > 0 { items.Add("GPU", compute.GPU) @@ -232,7 +232,7 @@ func nodeGroupResourcesTable(api *userconfig.API, compute userconfig.Compute, ma if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) { skippedNodeGroups = append(skippedNodeGroups, ng.Name) } else { - nodeGroupResourceRows = append(nodeGroupResourceRows, []interface{}{ng.Name, ng.InstanceType, nodeCPU, nodeMem, nodeGPU, nodeInf}) + nodeGroupResourceRows = append(nodeGroupResourceRows, []interface{}{ng.Name, ng.InstanceType, nodeCPU, k8s.ToMiFloorStr(nodeMem), nodeGPU, nodeInf}) } } From 562598012ce9d6026b366e1ab9ca3456cc7b39cb Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Wed, 16 Jun 2021 17:32:53 -0700 Subject: [PATCH 6/7] Use power of 2 resource requests in examples --- test/apis/async/hello-world/cortex_cpu.yaml | 2 +- test/apis/async/text-generator/cortex_cpu.yaml | 2 +- test/apis/async/text-generator/cortex_gpu.yaml | 2 +- test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml | 2 +- test/apis/realtime/hello-world/cortex_cpu.yaml | 2 +- .../apis/realtime/image-classifier-resnet50/cortex_cpu.yaml | 2 +- test/apis/realtime/prime-generator/cortex_cpu.yaml | 2 +- test/apis/realtime/sleep/cortex_cpu.yaml | 2 +- test/apis/realtime/text-generator/cortex_cpu.yaml | 2 +- test/apis/realtime/text-generator/cortex_gpu.yaml | 2 +- test/apis/trafficsplitter/hello-world/cortex_cpu.yaml | 6 +++--- 11 files changed, 13 insertions(+), 13 deletions(-) diff --git a/test/apis/async/hello-world/cortex_cpu.yaml b/test/apis/async/hello-world/cortex_cpu.yaml index 4a1143af7b..9348aeff99 100644 --- a/test/apis/async/hello-world/cortex_cpu.yaml +++ b/test/apis/async/hello-world/cortex_cpu.yaml @@ -11,4 +11,4 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi diff --git a/test/apis/async/text-generator/cortex_cpu.yaml b/test/apis/async/text-generator/cortex_cpu.yaml index d78afacdcb..29f632f6ba 100644 --- a/test/apis/async/text-generator/cortex_cpu.yaml +++ b/test/apis/async/text-generator/cortex_cpu.yaml @@ -11,4 +11,4 @@ port: 8080 compute: cpu: 1 - mem: 2.5G + mem: 2.5Gi diff --git a/test/apis/async/text-generator/cortex_gpu.yaml b/test/apis/async/text-generator/cortex_gpu.yaml index a18a8bc612..fa72ac6708 100644 --- a/test/apis/async/text-generator/cortex_gpu.yaml +++ b/test/apis/async/text-generator/cortex_gpu.yaml @@ -14,4 +14,4 @@ compute: cpu: 1 gpu: 1 - mem: 512M + mem: 512Mi diff --git a/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml b/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml index a96c29b861..0e23c3653c 100644 --- a/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml +++ b/test/apis/batch/image-classifier-alexnet/cortex_cpu.yaml @@ -11,4 +11,4 @@ port: 8080 compute: cpu: 1 - mem: 2G + mem: 2Gi diff --git a/test/apis/realtime/hello-world/cortex_cpu.yaml b/test/apis/realtime/hello-world/cortex_cpu.yaml index 3edc98541d..fd36e6e3d1 100644 --- a/test/apis/realtime/hello-world/cortex_cpu.yaml +++ b/test/apis/realtime/hello-world/cortex_cpu.yaml @@ -12,4 +12,4 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi diff --git a/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml b/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml index 8e74412630..c687163840 100644 --- a/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml +++ b/test/apis/realtime/image-classifier-resnet50/cortex_cpu.yaml @@ -11,4 +11,4 @@ command: ["tfs_model_status_probe", "-addr", "localhost:8500", "-model-name", "resnet50"] compute: cpu: 1 - mem: 2G + mem: 2Gi diff --git a/test/apis/realtime/prime-generator/cortex_cpu.yaml b/test/apis/realtime/prime-generator/cortex_cpu.yaml index 146c78267a..70f3ae3ab2 100644 --- a/test/apis/realtime/prime-generator/cortex_cpu.yaml +++ b/test/apis/realtime/prime-generator/cortex_cpu.yaml @@ -12,4 +12,4 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi diff --git a/test/apis/realtime/sleep/cortex_cpu.yaml b/test/apis/realtime/sleep/cortex_cpu.yaml index 8b27f7e92d..029c847e46 100644 --- a/test/apis/realtime/sleep/cortex_cpu.yaml +++ b/test/apis/realtime/sleep/cortex_cpu.yaml @@ -13,6 +13,6 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi autoscaling: target_in_flight: 1 diff --git a/test/apis/realtime/text-generator/cortex_cpu.yaml b/test/apis/realtime/text-generator/cortex_cpu.yaml index c31bb959b0..3b1fba338b 100644 --- a/test/apis/realtime/text-generator/cortex_cpu.yaml +++ b/test/apis/realtime/text-generator/cortex_cpu.yaml @@ -12,4 +12,4 @@ port: 8080 compute: cpu: 1 - mem: 2.5G + mem: 2.5Gi diff --git a/test/apis/realtime/text-generator/cortex_gpu.yaml b/test/apis/realtime/text-generator/cortex_gpu.yaml index a0659ff234..cc694a372f 100644 --- a/test/apis/realtime/text-generator/cortex_gpu.yaml +++ b/test/apis/realtime/text-generator/cortex_gpu.yaml @@ -15,4 +15,4 @@ compute: cpu: 1 gpu: 1 - mem: 512M + mem: 512Mi diff --git a/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml b/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml index 20292f1fc6..f445b6bda2 100644 --- a/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml +++ b/test/apis/trafficsplitter/hello-world/cortex_cpu.yaml @@ -14,7 +14,7 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi - name: hello-world-b kind: RealtimeAPI @@ -32,7 +32,7 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi - name: hello-world-shadow kind: RealtimeAPI @@ -50,7 +50,7 @@ port: 8080 compute: cpu: 200m - mem: 128M + mem: 128Mi - name: hello-world kind: TrafficSplitter From 58e49493ffde24781754d1007c37e3c7f6fe1d27 Mon Sep 17 00:00:00 2001 From: David Eliahu Date: Thu, 17 Jun 2021 10:59:43 -0700 Subject: [PATCH 7/7] Address PR comments --- pkg/lib/k8s/quantity.go | 23 ------- pkg/operator/resources/errors.go | 98 +++++++++++++++++++++++++-- pkg/operator/resources/validations.go | 90 +----------------------- pkg/workloads/k8s.go | 5 +- 4 files changed, 97 insertions(+), 119 deletions(-) diff --git a/pkg/lib/k8s/quantity.go b/pkg/lib/k8s/quantity.go index 90e3dba3b5..fc32cfe4fe 100644 --- a/pkg/lib/k8s/quantity.go +++ b/pkg/lib/k8s/quantity.go @@ -203,29 +203,6 @@ func (quantity *Quantity) ToMiFloorStr() string { return ToMiFloorStr(quantity.Quantity) } -// SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value) -func (quantity *Quantity) SplitInTwo() (*kresource.Quantity, *kresource.Quantity) { - return SplitInTwo(&quantity.Quantity) -} - -// SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value) -func SplitInTwo(quantity *kresource.Quantity) (*kresource.Quantity, *kresource.Quantity) { - milliValue := quantity.MilliValue() - halfMilliValue := milliValue / 2 - q1 := kresource.NewMilliQuantity(milliValue-halfMilliValue, kresource.DecimalSI) - q2 := kresource.NewMilliQuantity(halfMilliValue, kresource.DecimalSI) - return q1, q2 -} - -func SplitInThree(quantity *kresource.Quantity) (*kresource.Quantity, *kresource.Quantity, *kresource.Quantity) { - milliValue := quantity.MilliValue() - thirdMilliValue := milliValue / 3 - q1 := kresource.NewMilliQuantity(milliValue-2*thirdMilliValue, kresource.DecimalSI) - q2 := kresource.NewMilliQuantity(thirdMilliValue, kresource.DecimalSI) - q3 := kresource.NewMilliQuantity(thirdMilliValue, kresource.DecimalSI) - return q1, q2, q3 -} - func (quantity *Quantity) Sub(q2 kresource.Quantity) { quantity.Quantity.Sub(q2) quantity.UserString = "" diff --git a/pkg/operator/resources/errors.go b/pkg/operator/resources/errors.go index 0379b267b8..a5e4748b5a 100644 --- a/pkg/operator/resources/errors.go +++ b/pkg/operator/resources/errors.go @@ -18,12 +18,20 @@ package resources import ( "fmt" + "strings" + "github.com/cortexlabs/cortex/pkg/config" + "github.com/cortexlabs/cortex/pkg/consts" + "github.com/cortexlabs/cortex/pkg/lib/console" "github.com/cortexlabs/cortex/pkg/lib/errors" - "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/lib/k8s" + "github.com/cortexlabs/cortex/pkg/lib/slices" s "github.com/cortexlabs/cortex/pkg/lib/strings" + "github.com/cortexlabs/cortex/pkg/lib/table" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/types/userconfig" + "github.com/cortexlabs/cortex/pkg/workloads" + kresource "k8s.io/apimachinery/pkg/api/resource" ) const ( @@ -73,7 +81,12 @@ func ErrorCannotChangeKindOfDeployedAPI(name string, newKind, prevKind userconfi }) } -func ErrorNoAvailableNodeComputeLimit(msg string) error { +func ErrorNoAvailableNodeComputeLimit(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) error { + msg := "no instance types in your cluster are large enough to satisfy the requested resources for your pod\n\n" + msg += console.Bold("requested pod resources\n") + msg += podResourceRequestsTable(api, compute) + msg += "\n" + s.TrimTrailingNewLines(nodeGroupResourcesTable(api, compute, maxMemMap)) + return errors.WithStack(&errors.Error{ Kind: ErrNoAvailableNodeComputeLimit, Message: msg, @@ -83,12 +96,12 @@ func ErrorNoAvailableNodeComputeLimit(msg string) error { func ErrorAPIUsedByTrafficSplitter(trafficSplitters []string) error { return errors.WithStack(&errors.Error{ Kind: ErrRealtimeAPIUsedByTrafficSplitter, - Message: fmt.Sprintf("cannot delete api because it is used by the following %s: %s", strings.PluralS("TrafficSplitter", len(trafficSplitters)), strings.StrsSentence(trafficSplitters, "")), + Message: fmt.Sprintf("cannot delete api because it is used by the following %s: %s", s.PluralS("TrafficSplitter", len(trafficSplitters)), s.StrsSentence(trafficSplitters, "")), }) } func ErrorAPIsNotDeployed(notDeployedAPIs []string) error { - message := fmt.Sprintf("apis %s were either not found or are not RealtimeAPIs", strings.StrsAnd(notDeployedAPIs)) + message := fmt.Sprintf("apis %s were either not found or are not RealtimeAPIs", s.StrsAnd(notDeployedAPIs)) if len(notDeployedAPIs) == 1 { message = fmt.Sprintf("api %s was either not found or is not a RealtimeAPI", notDeployedAPIs[0]) } @@ -104,3 +117,80 @@ func ErrorInvalidNodeGroupSelector(selected string, availableNodeGroups []string Message: fmt.Sprintf("node group \"%s\" doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API, or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)), }) } + +func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) string { + sidecarCPUNote := "" + sidecarMemNote := "" + if api.Kind == userconfig.RealtimeAPIKind { + sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexProxyMem), workloads.ProxyContainerName) + } else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind { + sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName) + sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexDequeuerMem), workloads.DequeuerContainerName) + } + + var items table.KeyValuePairs + if compute.CPU != nil { + items.Add("CPU", compute.CPU.String()+sidecarCPUNote) + } + if compute.Mem != nil { + items.Add("memory", compute.Mem.ToMiCeilStr()+sidecarMemNote) + } + if compute.GPU > 0 { + items.Add("GPU", compute.GPU) + } + if compute.Inf > 0 { + items.Add("Inf", compute.Inf) + } + + return items.String() +} + +func nodeGroupResourcesTable(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) string { + var skippedNodeGroups []string + var nodeGroupResourceRows [][]interface{} + + showGPU := false + showInf := false + if compute.GPU > 0 { + showGPU = true + } + if compute.Inf > 0 { + showInf = true + } + + for _, ng := range config.ClusterConfig.NodeGroups { + nodeCPU, nodeMem, nodeGPU, nodeInf := getNodeCapacity(ng.InstanceType, maxMemMap) + if nodeGPU > 0 { + showGPU = true + } + if nodeInf > 0 { + showInf = true + } + + if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) { + skippedNodeGroups = append(skippedNodeGroups, ng.Name) + } else { + nodeGroupResourceRows = append(nodeGroupResourceRows, []interface{}{ng.Name, ng.InstanceType, nodeCPU, k8s.ToMiFloorStr(nodeMem), nodeGPU, nodeInf}) + } + } + + nodeGroupResourceRowsTable := table.Table{ + Headers: []table.Header{ + {Title: "node group"}, + {Title: "instance type"}, + {Title: "CPU"}, + {Title: "memory"}, + {Title: "GPU", Hidden: !showGPU}, + {Title: "Inf", Hidden: !showInf}, + }, + Rows: nodeGroupResourceRows, + } + + out := nodeGroupResourceRowsTable.MustFormat() + if len(skippedNodeGroups) > 0 { + out += fmt.Sprintf("\nthe following %s skipped (based on the api configuration's %s field): %s", s.PluralCustom("node group was", "node groups were", len(skippedNodeGroups)), userconfig.NodeGroupsKey, strings.Join(skippedNodeGroups, ", ")) + } + + return out +} diff --git a/pkg/operator/resources/validations.go b/pkg/operator/resources/validations.go index 3621bad931..d9d90bb649 100644 --- a/pkg/operator/resources/validations.go +++ b/pkg/operator/resources/validations.go @@ -17,23 +17,16 @@ limitations under the License. package resources import ( - "fmt" - "strings" - "github.com/cortexlabs/cortex/pkg/config" - "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/aws" - "github.com/cortexlabs/cortex/pkg/lib/console" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/k8s" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" "github.com/cortexlabs/cortex/pkg/lib/slices" s "github.com/cortexlabs/cortex/pkg/lib/strings" - "github.com/cortexlabs/cortex/pkg/lib/table" "github.com/cortexlabs/cortex/pkg/operator/operator" "github.com/cortexlabs/cortex/pkg/types/spec" "github.com/cortexlabs/cortex/pkg/types/userconfig" - "github.com/cortexlabs/cortex/pkg/workloads" istioclientnetworking "istio.io/client-go/pkg/apis/networking/v1beta1" kresource "k8s.io/apimachinery/pkg/api/resource" ) @@ -172,88 +165,7 @@ func validateK8sCompute(api *userconfig.API, maxMemMap map[string]kresource.Quan } // no nodegroups have capacity - errMsg := "no instance types in your cluster are large enough to satisfy the requested resources for your pod\n\n" - errMsg += console.Bold("requested pod resources\n") - errMsg += podResourceRequestsTable(api, compute) - errMsg += "\n" + s.TrimTrailingNewLines(nodeGroupResourcesTable(api, compute, maxMemMap)) - return ErrorNoAvailableNodeComputeLimit(errMsg) -} - -func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) string { - sidecarCPUNote := "" - sidecarMemNote := "" - if api.Kind == userconfig.RealtimeAPIKind { - sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexProxyMem), workloads.ProxyContainerName) - } else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind { - sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName) - sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexDequeuerMem), workloads.DequeuerContainerName) - } - - var items table.KeyValuePairs - if compute.CPU != nil { - items.Add("CPU", compute.CPU.String()+sidecarCPUNote) - } - if compute.Mem != nil { - items.Add("memory", compute.Mem.ToMiCeilStr()+sidecarMemNote) - } - if compute.GPU > 0 { - items.Add("GPU", compute.GPU) - } - if compute.Inf > 0 { - items.Add("Inf", compute.Inf) - } - - return items.String() -} - -func nodeGroupResourcesTable(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) string { - var skippedNodeGroups []string - var nodeGroupResourceRows [][]interface{} - - showGPU := false - showInf := false - if compute.GPU > 0 { - showGPU = true - } - if compute.Inf > 0 { - showInf = true - } - - for _, ng := range config.ClusterConfig.NodeGroups { - nodeCPU, nodeMem, nodeGPU, nodeInf := getNodeCapacity(ng.InstanceType, maxMemMap) - if nodeGPU > 0 { - showGPU = true - } - if nodeInf > 0 { - showInf = true - } - - if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) { - skippedNodeGroups = append(skippedNodeGroups, ng.Name) - } else { - nodeGroupResourceRows = append(nodeGroupResourceRows, []interface{}{ng.Name, ng.InstanceType, nodeCPU, k8s.ToMiFloorStr(nodeMem), nodeGPU, nodeInf}) - } - } - - nodeGroupResourceRowsTable := table.Table{ - Headers: []table.Header{ - {Title: "node group"}, - {Title: "instance type"}, - {Title: "CPU"}, - {Title: "memory"}, - {Title: "GPU", Hidden: !showGPU}, - {Title: "Inf", Hidden: !showInf}, - }, - Rows: nodeGroupResourceRows, - } - - out := nodeGroupResourceRowsTable.MustFormat() - if len(skippedNodeGroups) > 0 { - out += fmt.Sprintf("\nthe following %s skipped (based on the api configuration's %s field): %s", s.PluralCustom("node group was", "node groups were", len(skippedNodeGroups)), userconfig.NodeGroupsKey, strings.Join(skippedNodeGroups, ", ")) - } - - return out + return ErrorNoAvailableNodeComputeLimit(api, compute, maxMemMap) } func getNodeCapacity(instanceType string, maxMemMap map[string]kresource.Quantity) (kresource.Quantity, kresource.Quantity, int64, int64) { diff --git a/pkg/workloads/k8s.go b/pkg/workloads/k8s.go index c4ebaa2e60..738a59c96a 100644 --- a/pkg/workloads/k8s.go +++ b/pkg/workloads/k8s.go @@ -45,10 +45,9 @@ const ( _emptyDirVolumeName = "mnt" _emptyDirMountPath = "/mnt" - _gatewayContainerName = "gateway" - ProxyContainerName = "proxy" DequeuerContainerName = "dequeuer" + GatewayContainerName = "gateway" _kubexitGraveyardName = "graveyard" _kubexitGraveyardMountPath = "/graveyard" @@ -73,7 +72,7 @@ var ( func AsyncGatewayContainer(api spec.API, queueURL string, volumeMounts []kcore.VolumeMount) kcore.Container { return kcore.Container{ - Name: _gatewayContainerName, + Name: GatewayContainerName, Image: config.ClusterConfig.ImageAsyncGateway, ImagePullPolicy: kcore.PullAlways, Args: []string{