diff --git a/doc/known-issue-and-upcoming-feature.md b/doc/known-issue-and-upcoming-feature.md index 7b9946bb..6858443e 100644 --- a/doc/known-issue-and-upcoming-feature.md +++ b/doc/known-issue-and-upcoming-feature.md @@ -16,5 +16,4 @@ - [ ] Support Framework Spec Update - [ ] Support Framework Spec Validation and Defaulting - [ ] Support Framework Status Subresource -- [ ] Support Framework CompletedRetainSec - [ ] Add AttemptCreating state to move the object initialization time out of the ObjectLocalCacheCreationTimeoutSec diff --git a/pkg/apis/frameworkcontroller/v1/config.go b/pkg/apis/frameworkcontroller/v1/config.go index aa1130b5..5952e673 100644 --- a/pkg/apis/frameworkcontroller/v1/config.go +++ b/pkg/apis/frameworkcontroller/v1/config.go @@ -68,6 +68,11 @@ type Config struct { // it is considered as deleted. ObjectLocalCacheCreationTimeoutSec *int64 `yaml:"objectLocalCacheCreationTimeoutSec"` + // A Framework will only be retained within recent FrameworkCompletedRetainSec + // after it is completed, i.e. it will be automatically deleted after + // f.Status.CompletionTime + FrameworkCompletedRetainSec. + FrameworkCompletedRetainSec *int64 `yaml:"frameworkCompletedRetainSec"` + // If the Framework FancyRetryPolicy is enabled and its FrameworkAttempt is // completed with Transient Conflict Failed CompletionType, it will be retried // after a random delay within this range. @@ -131,6 +136,9 @@ func NewConfig() *Config { // Default to k8s.io/kubernetes/pkg/controller.ExpectationsTimeout c.ObjectLocalCacheCreationTimeoutSec = common.PtrInt64(5 * 60) } + if c.FrameworkCompletedRetainSec == nil { + c.FrameworkCompletedRetainSec = common.PtrInt64(30 * 24 * 3600) + } if c.FrameworkMinRetryDelaySecForTransientConflictFailed == nil { c.FrameworkMinRetryDelaySecForTransientConflictFailed = common.PtrInt64(60) } diff --git a/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go b/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go index 2129c590..44899136 100644 --- a/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go +++ b/pkg/apis/frameworkcontroller/v1/zz_generated.deepcopy.go @@ -129,6 +129,11 @@ func (in *Config) DeepCopyInto(out *Config) { *out = new(int64) **out = **in } + if in.FrameworkCompletedRetainSec != nil { + in, out := &in.FrameworkCompletedRetainSec, &out.FrameworkCompletedRetainSec + *out = new(int64) + **out = **in + } if in.FrameworkMinRetryDelaySecForTransientConflictFailed != nil { in, out := &in.FrameworkMinRetryDelaySecForTransientConflictFailed, &out.FrameworkMinRetryDelaySecForTransientConflictFailed *out = new(int64) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 2e93370d..f937bb8d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -600,8 +600,13 @@ func (c *FrameworkController) recoverFrameworkWorkItems(f *ci.Framework) { func (c *FrameworkController) recoverTimeoutChecks(f *ci.Framework) { // If a check is already timeout, the timeout will be handled by the following // sync after the recover, so no need to enqueue it again. + if f.Status.State == ci.FrameworkCompleted { + c.enqueueFrameworkCompletedRetainTimeoutCheck(f, true) + return + } c.enqueueFrameworkAttemptCreationTimeoutCheck(f, true) c.enqueueFrameworkRetryDelayTimeoutCheck(f, true) + for _, taskRoleStatus := range f.TaskRoleStatuses() { for _, taskStatus := range taskRoleStatus.TaskStatuses { taskRoleName := taskRoleStatus.Name @@ -612,23 +617,26 @@ func (c *FrameworkController) recoverTimeoutChecks(f *ci.Framework) { } } -func (c *FrameworkController) enqueueFrameworkAttemptCreationTimeoutCheck( +func (c *FrameworkController) enqueueFrameworkCompletedRetainTimeoutCheck( f *ci.Framework, failIfTimeout bool) bool { - if f.Status.State != ci.FrameworkAttemptCreationRequested { + if f.Status.State != ci.FrameworkCompleted { return false } - leftDuration := common.CurrentLeftDuration( - f.Status.TransitionTime, - c.cConfig.ObjectLocalCacheCreationTimeoutSec) - if common.IsTimeout(leftDuration) && failIfTimeout { + return c.enqueueFrameworkTimeoutCheck( + f, f.Status.TransitionTime, c.cConfig.FrameworkCompletedRetainSec, + failIfTimeout, "FrameworkCompletedRetainTimeoutCheck") +} + +func (c *FrameworkController) enqueueFrameworkAttemptCreationTimeoutCheck( + f *ci.Framework, failIfTimeout bool) bool { + if f.Status.State != ci.FrameworkAttemptCreationRequested { return false } - c.fQueue.AddAfter(f.Key(), leftDuration) - klog.Infof("[%v]: enqueueFrameworkAttemptCreationTimeoutCheck after %v", - f.Key(), leftDuration) - return true + return c.enqueueFrameworkTimeoutCheck( + f, f.Status.TransitionTime, c.cConfig.ObjectLocalCacheCreationTimeoutSec, + failIfTimeout, "FrameworkAttemptCreationTimeoutCheck") } func (c *FrameworkController) enqueueTaskAttemptCreationTimeoutCheck( @@ -639,17 +647,9 @@ func (c *FrameworkController) enqueueTaskAttemptCreationTimeoutCheck( return false } - leftDuration := common.CurrentLeftDuration( - taskStatus.TransitionTime, - c.cConfig.ObjectLocalCacheCreationTimeoutSec) - if common.IsTimeout(leftDuration) && failIfTimeout { - return false - } - - c.fQueue.AddAfter(f.Key(), leftDuration) - klog.Infof("[%v][%v][%v]: enqueueTaskAttemptCreationTimeoutCheck after %v", - f.Key(), taskRoleName, taskIndex, leftDuration) - return true + return c.enqueueFrameworkTimeoutCheck( + f, taskStatus.TransitionTime, c.cConfig.ObjectLocalCacheCreationTimeoutSec, + failIfTimeout, "TaskAttemptCreationTimeoutCheck") } func (c *FrameworkController) enqueueFrameworkRetryDelayTimeoutCheck( @@ -658,17 +658,9 @@ func (c *FrameworkController) enqueueFrameworkRetryDelayTimeoutCheck( return false } - leftDuration := common.CurrentLeftDuration( - f.Status.TransitionTime, - f.Status.RetryPolicyStatus.RetryDelaySec) - if common.IsTimeout(leftDuration) && failIfTimeout { - return false - } - - c.fQueue.AddAfter(f.Key(), leftDuration) - klog.Infof("[%v]: enqueueFrameworkRetryDelayTimeoutCheck after %v", - f.Key(), leftDuration) - return true + return c.enqueueFrameworkTimeoutCheck( + f, f.Status.TransitionTime, f.Status.RetryPolicyStatus.RetryDelaySec, + failIfTimeout, "FrameworkRetryDelayTimeoutCheck") } func (c *FrameworkController) enqueueTaskRetryDelayTimeoutCheck( @@ -679,22 +671,36 @@ func (c *FrameworkController) enqueueTaskRetryDelayTimeoutCheck( return false } - leftDuration := common.CurrentLeftDuration( - taskStatus.TransitionTime, - taskStatus.RetryPolicyStatus.RetryDelaySec) + return c.enqueueFrameworkTimeoutCheck( + f, taskStatus.TransitionTime, taskStatus.RetryPolicyStatus.RetryDelaySec, + failIfTimeout, "TaskRetryDelayTimeoutCheck") +} + +func (c *FrameworkController) enqueueFrameworkTimeoutCheck( + f *ci.Framework, startTime meta.Time, timeoutSec *int64, + failIfTimeout bool, logSfx string) bool { + leftDuration := common.CurrentLeftDuration(startTime, timeoutSec) if common.IsTimeout(leftDuration) && failIfTimeout { return false } + // The startTime may not contain OS monotonic clock, such as it is recovered + // after FrameworkController restart. So the IsTimeout judgement may be affected + // by OS wall clock changes, such as it should be timeout but the IsTimeout + // returns false. + // See wall clock and monotonic clock in Golang time/time.go. + // To ensure the timeout will be eventually checked, AddAfter the Framework + // for every none timeout check. c.fQueue.AddAfter(f.Key(), leftDuration) - klog.Infof("[%v][%v][%v]: enqueueTaskRetryDelayTimeoutCheck after %v", - f.Key(), taskRoleName, taskIndex, leftDuration) + klog.Infof( + "[%v]: enqueueFrameworkTimeoutCheck after %v: %v", + f.Key(), leftDuration, logSfx) return true } -func (c *FrameworkController) enqueueFramework(f *ci.Framework, logSfx string) { +func (c *FrameworkController) enqueueFrameworkSync(f *ci.Framework, logSfx string) { c.fQueue.Add(f.Key()) - klog.Infof("[%v]: enqueueFramework: %v", f.Key(), logSfx) + klog.Infof("[%v]: enqueueFrameworkSync: %v", f.Key(), logSfx) } func (c *FrameworkController) syncFrameworkStatus(f *ci.Framework) error { @@ -717,8 +723,21 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { defer func() { klog.Infof(logPfx + "Completed") }() if f.Status.State == ci.FrameworkCompleted { - klog.Infof(logPfx + "Skipped: Framework is already completed") - return nil + if c.enqueueFrameworkCompletedRetainTimeoutCheck(f, true) { + klog.Infof(logPfx + "Skipped: Framework is already completed") + return nil + } + + // deleteFramework + logSfx := "" + if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkDeletion { + // Ensure the FrameworkSnapshot is exposed before the deletion. + logSfx = ci.GetFrameworkSnapshotLogTail(f) + } + klog.Infof(logPfx+"Framework will be deleted due to "+ + "FrameworkCompletedRetainSec %v is expired"+logSfx, + common.SecToDuration(c.cConfig.FrameworkCompletedRetainSec)) + return c.deleteFramework(f, true) } var cm *core.ConfigMap @@ -849,6 +868,10 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { retryDecision) f.TransitionFrameworkState(ci.FrameworkCompleted) + + c.enqueueFrameworkCompletedRetainTimeoutCheck(f, false) + klog.Infof(logPfx + + "Waiting Framework to be deleted after FrameworkCompletedRetainSec") return nil } } @@ -868,14 +891,13 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { } // retryFramework - klog.Infof(logPfx + "Retry Framework") - - // The completed FrameworkAttempt has been persisted, so it is safe to also - // expose it as one history snapshot. + logSfx := "" if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkRetry { - klog.Infof(logPfx + "Framework will be retried" + - ci.GetFrameworkSnapshotLogTail(f)) + // The completed FrameworkAttempt has been persisted, so it is safe to + // also expose it as one history snapshot. + logSfx = ci.GetFrameworkSnapshotLogTail(f) } + klog.Infof(logPfx + "Framework will be retried" + logSfx) f.Status.RetryPolicyStatus.TotalRetriedCount++ if retryDecision.IsAccountable { @@ -968,6 +990,47 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { } } +func (c *FrameworkController) deleteFramework( + f *ci.Framework, force bool) error { + errPfx := fmt.Sprintf( + "[%v]: Failed to delete Framework %v: force: %v: ", + f.Key(), f.UID, force) + + // Do not set zero GracePeriodSeconds to do force deletion in any case, since + // it will also immediately delete Pod in PodUnknown state, while the Pod may + // be still running. + deleteErr := c.fClient.FrameworkcontrollerV1().Frameworks(f.Namespace).Delete( + f.Name, &meta.DeleteOptions{Preconditions: &meta.Preconditions{UID: &f.UID}}) + if deleteErr != nil { + if !apiErrors.IsNotFound(deleteErr) { + return fmt.Errorf(errPfx+"%v", deleteErr) + } + } else { + if force { + // Confirm it is deleted instead of still deleting. + remoteF, getErr := c.fClient.FrameworkcontrollerV1().Frameworks(f.Namespace).Get( + f.Name, meta.GetOptions{}) + if getErr != nil { + if !apiErrors.IsNotFound(getErr) { + return fmt.Errorf(errPfx+ + "Framework cannot be got from remote: %v", getErr) + } + } else { + if f.UID == remoteF.UID { + return fmt.Errorf(errPfx+ + "Framework with DeletionTimestamp %v still exist after deletion", + remoteF.DeletionTimestamp) + } + } + } + } + + klog.Infof( + "[%v]: Succeeded to delete Framework %v: force: %v", + f.Key(), f.UID, force) + return nil +} + // Get Framework's current ConfigMap object, if not found, then clean up existing // controlled ConfigMap if any. // Returned cm is either managed or nil, if it is the managed cm, it is not @@ -1348,14 +1411,13 @@ func (c *FrameworkController) syncTaskState( } // retryTask - klog.Infof(logPfx + "Retry Task") - - // The completed TaskAttempt has been persisted, so it is safe to also - // expose it as one history snapshot. + logSfx := "" if *c.cConfig.LogObjectSnapshot.Framework.OnTaskRetry { - klog.Infof(logPfx + "Task will be retried" + - ci.GetFrameworkSnapshotLogTail(f)) + // The completed TaskAttempt has been persisted, so it is safe to also + // expose it as one history snapshot. + logSfx = ci.GetFrameworkSnapshotLogTail(f) } + klog.Infof(logPfx + "Task will be retried" + logSfx) taskStatus.RetryPolicyStatus.TotalRetriedCount++ if retryDecision.IsAccountable { @@ -1645,7 +1707,7 @@ func (c *FrameworkController) completeTaskAttempt( // To ensure the completed TaskAttempt is persisted before exposed, // we need to wait until next sync to expose it, so manually enqueue a sync. klog.Infof(logPfx + "Waiting the completed TaskAttempt to be persisted") - c.enqueueFramework(f, "TaskAttemptCompleted") + c.enqueueFrameworkSync(f, "TaskAttemptCompleted") } else { f.TransitionTaskState(taskRoleName, taskIndex, ci.TaskAttemptDeletionPending) @@ -1653,7 +1715,7 @@ func (c *FrameworkController) completeTaskAttempt( // we need to wait until next sync to delete the pod, so manually enqueue // a sync. klog.Infof(logPfx + "Waiting the CompletionStatus to be persisted") - c.enqueueFramework(f, "TaskAttemptDeletionPending") + c.enqueueFrameworkSync(f, "TaskAttemptDeletionPending") } } @@ -1707,7 +1769,7 @@ func (c *FrameworkController) completeFrameworkAttempt( // To ensure the completed FrameworkAttempt is persisted before exposed, // we need to wait until next sync to expose it, so manually enqueue a sync. klog.Infof(logPfx + "Waiting the completed FrameworkAttempt to be persisted") - c.enqueueFramework(f, "FrameworkAttemptCompleted") + c.enqueueFrameworkSync(f, "FrameworkAttemptCompleted") } else { f.TransitionFrameworkState(ci.FrameworkAttemptDeletionPending) @@ -1715,7 +1777,7 @@ func (c *FrameworkController) completeFrameworkAttempt( // we need to wait until next sync to delete the cm, so manually enqueue // a sync. klog.Infof(logPfx + "Waiting the CompletionStatus to be persisted") - c.enqueueFramework(f, "FrameworkAttemptDeletionPending") + c.enqueueFrameworkSync(f, "FrameworkAttemptDeletionPending") } } @@ -1776,7 +1838,7 @@ func (c *FrameworkController) getExpectedFrameworkStatusInfo(key string) *Expect } func (c *FrameworkController) deleteExpectedFrameworkStatusInfo(key string) { - klog.Infof("[%v]: deleteExpectedFrameworkStatusInfo: ", key) + klog.Infof("[%v]: deleteExpectedFrameworkStatusInfo", key) c.fExpectedStatusInfos.Delete(key) }