Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/workloads/async/containers.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ Your API pod can contain multiple containers, only one of which can be listening

The `/mnt` directory is mounted to each container's filesystem, and is shared across all containers.

## Resource requests

Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's dequeuer sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory.

## Observability

See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md).
Expand Down
4 changes: 4 additions & 0 deletions docs/workloads/batch/containers.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ Your API pod can contain multiple containers, only one of which can be listening

The `/mnt` directory is mounted to each container's filesystem, and is shared across all containers.

## Resource requests

Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's dequeuer sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory.

## Observability

See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md).
Expand Down
4 changes: 4 additions & 0 deletions docs/workloads/realtime/containers.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ Your API pod can contain multiple containers, only one of which can be listening

The `/mnt` directory is mounted to each container's file system, and is shared across all containers.

## Resource requests

Each container in the pod requests its own amount of CPU, memory, GPU, and Inferentia resources. In addition, Cortex's proxy sidecar container (which is automatically added to the pod) requests 100m CPU and 100Mi memory.

## Observability

See docs for [logging](../../clusters/observability/logging.md), [metrics](../../clusters/observability/metrics.md), and [alerting](../../clusters/observability/metrics.md).
Expand Down
7 changes: 1 addition & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ import (
var (
OperatorMetadata *clusterconfig.OperatorMetadata

ClusterConfig *clusterconfig.Config
InstancesMetadata []aws.InstanceMetadata
ClusterConfig *clusterconfig.Config

AWS *aws.Client
K8s *k8s.Client
Expand Down Expand Up @@ -92,10 +91,6 @@ func Init() error {

ClusterConfig = clusterConfig

for _, instanceType := range clusterConfig.GetAllInstanceTypes() {
InstancesMetadata = append(InstancesMetadata, aws.InstanceMetadatas[clusterConfig.Region][instanceType])
}

AWS, err = aws.NewForRegion(clusterConfig.Region)
if err != nil {
return err
Expand Down
7 changes: 7 additions & 0 deletions pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package consts

import (
"os"

kresource "k8s.io/apimachinery/pkg/api/resource"
)

var (
Expand All @@ -37,6 +39,11 @@ var (
AdminPortInt32 = int32(15000)
AuthHeader = "X-Cortex-Authorization"

CortexProxyCPU = kresource.MustParse("100m")
CortexProxyMem = kresource.MustParse("100Mi")
CortexDequeuerCPU = kresource.MustParse("100m")
CortexDequeuerMem = kresource.MustParse("100Mi")

DefaultInClusterConfigPath = "/configs/cluster/cluster.yaml"
MaxBucketLifecycleRules = 100
AsyncWorkloadsExpirationDays = int64(7)
Expand Down
117 changes: 98 additions & 19 deletions pkg/lib/k8s/quantity.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,22 @@ func NewMilliQuantity(milliValue int64) *Quantity {
}
}

// Returns nil if no quantities are passed in
func NewSummed(quantities ...kresource.Quantity) *Quantity {
if len(quantities) == 0 {
return nil
}

k8sQuantity := kresource.Quantity{}
for _, q := range quantities {
k8sQuantity.Add(q)
}

return &Quantity{
Quantity: k8sQuantity,
}
}

func (quantity *Quantity) MilliString() string {
return s.Int64(quantity.Quantity.MilliValue()) + "m"
}
Expand All @@ -103,32 +119,88 @@ func (quantity *Quantity) ToFloat32() float32 {
return float32(quantity.Quantity.MilliValue()) / float32(1000)
}

func (quantity *Quantity) ToKi() int64 {
kiFloat := float64(quantity.Quantity.Value()) / float64(1024)
func ToKiRounded(k8sQuantity kresource.Quantity) int64 {
kiFloat := float64(k8sQuantity.Value()) / float64(1024)
return int64(math.Round(kiFloat))
}
func ToKiRoundedStr(k8sQuantity kresource.Quantity) string {
return s.Int64(ToKiRounded(k8sQuantity)) + "Ki"
}
func (quantity *Quantity) ToKiRounded() int64 {
return ToKiRounded(quantity.Quantity)
}
func (quantity *Quantity) ToKiRoundedStr() string {
return ToKiRoundedStr(quantity.Quantity)
}

func ToKiCeil(k8sQuantity kresource.Quantity) int64 {
kiFloat := float64(k8sQuantity.Value()) / float64(1024)
return int64(math.Ceil(kiFloat))
}
func ToKiCeilStr(k8sQuantity kresource.Quantity) string {
return s.Int64(ToKiCeil(k8sQuantity)) + "Ki"
}
func (quantity *Quantity) ToKiCeil() int64 {
return ToKiCeil(quantity.Quantity)
}
func (quantity *Quantity) ToKiCeilStr() string {
return ToKiCeilStr(quantity.Quantity)
}

// SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value)
func (quantity *Quantity) SplitInTwo() (*kresource.Quantity, *kresource.Quantity) {
return SplitInTwo(&quantity.Quantity)
func ToKiFloor(k8sQuantity kresource.Quantity) int64 {
kiFloat := float64(k8sQuantity.Value()) / float64(1024)
return int64(math.Floor(kiFloat))
}
func ToKiFloorStr(k8sQuantity kresource.Quantity) string {
return s.Int64(ToKiFloor(k8sQuantity)) + "Ki"
}
func (quantity *Quantity) ToKiFloor() int64 {
return ToKiFloor(quantity.Quantity)
}
func (quantity *Quantity) ToKiFloorStr() string {
return ToKiFloorStr(quantity.Quantity)
}

// SplitInTwo divides the quantity in two and return both halves (ensuring they add up to the original value)
func SplitInTwo(quantity *kresource.Quantity) (*kresource.Quantity, *kresource.Quantity) {
milliValue := quantity.MilliValue()
halfMilliValue := milliValue / 2
q1 := kresource.NewMilliQuantity(milliValue-halfMilliValue, kresource.DecimalSI)
q2 := kresource.NewMilliQuantity(halfMilliValue, kresource.DecimalSI)
return q1, q2
func ToMiRounded(k8sQuantity kresource.Quantity) int64 {
miFloat := float64(k8sQuantity.Value()) / float64(1024*1024)
return int64(math.Round(miFloat))
}
func ToMiRoundedStr(k8sQuantity kresource.Quantity) string {
return s.Int64(ToMiRounded(k8sQuantity)) + "Mi"
}
func (quantity *Quantity) ToMiRounded() int64 {
return ToMiRounded(quantity.Quantity)
}
func (quantity *Quantity) ToMiRoundedStr() string {
return ToMiRoundedStr(quantity.Quantity)
}

func SplitInThree(quantity *kresource.Quantity) (*kresource.Quantity, *kresource.Quantity, *kresource.Quantity) {
milliValue := quantity.MilliValue()
thirdMilliValue := milliValue / 3
q1 := kresource.NewMilliQuantity(milliValue-2*thirdMilliValue, kresource.DecimalSI)
q2 := kresource.NewMilliQuantity(thirdMilliValue, kresource.DecimalSI)
q3 := kresource.NewMilliQuantity(thirdMilliValue, kresource.DecimalSI)
return q1, q2, q3
func ToMiCeil(k8sQuantity kresource.Quantity) int64 {
miFloat := float64(k8sQuantity.Value()) / float64(1024*1024)
return int64(math.Ceil(miFloat))
}
func ToMiCeilStr(k8sQuantity kresource.Quantity) string {
return s.Int64(ToMiCeil(k8sQuantity)) + "Mi"
}
func (quantity *Quantity) ToMiCeil() int64 {
return ToMiCeil(quantity.Quantity)
}
func (quantity *Quantity) ToMiCeilStr() string {
return ToMiCeilStr(quantity.Quantity)
}

func ToMiFloor(k8sQuantity kresource.Quantity) int64 {
miFloat := float64(k8sQuantity.Value()) / float64(1024*1024)
return int64(math.Floor(miFloat))
}
func ToMiFloorStr(k8sQuantity kresource.Quantity) string {
return s.Int64(ToMiFloor(k8sQuantity)) + "Mi"
}
func (quantity *Quantity) ToMiFloor() int64 {
return ToMiFloor(quantity.Quantity)
}
func (quantity *Quantity) ToMiFloorStr() string {
return ToMiFloorStr(quantity.Quantity)
}

func (quantity *Quantity) Sub(q2 kresource.Quantity) {
Expand Down Expand Up @@ -169,6 +241,13 @@ func (quantity *Quantity) ID() string {
return s.Int64(quantity.MilliValue())
}

func (quantity *Quantity) DeepCopy() Quantity {
return Quantity{
Quantity: quantity.Quantity.DeepCopy(),
UserString: quantity.UserString,
}
}

func QuantityPtr(k8sQuantity kresource.Quantity) *kresource.Quantity {
return &k8sQuantity
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/operator/endpoints/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ func Info(w http.ResponseWriter, r *http.Request) {
}

fullClusterConfig := clusterconfig.InternalConfig{
Config: *config.ClusterConfig,
OperatorMetadata: *config.OperatorMetadata,
InstancesMetadata: config.InstancesMetadata,
Config: *config.ClusterConfig,
OperatorMetadata: *config.OperatorMetadata,
}

response := schema.InfoResponse{
Expand Down
8 changes: 5 additions & 3 deletions pkg/operator/operator/memory_capacity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package operator

import (
"github.com/cortexlabs/cortex/pkg/config"
"github.com/cortexlabs/cortex/pkg/lib/aws"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/slices"
kresource "k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -104,9 +105,10 @@ func UpdateMemoryCapacityConfigMap() (map[string]kresource.Quantity, error) {
primaryInstances := []string{}

minMemMap := map[string]kresource.Quantity{}
for _, instanceMetadata := range config.InstancesMetadata {
minMemMap[instanceMetadata.Type] = instanceMetadata.Memory
primaryInstances = append(primaryInstances, instanceMetadata.Type)
for _, ng := range config.ClusterConfig.NodeGroups {
instanceMetadata := aws.InstanceMetadatas[config.ClusterConfig.Region][ng.InstanceType]
minMemMap[ng.InstanceType] = instanceMetadata.Memory
primaryInstances = append(primaryInstances, ng.InstanceType)
}

nodeMemCapacityMap, err := getMemoryCapacityFromNodes(primaryInstances)
Expand Down
106 changes: 96 additions & 10 deletions pkg/operator/resources/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ package resources

import (
"fmt"
"strings"

"github.com/cortexlabs/cortex/pkg/config"
"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/console"
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/slices"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/lib/table"
"github.com/cortexlabs/cortex/pkg/operator/operator"
"github.com/cortexlabs/cortex/pkg/types/userconfig"
"github.com/cortexlabs/cortex/pkg/workloads"
kresource "k8s.io/apimachinery/pkg/api/resource"
)

const (
Expand Down Expand Up @@ -73,26 +81,27 @@ func ErrorCannotChangeKindOfDeployedAPI(name string, newKind, prevKind userconfi
})
}

func ErrorNoAvailableNodeComputeLimit(resource string, reqStr string, maxStr string) error {
message := fmt.Sprintf("no instances can satisfy the requested %s quantity - requested %s %s but instances only have %s %s available", resource, reqStr, resource, maxStr, resource)
if maxStr == "0" {
message = fmt.Sprintf("no instances can satisfy the requested %s quantity - requested %s %s but instances don't have any %s", resource, reqStr, resource, resource)
}
func ErrorNoAvailableNodeComputeLimit(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) error {
msg := "no instance types in your cluster are large enough to satisfy the requested resources for your pod\n\n"
msg += console.Bold("requested pod resources\n")
msg += podResourceRequestsTable(api, compute)
msg += "\n" + s.TrimTrailingNewLines(nodeGroupResourcesTable(api, compute, maxMemMap))

return errors.WithStack(&errors.Error{
Kind: ErrNoAvailableNodeComputeLimit,
Message: message,
Message: msg,
})
}

func ErrorAPIUsedByTrafficSplitter(trafficSplitters []string) error {
return errors.WithStack(&errors.Error{
Kind: ErrRealtimeAPIUsedByTrafficSplitter,
Message: fmt.Sprintf("cannot delete api because it is used by the following %s: %s", strings.PluralS("TrafficSplitter", len(trafficSplitters)), strings.StrsSentence(trafficSplitters, "")),
Message: fmt.Sprintf("cannot delete api because it is used by the following %s: %s", s.PluralS("TrafficSplitter", len(trafficSplitters)), s.StrsSentence(trafficSplitters, "")),
})
}

func ErrorAPIsNotDeployed(notDeployedAPIs []string) error {
message := fmt.Sprintf("apis %s were either not found or are not RealtimeAPIs", strings.StrsAnd(notDeployedAPIs))
message := fmt.Sprintf("apis %s were either not found or are not RealtimeAPIs", s.StrsAnd(notDeployedAPIs))
if len(notDeployedAPIs) == 1 {
message = fmt.Sprintf("api %s was either not found or is not a RealtimeAPI", notDeployedAPIs[0])
}
Expand All @@ -105,6 +114,83 @@ func ErrorAPIsNotDeployed(notDeployedAPIs []string) error {
func ErrorInvalidNodeGroupSelector(selected string, availableNodeGroups []string) error {
return errors.WithStack(&errors.Error{
Kind: ErrInvalidNodeGroupSelector,
Message: fmt.Sprintf("node group %s doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)),
Message: fmt.Sprintf("node group \"%s\" doesn't exist; remove the node group selector to let Cortex determine automatically where to place the API, or specify a valid node group name (%s)", selected, s.StrsOr(availableNodeGroups)),
})
}

func podResourceRequestsTable(api *userconfig.API, compute userconfig.Compute) string {
sidecarCPUNote := ""
sidecarMemNote := ""
if api.Kind == userconfig.RealtimeAPIKind {
sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexProxyCPU.String(), workloads.ProxyContainerName)
sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexProxyMem), workloads.ProxyContainerName)
} else if api.Kind == userconfig.AsyncAPIKind || api.Kind == userconfig.BatchAPIKind {
sidecarCPUNote = fmt.Sprintf(" (including %s for the %s sidecar container)", consts.CortexDequeuerCPU.String(), workloads.DequeuerContainerName)
sidecarMemNote = fmt.Sprintf(" (including %s for the %s sidecar container)", k8s.ToMiCeilStr(consts.CortexDequeuerMem), workloads.DequeuerContainerName)
}

var items table.KeyValuePairs
if compute.CPU != nil {
items.Add("CPU", compute.CPU.String()+sidecarCPUNote)
}
if compute.Mem != nil {
items.Add("memory", compute.Mem.ToMiCeilStr()+sidecarMemNote)
}
if compute.GPU > 0 {
items.Add("GPU", compute.GPU)
}
if compute.Inf > 0 {
items.Add("Inf", compute.Inf)
}

return items.String()
}

func nodeGroupResourcesTable(api *userconfig.API, compute userconfig.Compute, maxMemMap map[string]kresource.Quantity) string {
var skippedNodeGroups []string
var nodeGroupResourceRows [][]interface{}

showGPU := false
showInf := false
if compute.GPU > 0 {
showGPU = true
}
if compute.Inf > 0 {
showInf = true
}

for _, ng := range config.ClusterConfig.NodeGroups {
nodeCPU, nodeMem, nodeGPU, nodeInf := getNodeCapacity(ng.InstanceType, maxMemMap)
if nodeGPU > 0 {
showGPU = true
}
if nodeInf > 0 {
showInf = true
}

if api.NodeGroups != nil && !slices.HasString(api.NodeGroups, ng.Name) {
skippedNodeGroups = append(skippedNodeGroups, ng.Name)
} else {
nodeGroupResourceRows = append(nodeGroupResourceRows, []interface{}{ng.Name, ng.InstanceType, nodeCPU, k8s.ToMiFloorStr(nodeMem), nodeGPU, nodeInf})
}
}

nodeGroupResourceRowsTable := table.Table{
Headers: []table.Header{
{Title: "node group"},
{Title: "instance type"},
{Title: "CPU"},
{Title: "memory"},
{Title: "GPU", Hidden: !showGPU},
{Title: "Inf", Hidden: !showInf},
},
Rows: nodeGroupResourceRows,
}

out := nodeGroupResourceRowsTable.MustFormat()
if len(skippedNodeGroups) > 0 {
out += fmt.Sprintf("\nthe following %s skipped (based on the api configuration's %s field): %s", s.PluralCustom("node group was", "node groups were", len(skippedNodeGroups)), userconfig.NodeGroupsKey, strings.Join(skippedNodeGroups, ", "))
}

return out
}
Loading