Skip to content

Commit 5e489fa

Browse files
authored
Support refresh CLI command for Async APIs (#2265)
1 parent 20652ea commit 5e489fa

File tree

15 files changed

+266
-155
lines changed

15 files changed

+266
-155
lines changed

cmd/operator/main.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,10 @@ func main() {
6565

6666
for i := range deployments {
6767
deployment := deployments[i]
68-
apiKind := deployment.Labels["apiKind"]
69-
if userconfig.KindFromString(apiKind) == userconfig.RealtimeAPIKind ||
70-
userconfig.KindFromString(apiKind) == userconfig.AsyncAPIKind {
68+
apiKind := userconfig.KindFromString(deployment.Labels["apiKind"])
69+
if apiKind == userconfig.RealtimeAPIKind ||
70+
(apiKind == userconfig.AsyncAPIKind && deployment.Labels["cortex.dev/async"] != "gateway") {
71+
7172
apiID := deployment.Labels["apiID"]
7273
apiName := deployment.Labels["apiName"]
7374
api, err := operator.DownloadAPISpec(apiName, apiID)
@@ -76,16 +77,16 @@ func main() {
7677
}
7778

7879
switch apiKind {
79-
case userconfig.RealtimeAPIKind.String():
80+
case userconfig.RealtimeAPIKind:
8081
if err := realtimeapi.UpdateAutoscalerCron(&deployment, api); err != nil {
8182
operatorLogger.Fatal(errors.Wrap(err, "init"))
8283
}
83-
case userconfig.AsyncAPIKind.String():
84-
if err := asyncapi.UpdateMetricsCron(&deployment); err != nil {
84+
case userconfig.AsyncAPIKind:
85+
if err := asyncapi.UpdateAPIMetricsCron(&deployment); err != nil {
8586
operatorLogger.Fatal(errors.Wrap(err, "init"))
8687
}
8788

88-
if err := asyncapi.UpdateAutoscalerCron(&deployment, *api); err != nil {
89+
if err := asyncapi.UpdateAPIAutoscalerCron(&deployment, *api); err != nil {
8990
operatorLogger.Fatal(errors.Wrap(err, "init"))
9091
}
9192
}

pkg/operator/resources/asyncapi/api.go

+91-32
Original file line numberDiff line numberDiff line change
@@ -64,25 +64,31 @@ func getGatewayK8sName(apiName string) string {
6464
return "gateway-" + apiName
6565
}
6666

67-
func deploymentID() string {
67+
func generateDeploymentID() string {
6868
return k8s.RandomName()[:10]
6969
}
7070

7171
func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error) {
72-
prevK8sResources, err := getK8sResources(apiConfig)
72+
prevK8sResources, err := getK8sResources(apiConfig.Name)
7373
if err != nil {
7474
return nil, "", err
7575
}
7676

77-
deployID := deploymentID()
78-
if prevK8sResources.apiDeployment != nil && prevK8sResources.apiDeployment.Labels["deploymentID"] != "" {
79-
deployID = prevK8sResources.apiDeployment.Labels["deploymentID"]
77+
initialDeploymentTime := time.Now().UnixNano()
78+
deploymentID := generateDeploymentID()
79+
if prevK8sResources.gatewayVirtualService != nil && prevK8sResources.gatewayVirtualService.Labels["initialDeploymentTime"] != "" {
80+
var err error
81+
initialDeploymentTime, err = k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime")
82+
if err != nil {
83+
return nil, "", err
84+
}
85+
deploymentID = prevK8sResources.gatewayVirtualService.Labels["deploymentID"]
8086
}
8187

82-
api := spec.GetAPISpec(&apiConfig, deployID, config.ClusterConfig.ClusterUID)
88+
api := spec.GetAPISpec(&apiConfig, initialDeploymentTime, deploymentID, config.ClusterConfig.ClusterUID)
8389

8490
// resource creation
85-
if prevK8sResources.apiDeployment == nil {
91+
if prevK8sResources.gatewayVirtualService == nil {
8692
if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil {
8793
return nil, "", errors.Wrap(err, "upload api spec")
8894
}
@@ -91,7 +97,7 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error)
9197
"apiName": apiConfig.Name,
9298
}
9399

94-
queueURL, err := createFIFOQueue(apiConfig.Name, deployID, tags)
100+
queueURL, err := createFIFOQueue(apiConfig.Name, initialDeploymentTime, tags)
95101
if err != nil {
96102
return nil, "", err
97103
}
@@ -127,7 +133,12 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error)
127133
return nil, "", errors.Wrap(err, "upload api spec")
128134
}
129135

130-
queueURL, err := getQueueURL(api.Name, prevK8sResources.gatewayVirtualService.Labels["deploymentID"])
136+
initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime")
137+
if err != nil {
138+
return nil, "", err
139+
}
140+
141+
queueURL, err := getQueueURL(api.Name, initialDeploymentTime)
131142
if err != nil {
132143
return nil, "", err
133144
}
@@ -150,6 +161,56 @@ func UpdateAPI(apiConfig userconfig.API, force bool) (*spec.API, string, error)
150161
return api, fmt.Sprintf("%s is up to date", api.Resource.UserString()), nil
151162
}
152163

164+
func RefreshAPI(apiName string, force bool) (string, error) {
165+
prevK8sResources, err := getK8sResources(apiName)
166+
if err != nil {
167+
return "", err
168+
} else if prevK8sResources.gatewayVirtualService == nil || prevK8sResources.apiDeployment == nil {
169+
return "", errors.ErrorUnexpected("unable to find deployment", apiName)
170+
}
171+
172+
isUpdating, err := isAPIUpdating(prevK8sResources.apiDeployment)
173+
if err != nil {
174+
return "", err
175+
}
176+
177+
if isUpdating && !force {
178+
return "", ErrorAPIUpdating(apiName)
179+
}
180+
181+
apiID, err := k8s.GetLabel(prevK8sResources.gatewayVirtualService, "apiID")
182+
if err != nil {
183+
return "", err
184+
}
185+
186+
api, err := operator.DownloadAPISpec(apiName, apiID)
187+
if err != nil {
188+
return "", err
189+
}
190+
191+
initialDeploymentTime, err := k8s.ParseInt64Label(prevK8sResources.gatewayVirtualService, "initialDeploymentTime")
192+
if err != nil {
193+
return "", err
194+
}
195+
196+
api = spec.GetAPISpec(api.API, initialDeploymentTime, generateDeploymentID(), config.ClusterConfig.ClusterUID)
197+
198+
if err := config.AWS.UploadJSONToS3(api, config.ClusterConfig.Bucket, api.Key); err != nil {
199+
return "", errors.Wrap(err, "upload api spec")
200+
}
201+
202+
queueURL, err := getQueueURL(api.Name, initialDeploymentTime)
203+
if err != nil {
204+
return "", err
205+
}
206+
207+
if err = applyK8sResources(*api, prevK8sResources, queueURL); err != nil {
208+
return "", err
209+
}
210+
211+
return fmt.Sprintf("updating %s", api.Resource.UserString()), nil
212+
}
213+
153214
func DeleteAPI(apiName string, keepCache bool) error {
154215
err := parallel.RunFirstErr(
155216
func() error {
@@ -158,7 +219,11 @@ func DeleteAPI(apiName string, keepCache bool) error {
158219
return err
159220
}
160221
if vs != nil {
161-
queueURL, err := getQueueURL(apiName, vs.Labels["deploymentID"])
222+
initialDeploymentTime, err := k8s.ParseInt64Label(vs, "initialDeploymentTime")
223+
if err != nil {
224+
return err
225+
}
226+
queueURL, err := getQueueURL(apiName, initialDeploymentTime)
162227
if err != nil {
163228
return err
164229
}
@@ -258,20 +323,19 @@ func GetAllAPIs(pods []kcore.Pod, deployments []kapps.Deployment) ([]schema.APIR
258323
return asyncAPIs, nil
259324
}
260325

261-
func UpdateMetricsCron(deployment *kapps.Deployment) error {
262-
// skip gateway deployments
263-
if deployment.Labels["cortex.dev/async"] != "api" {
264-
return nil
265-
}
266-
267-
apiName := deployment.Labels["apiName"]
268-
deployID := deployment.Labels["deploymentID"]
326+
func UpdateAPIMetricsCron(apiDeployment *kapps.Deployment) error {
327+
apiName := apiDeployment.Labels["apiName"]
269328

270329
if prevMetricsCron, ok := _metricsCrons[apiName]; ok {
271330
prevMetricsCron.Cancel()
272331
}
273332

274-
queueURL, err := getQueueURL(apiName, deployID)
333+
initialDeploymentTime, err := k8s.ParseInt64Label(apiDeployment, "initialDeploymentTime")
334+
if err != nil {
335+
return err
336+
}
337+
338+
queueURL, err := getQueueURL(apiName, initialDeploymentTime)
275339
if err != nil {
276340
return err
277341
}
@@ -283,18 +347,13 @@ func UpdateMetricsCron(deployment *kapps.Deployment) error {
283347
return nil
284348
}
285349

286-
func UpdateAutoscalerCron(deployment *kapps.Deployment, apiSpec spec.API) error {
287-
// skip gateway deployments
288-
if deployment.Labels["cortex.dev/async"] != "api" {
289-
return nil
290-
}
291-
292-
apiName := deployment.Labels["apiName"]
350+
func UpdateAPIAutoscalerCron(apiDeployment *kapps.Deployment, apiSpec spec.API) error {
351+
apiName := apiDeployment.Labels["apiName"]
293352
if prevAutoscalerCron, ok := _autoscalerCrons[apiName]; ok {
294353
prevAutoscalerCron.Cancel()
295354
}
296355

297-
autoscaler, err := autoscalerlib.AutoscaleFn(deployment, &apiSpec, getMessagesInQueue)
356+
autoscaler, err := autoscalerlib.AutoscaleFn(apiDeployment, &apiSpec, getMessagesInQueue)
298357
if err != nil {
299358
return err
300359
}
@@ -304,16 +363,16 @@ func UpdateAutoscalerCron(deployment *kapps.Deployment, apiSpec spec.API) error
304363
return nil
305364
}
306365

307-
func getK8sResources(apiConfig userconfig.API) (resources, error) {
366+
func getK8sResources(apiName string) (resources, error) {
308367
var deployment *kapps.Deployment
309368
var apiConfigMap *kcore.ConfigMap
310369
var gatewayDeployment *kapps.Deployment
311370
var gatewayService *kcore.Service
312371
var gatewayHPA *kautoscaling.HorizontalPodAutoscaler
313372
var gatewayVirtualService *istioclientnetworking.VirtualService
314373

315-
gatewayK8sName := getGatewayK8sName(apiConfig.Name)
316-
apiK8sName := workloads.K8sName(apiConfig.Name)
374+
gatewayK8sName := getGatewayK8sName(apiName)
375+
apiK8sName := workloads.K8sName(apiName)
317376

318377
err := parallel.RunFirstErr(
319378
func() error {
@@ -382,11 +441,11 @@ func applyK8sResources(api spec.API, prevK8sResources resources, queueURL string
382441
return err
383442
}
384443

385-
if err := UpdateMetricsCron(&apiDeployment); err != nil {
444+
if err := UpdateAPIMetricsCron(&apiDeployment); err != nil {
386445
return err
387446
}
388447

389-
if err := UpdateAutoscalerCron(&apiDeployment, api); err != nil {
448+
if err := UpdateAPIAutoscalerCron(&apiDeployment, api); err != nil {
390449
return err
391450
}
392451

pkg/operator/resources/asyncapi/k8s_specs.go

+28-36
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cortexlabs/cortex/pkg/consts"
2121
"github.com/cortexlabs/cortex/pkg/lib/k8s"
2222
"github.com/cortexlabs/cortex/pkg/lib/pointer"
23+
s "github.com/cortexlabs/cortex/pkg/lib/strings"
2324
"github.com/cortexlabs/cortex/pkg/types/spec"
2425
"github.com/cortexlabs/cortex/pkg/workloads"
2526
"istio.io/client-go/pkg/apis/networking/v1beta1"
@@ -66,19 +67,14 @@ func gatewayDeploymentSpec(api spec.API, queueURL string) kapps.Deployment {
6667
Labels: map[string]string{
6768
"apiName": api.Name,
6869
"apiKind": api.Kind.String(),
69-
"apiID": api.ID,
70-
"specID": api.SpecID,
71-
"deploymentID": api.DeploymentID,
72-
"podID": api.PodID,
7370
"cortex.dev/api": "true",
7471
"cortex.dev/async": "gateway",
7572
},
7673
PodSpec: k8s.PodSpec{
7774
Labels: map[string]string{
75+
// ID labels are omitted to avoid restarting the gateway on update/refresh
7876
"apiName": api.Name,
7977
"apiKind": api.Kind.String(),
80-
"deploymentID": api.DeploymentID,
81-
"podID": api.PodID,
8278
"cortex.dev/api": "true",
8379
"cortex.dev/async": "gateway",
8480
},
@@ -110,10 +106,6 @@ func gatewayHPASpec(api spec.API) (kautoscaling.HorizontalPodAutoscaler, error)
110106
Labels: map[string]string{
111107
"apiName": api.Name,
112108
"apiKind": api.Kind.String(),
113-
"apiID": api.ID,
114-
"specID": api.SpecID,
115-
"deploymentID": api.DeploymentID,
116-
"podID": api.PodID,
117109
"cortex.dev/api": "true",
118110
"cortex.dev/async": "hpa",
119111
},
@@ -159,14 +151,15 @@ func gatewayVirtualServiceSpec(api spec.API) v1beta1.VirtualService {
159151
Rewrite: pointer.String("/"),
160152
Annotations: api.ToK8sAnnotations(),
161153
Labels: map[string]string{
162-
"apiName": api.Name,
163-
"apiKind": api.Kind.String(),
164-
"apiID": api.ID,
165-
"specID": api.SpecID,
166-
"deploymentID": api.DeploymentID,
167-
"podID": api.PodID,
168-
"cortex.dev/api": "true",
169-
"cortex.dev/async": "gateway",
154+
"apiName": api.Name,
155+
"apiKind": api.Kind.String(),
156+
"apiID": api.ID,
157+
"specID": api.SpecID,
158+
"initialDeploymentTime": s.Int64(api.InitialDeploymentTime),
159+
"deploymentID": api.DeploymentID,
160+
"podID": api.PodID,
161+
"cortex.dev/api": "true",
162+
"cortex.dev/async": "gateway",
170163
},
171164
})
172165
}
@@ -187,9 +180,6 @@ func configMapSpec(api spec.API) (kcore.ConfigMap, error) {
187180
Labels: map[string]string{
188181
"apiName": api.Name,
189182
"apiKind": api.Kind.String(),
190-
"apiID": api.ID,
191-
"specID": api.SpecID,
192-
"deploymentID": api.DeploymentID,
193183
"cortex.dev/api": "true",
194184
},
195185
}), nil
@@ -209,14 +199,15 @@ func deploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL str
209199
MaxSurge: pointer.String(api.UpdateStrategy.MaxSurge),
210200
MaxUnavailable: pointer.String(api.UpdateStrategy.MaxUnavailable),
211201
Labels: map[string]string{
212-
"apiName": api.Name,
213-
"apiKind": api.Kind.String(),
214-
"apiID": api.ID,
215-
"specID": api.SpecID,
216-
"deploymentID": api.DeploymentID,
217-
"podID": api.PodID,
218-
"cortex.dev/api": "true",
219-
"cortex.dev/async": "api",
202+
"apiName": api.Name,
203+
"apiKind": api.Kind.String(),
204+
"apiID": api.ID,
205+
"specID": api.SpecID,
206+
"initialDeploymentTime": s.Int64(api.InitialDeploymentTime),
207+
"deploymentID": api.DeploymentID,
208+
"podID": api.PodID,
209+
"cortex.dev/api": "true",
210+
"cortex.dev/async": "api",
220211
},
221212
Annotations: api.ToK8sAnnotations(),
222213
Selector: map[string]string{
@@ -226,13 +217,14 @@ func deploymentSpec(api spec.API, prevDeployment *kapps.Deployment, queueURL str
226217
},
227218
PodSpec: k8s.PodSpec{
228219
Labels: map[string]string{
229-
"apiName": api.Name,
230-
"apiKind": api.Kind.String(),
231-
"apiID": api.ID,
232-
"deploymentID": api.DeploymentID,
233-
"podID": api.PodID,
234-
"cortex.dev/api": "true",
235-
"cortex.dev/async": "api",
220+
"apiName": api.Name,
221+
"apiKind": api.Kind.String(),
222+
"apiID": api.ID,
223+
"initialDeploymentTime": s.Int64(api.InitialDeploymentTime),
224+
"deploymentID": api.DeploymentID,
225+
"podID": api.PodID,
226+
"cortex.dev/api": "true",
227+
"cortex.dev/async": "api",
236228
},
237229
K8sPodSpec: kcore.PodSpec{
238230
RestartPolicy: "Always",

0 commit comments

Comments
 (0)