Skip to content

Commit f3e4264

Browse files
committed
Update workflow states
1 parent e265d88 commit f3e4264

File tree

9 files changed

+163
-39
lines changed

9 files changed

+163
-39
lines changed

pkg/operator/api/resource/saved_status.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type APISavedStatus struct {
4444
type DataExitCode string
4545

4646
const (
47+
ExitCodeDataUnknown DataExitCode = ""
4748
ExitCodeDataSucceeded DataExitCode = "succeeded"
4849
ExitCodeDataFailed DataExitCode = "failed"
4950
ExitCodeDataKilled DataExitCode = "killed"

pkg/operator/workloads/api_workload.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,56 @@ func (aw *APIWorkload) IsRunning(ctx *context.Context) (bool, error) {
186186
return false, nil
187187
}
188188

189+
func (aw *APIWorkload) IsStarted(ctx *context.Context) (bool, error) {
190+
api := ctx.APIs.OneByID(aw.GetSingleResourceID())
191+
k8sDeloymentName := internalAPIName(api.Name, ctx.App.Name)
192+
193+
k8sDeployment, err := config.Kubernetes.GetDeployment(k8sDeloymentName)
194+
if err != nil {
195+
return false, err
196+
}
197+
if k8sDeployment == nil || k8sDeployment.Labels["resourceID"] != api.ID || k8sDeployment.DeletionTimestamp != nil {
198+
return false, nil
199+
}
200+
201+
hpa, err := config.Kubernetes.GetHPA(k8sDeloymentName)
202+
if err != nil {
203+
return false, err
204+
}
205+
206+
if doesAPIComputeNeedsUpdating(api, k8sDeployment, hpa) {
207+
return false, nil
208+
}
209+
210+
return true, nil
211+
}
212+
189213
func (aw *APIWorkload) CanRun(ctx *context.Context) (bool, error) {
190-
return areDataDependenciesSucceeded(ctx, aw.GetResourceIDs())
214+
return areAllDataDependenciesSucceeded(ctx, aw.GetResourceIDs())
215+
}
216+
217+
func (aw *APIWorkload) IsFailed(ctx *context.Context) (bool, error) {
218+
api := ctx.APIs.OneByID(aw.GetSingleResourceID())
219+
220+
pods, err := config.Kubernetes.ListPodsByLabels(map[string]string{
221+
"appName": ctx.App.Name,
222+
"workloadType": workloadTypeAPI,
223+
"apiName": api.Name,
224+
"resourceID": api.ID,
225+
"workloadID": aw.GetWorkloadID(),
226+
"userFacing": "true",
227+
})
228+
if err != nil {
229+
return false, err
230+
}
231+
232+
for _, pod := range pods {
233+
if k8s.GetPodStatus(&pod) == k8s.PodStatusFailed {
234+
return true, nil
235+
}
236+
}
237+
238+
return false, nil
191239
}
192240

193241
func tfAPISpec(

pkg/operator/workloads/logs.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,13 @@ func ReadLogs(appName string, workloadID string, verbose bool, socket *websocket
107107
return
108108
}
109109

110-
isPending, err := IsWorkloadPending(appName, workloadID)
110+
isEnded, err := IsWorkloadEnded(appName, workloadID)
111+
111112
if err != nil {
112113
writeSocket(err.Error(), socket)
113114
return
114115
}
115-
if !isPending {
116+
if isEnded {
116117
logPrefix, err := getSavedLogPrefix(workloadID, appName, true)
117118
if err != nil {
118119
writeSocket(err.Error(), socket)

pkg/operator/workloads/python_package_workload.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,14 +111,22 @@ func (pyw *PythonPackagesWorkload) Start(ctx *context.Context) error {
111111
return nil
112112
}
113113

114+
func (pyw *PythonPackagesWorkload) IsStarted(ctx *context.Context) (bool, error) {
115+
return config.Kubernetes.JobExists(pyw.WorkloadID)
116+
}
117+
114118
func (pyw *PythonPackagesWorkload) IsRunning(ctx *context.Context) (bool, error) {
115119
return config.Kubernetes.IsJobRunning(pyw.WorkloadID)
116120
}
117121

118122
func (pyw *PythonPackagesWorkload) CanRun(ctx *context.Context) (bool, error) {
119-
return areDataDependenciesSucceeded(ctx, pyw.GetResourceIDs())
123+
return areAllDataDependenciesSucceeded(ctx, pyw.GetResourceIDs())
120124
}
121125

122126
func (pyw *PythonPackagesWorkload) IsSucceeded(ctx *context.Context) (bool, error) {
123-
return areDataResourcesSucceeded(ctx, pyw.GetResourceIDs())
127+
return areAllDataResourcesSucceeded(ctx, pyw.GetResourceIDs())
128+
}
129+
130+
func (pyw *PythonPackagesWorkload) IsFailed(ctx *context.Context) (bool, error) {
131+
return areAnyDataResourcesFailed(ctx, pyw.GetResourceIDs())
124132
}

pkg/operator/workloads/shared.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func generateWorkloadID() string {
2929
}
3030

3131
// Check if all resourceIDs have succeeded (only data resource types)
32-
func areDataResourcesSucceeded(ctx *context.Context, resourceIDs strset.Set) (bool, error) {
32+
func areAllDataResourcesSucceeded(ctx *context.Context, resourceIDs strset.Set) (bool, error) {
3333
resourceWorkloadIDs := ctx.DataResourceWorkloadIDs()
3434
for resourceID := range resourceIDs {
3535
workloadID := resourceWorkloadIDs[resourceID]
@@ -46,11 +46,34 @@ func areDataResourcesSucceeded(ctx *context.Context, resourceIDs strset.Set) (bo
4646
return false, nil
4747
}
4848
}
49+
4950
return true, nil
5051
}
5152

53+
// Check if any resourceIDs have failed, i.e. have a saved status whose exit code is neither succeeded nor unknown (only data resource types)
54+
func areAnyDataResourcesFailed(ctx *context.Context, resourceIDs strset.Set) (bool, error) {
55+
resourceWorkloadIDs := ctx.DataResourceWorkloadIDs()
56+
for resourceID := range resourceIDs {
57+
workloadID := resourceWorkloadIDs[resourceID]
58+
if workloadID == "" {
59+
continue
60+
}
61+
62+
savedStatus, err := getDataSavedStatus(resourceID, workloadID, ctx.App.Name)
63+
if err != nil {
64+
return false, err
65+
}
66+
67+
if savedStatus != nil && savedStatus.ExitCode != resource.ExitCodeDataSucceeded && savedStatus.ExitCode != resource.ExitCodeDataUnknown {
68+
return true, nil
69+
}
70+
}
71+
72+
return false, nil
73+
}
74+
5275
// Check if all dependencies of targetResourceIDs have succeeded (only data resource types)
53-
func areDataDependenciesSucceeded(ctx *context.Context, targetResourceIDs strset.Set) (bool, error) {
76+
func areAllDataDependenciesSucceeded(ctx *context.Context, targetResourceIDs strset.Set) (bool, error) {
5477
dependencies := ctx.DirectComputedResourceDependencies(targetResourceIDs.Slice()...)
55-
return areDataResourcesSucceeded(ctx, dependencies)
78+
return areAllDataResourcesSucceeded(ctx, dependencies)
5679
}

pkg/operator/workloads/spark_workload.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,14 +271,22 @@ func sparkSpec(
271271
})
272272
}
273273

274+
func (sw *SparkWorkload) IsStarted(ctx *context.Context) (bool, error) {
275+
return config.Spark.Exists(sw.WorkloadID)
276+
}
277+
274278
func (sw *SparkWorkload) IsRunning(ctx *context.Context) (bool, error) {
275279
return config.Spark.IsRunning(sw.WorkloadID)
276280
}
277281

278282
func (sw *SparkWorkload) CanRun(ctx *context.Context) (bool, error) {
279-
return areDataDependenciesSucceeded(ctx, sw.GetResourceIDs())
283+
return areAllDataDependenciesSucceeded(ctx, sw.GetResourceIDs())
280284
}
281285

282286
func (sw *SparkWorkload) IsSucceeded(ctx *context.Context) (bool, error) {
283-
return areDataResourcesSucceeded(ctx, sw.GetResourceIDs())
287+
return areAllDataResourcesSucceeded(ctx, sw.GetResourceIDs())
288+
}
289+
290+
func (sw *SparkWorkload) IsFailed(ctx *context.Context) (bool, error) {
291+
return areAnyDataResourcesFailed(ctx, sw.GetResourceIDs())
284292
}

pkg/operator/workloads/training_workload.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,14 +140,22 @@ func (tw *TrainingWorkload) Start(ctx *context.Context) error {
140140
return nil
141141
}
142142

143+
func (tw *TrainingWorkload) IsStarted(ctx *context.Context) (bool, error) {
144+
return config.Kubernetes.JobExists(tw.WorkloadID)
145+
}
146+
143147
func (tw *TrainingWorkload) IsRunning(ctx *context.Context) (bool, error) {
144148
return config.Kubernetes.IsJobRunning(tw.WorkloadID)
145149
}
146150

147151
func (tw *TrainingWorkload) CanRun(ctx *context.Context) (bool, error) {
148-
return areDataDependenciesSucceeded(ctx, tw.GetResourceIDs())
152+
return areAllDataDependenciesSucceeded(ctx, tw.GetResourceIDs())
149153
}
150154

151155
func (tw *TrainingWorkload) IsSucceeded(ctx *context.Context) (bool, error) {
152-
return areDataResourcesSucceeded(ctx, tw.GetResourceIDs())
156+
return areAllDataResourcesSucceeded(ctx, tw.GetResourceIDs())
157+
}
158+
159+
func (tw *TrainingWorkload) IsFailed(ctx *context.Context) (bool, error) {
160+
return areAnyDataResourcesFailed(ctx, tw.GetResourceIDs())
153161
}

pkg/operator/workloads/workflow.go

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -219,11 +219,19 @@ func updateWorkflow(ctx *context.Context) error {
219219
continue
220220
}
221221

222-
isRunning, err := workload.IsRunning(ctx)
222+
isFailed, err := workload.IsFailed(ctx)
223223
if err != nil {
224224
return err
225225
}
226-
if isRunning {
226+
if isFailed {
227+
continue
228+
}
229+
230+
isStarted, err := workload.IsStarted(ctx)
231+
if err != nil {
232+
return err
233+
}
234+
if isStarted {
227235
continue
228236
}
229237

@@ -244,17 +252,44 @@ func updateWorkflow(ctx *context.Context) error {
244252
return nil
245253
}
246254

247-
func IsWorkloadPending(appName string, workloadID string) (bool, error) {
255+
func IsWorkloadEnded(appName string, workloadID string) (bool, error) {
248256
ctx := CurrentContext(appName)
249257
if ctx == nil {
250258
return false, nil
251259
}
252260

253261
for _, workload := range extractWorkloads(ctx) {
254-
if workload.GetWorkloadID() != workloadID {
255-
continue
262+
if workload.GetWorkloadID() == workloadID {
263+
isSucceeded, err := workload.IsSucceeded(ctx)
264+
if err != nil {
265+
return false, err
266+
}
267+
if isSucceeded {
268+
return true, nil
269+
}
270+
271+
isFailed, err := workload.IsFailed(ctx)
272+
if err != nil {
273+
return false, err
274+
}
275+
if isFailed {
276+
return true, nil
277+
}
278+
279+
return false, nil
256280
}
281+
}
282+
283+
return false, errors.New("workload not found in the current context")
284+
}
257285

286+
func IsDeploymentUpdating(appName string) (bool, error) {
287+
ctx := CurrentContext(appName)
288+
if ctx == nil {
289+
return false, nil
290+
}
291+
292+
for _, workload := range extractWorkloads(ctx) {
258293
isSucceeded, err := workload.IsSucceeded(ctx)
259294
if err != nil {
260295
return false, err
@@ -263,34 +298,24 @@ func IsWorkloadPending(appName string, workloadID string) (bool, error) {
263298
continue
264299
}
265300

266-
isRunning, err := workload.IsRunning(ctx)
301+
isFailed, err := workload.IsFailed(ctx)
267302
if err != nil {
268303
return false, err
269304
}
270-
if isRunning {
305+
if isFailed {
271306
continue
272307
}
273308

274-
return true, nil
275-
}
276-
277-
return false, nil
278-
}
279-
280-
func IsDeploymentUpdating(appName string) (bool, error) {
281-
ctx := CurrentContext(appName)
282-
if ctx == nil {
283-
return false, nil
284-
}
285-
286-
for _, workload := range extractWorkloads(ctx) {
287-
isRunning, err := workload.IsRunning(ctx)
309+
canRun, err := workload.CanRun(ctx)
288310
if err != nil {
289311
return false, err
290312
}
291-
if isRunning {
292-
return true, nil
313+
if !canRun {
314+
continue
293315
}
316+
317+
// It's either running or can run
318+
return true, nil
294319
}
295320

296321
return false, nil

pkg/operator/workloads/workload.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ const (
3030

3131
type Workload interface {
3232
BaseWorkloadInterface
33-
CanRun(*context.Context) (bool, error)
34-
Start(*context.Context) error
35-
IsRunning(*context.Context) (bool, error)
36-
IsSucceeded(*context.Context) (bool, error)
33+
CanRun(*context.Context) (bool, error) // All of the dependencies are satisfied and the workload can be started
34+
Start(*context.Context) error // Start the workload
35+
IsStarted(*context.Context) (bool, error) // The workload was started on the most recent deploy (might be running, succeeded, or failed). It's ok if this doesn't remain accurate across cx deploys
36+
IsRunning(*context.Context) (bool, error) // The workload is currently running
37+
IsSucceeded(*context.Context) (bool, error) // The workload succeeded
38+
IsFailed(*context.Context) (bool, error) // The workload failed
3739
}
3840

3941
type BaseWorkload struct {

0 commit comments

Comments
 (0)