diff --git a/pkg/lib/k8s/k8s.go b/pkg/lib/k8s/k8s.go index 46de5e1acb..362931dd59 100644 --- a/pkg/lib/k8s/k8s.go +++ b/pkg/lib/k8s/k8s.go @@ -51,6 +51,7 @@ type Client struct { clientset *kclientset.Clientset dynamicClient kclientdynamic.Interface podClient kclientcore.PodInterface + nodeClient kclientcore.NodeInterface serviceClient kclientcore.ServiceInterface configMapClient kclientcore.ConfigMapInterface deploymentClient kclientapps.DeploymentInterface @@ -87,6 +88,7 @@ func New(namespace string, inCluster bool) (*Client, error) { } client.podClient = client.clientset.CoreV1().Pods(namespace) + client.nodeClient = client.clientset.CoreV1().Nodes() client.serviceClient = client.clientset.CoreV1().Services(namespace) client.configMapClient = client.clientset.CoreV1().ConfigMaps(namespace) client.deploymentClient = client.clientset.AppsV1().Deployments(namespace) diff --git a/pkg/lib/k8s/node.go b/pkg/lib/k8s/node.go new file mode 100644 index 0000000000..5208d6acce --- /dev/null +++ b/pkg/lib/k8s/node.go @@ -0,0 +1,35 @@ +/* +Copyright 2019 Cortex Labs, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8s + +import ( + "github.com/cortexlabs/cortex/pkg/lib/errors" + + kcore "k8s.io/api/core/v1" + kmeta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (c *Client) ListNodes(opts *kmeta.ListOptions) ([]kcore.Node, error) { + if opts == nil { + opts = &kmeta.ListOptions{} + } + nodeList, err := c.nodeClient.List(*opts) + if err != nil { + return nil, errors.WithStack(err) + } + return nodeList.Items, nil +} diff --git a/pkg/operator/endpoints/deploy.go b/pkg/operator/endpoints/deploy.go index d5c37dee7a..2c28bb5283 100644 --- a/pkg/operator/endpoints/deploy.go +++ b/pkg/operator/endpoints/deploy.go @@ -56,6 +56,12 @@ func Deploy(w http.ResponseWriter, r *http.Request) { fullCtxMatch = true } + err = workloads.ValidateDeploy(ctx) + if err != nil { + RespondError(w, err) + return + } + deploymentStatus, err := workloads.GetDeploymentStatus(ctx.App.Name) if err != nil { RespondError(w, err) diff --git a/pkg/operator/workloads/api_workload.go b/pkg/operator/workloads/api_workload.go index caf9b1f473..f0ee8c0d3f 100644 --- a/pkg/operator/workloads/api_workload.go +++ b/pkg/operator/workloads/api_workload.go @@ -87,7 +87,6 @@ func (aw *APIWorkload) Start(ctx *context.Context) error { desiredReplicas := getRequestedReplicasFromDeployment(api, k8sDeloyment, hpa) var deploymentSpec *kapps.Deployment - switch api.ModelFormat { case userconfig.TensorFlowModelFormat: deploymentSpec = tfAPISpec(ctx, api, aw.WorkloadID, desiredReplicas) diff --git a/pkg/operator/workloads/errors.go b/pkg/operator/workloads/errors.go index 9f489376e3..73c93e712b 100644 --- a/pkg/operator/workloads/errors.go +++ b/pkg/operator/workloads/errors.go @@ -16,6 +16,8 @@ limitations under the License. package workloads +import "fmt" + type ErrorKind int const ( @@ -25,6 +27,7 @@ const ( ErrLoadBalancerInitializing ErrNotFound ErrAPIInitializing + ErrNoAvailableNodeComputeLimit ) var errorKinds = []string{ @@ -34,9 +37,10 @@ var errorKinds = []string{ "err_load_balancer_initializing", "err_not_found", "err_api_initializing", + "err_no_available_node_compute_limit", } -var _ = [1]int{}[int(ErrAPIInitializing)-(len(errorKinds)-1)] // Ensure list length matches +var _ = [1]int{}[int(ErrNoAvailableNodeComputeLimit)-(len(errorKinds)-1)] // Ensure list length matches func (t ErrorKind) String() string { return errorKinds[t] @@ -115,3 +119,14 @@ func ErrorAPIInitializing() error { message: "api is still initializing", } } + +func ErrorNoAvailableNodeComputeLimit(resource string, reqStr string, maxStr string) error { + message := fmt.Sprintf("no available nodes can satisfy the requested %s quantity - requested %s %s but nodes only have %s %s", resource, reqStr, resource, maxStr, resource) + if maxStr == "0" { + message = fmt.Sprintf("no available nodes can satisfy the requested %s quantity - requested %s %s but nodes don't have any %s", resource, reqStr, resource, resource) + } + return Error{ + Kind: ErrNoAvailableNodeComputeLimit, + message: message, + } +} diff --git a/pkg/operator/workloads/workflow.go b/pkg/operator/workloads/workflow.go index 684a1a70fb..d3d3981f7b 100644 --- a/pkg/operator/workloads/workflow.go +++ b/pkg/operator/workloads/workflow.go @@ -17,13 +17,17 @@ limitations under the License. package workloads import ( + "fmt" "path/filepath" + kresource "k8s.io/apimachinery/pkg/api/resource" + "github.com/cortexlabs/cortex/pkg/consts" "github.com/cortexlabs/cortex/pkg/lib/errors" "github.com/cortexlabs/cortex/pkg/lib/sets/strset" "github.com/cortexlabs/cortex/pkg/operator/api/context" "github.com/cortexlabs/cortex/pkg/operator/api/resource" + "github.com/cortexlabs/cortex/pkg/operator/api/userconfig" "github.com/cortexlabs/cortex/pkg/operator/config" ) @@ -294,3 +298,50 @@ func GetDeploymentStatus(appName string) (resource.DeploymentStatus, error) { } return resource.UpdatedDeploymentStatus, nil } + +func ValidateDeploy(ctx *context.Context) error { + nodes, err := config.Kubernetes.ListNodes(nil) + if err != nil { + return err + } + + var maxCPU, maxMem kresource.Quantity + var maxGPU int64 + for _, node := range nodes { + curCPU := node.Status.Capacity.Cpu() + curMem := node.Status.Capacity.Memory() + + var curGPU int64 + if GPUQuantity, ok := node.Status.Allocatable["nvidia.com/gpu"]; ok { + curGPU, _ = GPUQuantity.AsInt64() + } + + if curCPU != nil && maxCPU.Cmp(*curCPU) < 0 { + maxCPU = *curCPU + } + + if curMem != nil && maxMem.Cmp(*curMem) < 0 { + maxMem = *curMem + } + + if curGPU > maxGPU { + maxGPU = curGPU + } + } + + for _, api := range ctx.APIs { + if maxCPU.Cmp(api.Compute.CPU.Quantity) < 0 { + return errors.Wrap(ErrorNoAvailableNodeComputeLimit("CPU", api.Compute.CPU.String(), maxCPU.String()), userconfig.Identify(api)) + } + if api.Compute.Mem != nil { + if maxMem.Cmp(api.Compute.Mem.Quantity) < 0 { + return errors.Wrap(ErrorNoAvailableNodeComputeLimit("Memory", api.Compute.Mem.String(), maxMem.String()), userconfig.Identify(api)) + } + } + gpu := api.Compute.GPU + if gpu > maxGPU { + return errors.Wrap(ErrorNoAvailableNodeComputeLimit("GPU", fmt.Sprintf("%d", gpu), fmt.Sprintf("%d", maxGPU)), userconfig.Identify(api)) + } + } + return nil +}