Skip to content

Support refresh CLI command for Async APIs #2265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 21, 2021
15 changes: 8 additions & 7 deletions cmd/operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ func main() {

for i := range deployments {
deployment := deployments[i]
apiKind := deployment.Labels["apiKind"]
if userconfig.KindFromString(apiKind) == userconfig.RealtimeAPIKind ||
userconfig.KindFromString(apiKind) == userconfig.AsyncAPIKind {
apiKind := userconfig.KindFromString(deployment.Labels["apiKind"])
if apiKind == userconfig.RealtimeAPIKind ||
(apiKind == userconfig.AsyncAPIKind && deployment.Labels["cortex.dev/async"] != "gateway") {

apiID := deployment.Labels["apiID"]
apiName := deployment.Labels["apiName"]
api, err := operator.DownloadAPISpec(apiName, apiID)
Expand All @@ -76,16 +77,16 @@ func main() {
}

switch apiKind {
case userconfig.RealtimeAPIKind.String():
case userconfig.RealtimeAPIKind:
if err := realtimeapi.UpdateAutoscalerCron(&deployment, api); err != nil {
operatorLogger.Fatal(errors.Wrap(err, "init"))
}
case userconfig.AsyncAPIKind.String():
if err := asyncapi.UpdateMetricsCron(&deployment); err != nil {
case userconfig.AsyncAPIKind:
if err := asyncapi.UpdateAPIMetricsCron(&deployment); err != nil {
operatorLogger.Fatal(errors.Wrap(err, "init"))
}

if err := asyncapi.UpdateAutoscalerCron(&deployment, *api); err != nil {
if err := asyncapi.UpdateAPIAutoscalerCron(&deployment, *api); err != nil {
operatorLogger.Fatal(errors.Wrap(err, "init"))
}
}
Expand Down
123 changes: 91 additions & 32 deletions pkg/operator/resources/asyncapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,25 +64,31 @@ func getGatewayK8sName(apiName string) string {
return "gateway-" + apiName
}

func deploymentID() string {
func generateDeploymentID() string {
return k8s.RandomName()[:10]
}

func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error) {
prevK8sResources, err := getK8sResources(apiConfig)
prevK8sResources, err := getK8sResources(apiConfig.Name)
if err != nil {
return nil, "", err
}

deployID := deploymentID()
if prevK8sResources.apiDeployment != nil && prevK8sResources.apiDeployment.Labels["deploymentID"] != "" {
deployID = prevK8sResources.apiDeployment.Labels["deploymentID"]
initialDeploymentTime := time.Now().UnixNano()
deploymentID := generateDeploymentID()
if prevK8sResources.gatewayVirtualService != nil && prevK8sResources.gatewayVirtualService.Labels["initialDeploymentTime"] != "" {
var err error
initialDeploymentTime, err = k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime")
if err != nil {
return nil, "", err
}
deploymentID = prevK8sResources.gatewayVirtualService.Labels["deploymentID"]
}

api := spec.GetAPISpec(&apiConfig, deployID, config.ClusterConfig.ClusterUID)
api := spec.GetAPISpec(&apiConfig, initialDeploymentTime, deploymentID, config.ClusterConfig.ClusterUID)

// resource creation
if prevK8sResources.apiDeployment == nil {
if prevK8sResources.gatewayVirtualService == nil {
if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil {
return nil, "", errors.Wrap(err, "upload api spec")
}
Expand All @@ -91,7 +97,7 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error)
"apiName": apiConfig.Name,
}

queueURL, err := createFIFOQueue(apiConfig.Name, deployID, tags)
queueURL, err := createFIFOQueue(apiConfig.Name, initialDeploymentTime, tags)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -127,7 +133,12 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error)
return nil, "", errors.Wrap(err, "upload api spec")
}

queueURL, err := getQueueURL(api.Name, prevK8sResources.gatewayVirtualService.Labels["deploymentID"])
initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime")
if err != nil {
return nil, "", err
}

queueURL, err := getQueueURL(api.Name, initialDeploymentTime)
if err != nil {
return nil, "", err
}
Expand All @@ -150,6 +161,56 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error)
return api, fmt.Sprintf("%s is up to date", api.Resource.UserString()), nil
}

func RefreshAPI(apiName string, force bool) (string, error) {
prevK8sResources, err := getK8sResources(apiName)
if err != nil {
return "", err
} else if prevK8sResources.gatewayVirtualService == nil || prevK8sResources.apiDeployment == nil {
return "", errors.ErrorUnexpected("unable to find deployment", apiName)
}

isUpdating, err := isAPIUpdating(prevK8sResources.apiDeployment)
if err != nil {
return "", err
}

if isUpdating && !force {
return "", ErrorAPIUpdating(apiName)
}

apiID, err := k8s.GetLabel(prevK8sResources.gatewayVirtualService, "apiID")
if err != nil {
return "", err
}

api, err := operator.DownloadAPISpec(apiName, apiID)
if err != nil {
return "", err
}

initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime")
if err != nil {
return "", err
}

api = spec.GetAPISpec(api.API, initialDeploymentTime, generateDeploymentID(), config.ClusterConfig.ClusterUID)

if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil {
return "", errors.Wrap(err, "upload api spec")
}

queueURL, err := getQueueURL(api.Name, initialDeploymentTime)
if err != nil {
return "", err
}

if err = applyK8sResources(*api, prevK8sResources, queueURL); err != nil {
return "", err
}

return fmt.Sprintf("updating %s", api.Resource.UserString()), nil
}

func DeleteAPI(apiName string, keepCache bool) error {
err := parallel.RunFirstErr(
func() error {
Expand All @@ -158,7 +219,11 @@ func DeleteAPI(apiName string, keepCache bool) error {
return err
}
if vs != nil {
queueURL, err := getQueueURL(apiName, vs.Labels["deploymentID"])
initialDeploymentTime, err := k8s.ParseInt64Label(vs, "initialDeploymentTime")
if err != nil {
return err
}
queueURL, err := getQueueURL(apiName, initialDeploymentTime)
if err != nil {
return err
}
Expand Down Expand Up @@ -258,20 +323,19 @@ func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.APIR
return asyncAPIs, nil
}

func UpdateMetricsCron(deployment *kapps.Deployment) error {
// skip gateway deployments
if deployment.Labels["cortex.dev/async"] != "api" {
return nil
}

apiName := deployment.Labels["apiName"]
deployID := deployment.Labels["deploymentID"]
func UpdateAPIMetricsCron(apiDeployment *kapps.Deployment) error {
apiName := apiDeployment.Labels["apiName"]

if prevMetricsCron, ok := _metricsCrons[apiName]; ok {
prevMetricsCron.Cancel()
}

queueURL, err := getQueueURL(apiName, deployID)
initialDeploymentTime, err := k8s.ParseInt64Label(apiDeployment, "initialDeploymentTime")
if err != nil {
return err
}

queueURL, err := getQueueURL(apiName, initialDeploymentTime)
if err != nil {
return err
}
Expand All @@ -283,18 +347,13 @@ func UpdateMetricsCron(deployment *kapps.Deployment) error {
return nil
}

func UpdateAutoscalerCron(deployment *kapps.Deployment, apiSpec spec.API) error {
// skip gateway deployments
if deployment.Labels["cortex.dev/async"] != "api" {
return nil
}

apiName := deployment.Labels["apiName"]
func UpdateAPIAutoscalerCron(apiDeployment *kapps.Deployment, apiSpec spec.API) error {
apiName := apiDeployment.Labels["apiName"]
if prevAutoscalerCron, ok := _autoscalerCrons[apiName]; ok {
prevAutoscalerCron.Cancel()
}

autoscaler, err := autoscalerlib.AutoscaleFn(deployment, &apiSpec, getMessagesInQueue)
autoscaler, err := autoscalerlib.AutoscaleFn(apiDeployment, &apiSpec, getMessagesInQueue)
if err != nil {
return err
}
Expand All @@ -304,16 +363,16 @@ func UpdateAutoscalerCron(deployment *kapps.Deployment, apiSpec spec.API) error
return nil
}

func getK8sResources(apiConfig userconfig.API) (resources, error) {
func getK8sResources(apiName string) (resources, error) {
var deployment *kapps.Deployment
var apiConfigMap *kcore.ConfigMap
var gatewayDeployment *kapps.Deployment
var gatewayService *kcore.Service
var gatewayHPA *kautoscaling.HorizontalPodAutoscaler
var gatewayVirtualService *istioclientnetworking.VirtualService

gatewayK8sName := getGatewayK8sName(apiConfig.Name)
apiK8sName := workloads.K8sName(apiConfig.Name)
gatewayK8sName := getGatewayK8sName(apiName)
apiK8sName := workloads.K8sName(apiName)

err := parallel.RunFirstErr(
func() error {
Expand Down Expand Up @@ -382,11 +441,11 @@ func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string
return err
}

if err := UpdateMetricsCron(&apiDeployment); err != nil {
if err := UpdateAPIMetricsCron(&apiDeployment); err != nil {
return err
}

if err := UpdateAutoscalerCron(&apiDeployment, api); err != nil {
if err := UpdateAPIAutoscalerCron(&apiDeployment, api); err != nil {
return err
}

Expand Down
64 changes: 28 additions & 36 deletions pkg/operator/resources/asyncapi/k8s_specs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cortexlabs/cortex/pkg/consts"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/types/spec"
"github.com/cortexlabs/cortex/pkg/workloads"
"istio.io/client-go/pkg/apis/networking/v1beta1"
Expand Down Expand Up @@ -66,19 +67,14 @@ func gatewayDeploymentSpec(api spec.API, queueURL string) kapps.Deployment {
Labels: map[string]string{
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"specID": api.SpecID,
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "gateway",
},
PodSpec: k8s.PodSpec{
Labels: map[string]string{
// ID labels are omitted to avoid restarting the gateway on update/refresh
"apiName": api.Name,
"apiKind": api.Kind.String(),
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "gateway",
},
Expand Down Expand Up @@ -110,10 +106,6 @@ func gatewayHPASpec(api spec.API) (kautoscaling.HorizontalPodAutoscaler, error)
Labels: map[string]string{
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"specID": api.SpecID,
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "hpa",
},
Expand Down Expand Up @@ -159,14 +151,15 @@ func gatewayVirtualServiceSpec(api spec.API) v1beta1.VirtualService {
Rewrite: pointer.String("/"),
Annotations: api.ToK8sAnnotations(),
Labels: map[string]string{
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"specID": api.SpecID,
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "gateway",
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"specID": api.SpecID,
"initialDeploymentTime": s.Int64(api.InitialDeploymentTime),
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "gateway",
},
})
}
Expand All @@ -187,9 +180,6 @@ func configMapSpec(api spec.API) (kcore.ConfigMap, error) {
Labels: map[string]string{
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"specID": api.SpecID,
"deploymentID": api.DeploymentID,
"cortex.dev/api": "true",
},
}), nil
Expand All @@ -209,14 +199,15 @@ func deploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL str
MaxSurge: pointer.String(api.UpdateStrategy.MaxSurge),
MaxUnavailable: pointer.String(api.UpdateStrategy.MaxUnavailable),
Labels: map[string]string{
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"specID": api.SpecID,
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "api",
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"specID": api.SpecID,
"initialDeploymentTime": s.Int64(api.InitialDeploymentTime),
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "api",
},
Annotations: api.ToK8sAnnotations(),
Selector: map[string]string{
Expand All @@ -226,13 +217,14 @@ func deploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL str
},
PodSpec: k8s.PodSpec{
Labels: map[string]string{
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "api",
"apiName": api.Name,
"apiKind": api.Kind.String(),
"apiID": api.ID,
"initialDeploymentTime": s.Int64(api.InitialDeploymentTime),
"deploymentID": api.DeploymentID,
"podID": api.PodID,
"cortex.dev/api": "true",
"cortex.dev/async": "api",
},
K8sPodSpec: kcore.PodSpec{
RestartPolicy: "Always",
Expand Down
Loading