Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions pkg/controller/queuejob/queuejob_controller_ex.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (qjm *XController) PreemptQueueJobs() {
// Only back-off AWs that are in state running and not in state Failed
if updateNewJob.Status.State != arbv1.AppWrapperStateFailed {
klog.Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to back off queue.", aw.Name, aw.Namespace)
go qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
qjm.backoff(ctx, updateNewJob, "PreemptionTriggered", string(message))
}
}
}
Expand Down Expand Up @@ -1155,7 +1155,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
} else {
dispatchFailedMessage = "Cannot find an cluster with enough resources to dispatch AppWrapper."
klog.V(2).Infof("[ScheduleNex] [Dispatcher Mode] %s %s\n", dispatchFailedReason, dispatchFailedMessage)
go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
}
} else { // Agent Mode
aggqj := qjm.GetAggregatedResources(qj)
Expand Down Expand Up @@ -1284,7 +1284,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
// TODO: Remove forwarded logic as a big AW will never be forwarded
forwarded = true
// should we call backoff or update etcd?
go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
}
}
forwarded = true
Expand Down Expand Up @@ -1347,7 +1347,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
if qjm.quotaManager != nil && quotaFits {
qjm.quotaManager.Release(qj)
}
go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage)
}
}
return nil
Expand Down Expand Up @@ -1672,6 +1672,20 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
}

klog.V(6).Infof("[Informer-updateQJ] '%s/%s' *Delay=%.6f seconds normal enqueue Version=%s Status=%v", newQJ.Namespace, newQJ.Name, time.Now().Sub(newQJ.Status.ControllerFirstTimestamp.Time).Seconds(), newQJ.ResourceVersion, newQJ.Status)
for _, cond := range newQJ.Status.Conditions {
if cond.Type == arbv1.AppWrapperCondBackoff {
// AWs that have a backoff condition are delayed by the configured BackoffTime (in seconds) before being re-enqueued.
//TODO: we could plug an interface here with back-off strategies for different MCAD use cases.
time.AfterFunc(time.Duration(cc.serverOption.BackoffTime)*time.Second, func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'd be useful to have that logic encapsulated in an enqueueAfter method, so that it mimics the semantics of client-go's DelayingInterface.AddAfter and can potentially be reused elsewhere.

if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
cc.quotaManager.Release(newQJ)
}
cc.enqueue(newQJ)
})
return
}
}

// cc.eventQueue.Delete(oldObj)
cc.enqueue(newQJ)
}
Expand Down