diff --git a/pkg/apis/frameworkcontroller/v1/completion.go b/pkg/apis/frameworkcontroller/v1/completion.go index ea51d89a..c77b2da7 100644 --- a/pkg/apis/frameworkcontroller/v1/completion.go +++ b/pkg/apis/frameworkcontroller/v1/completion.go @@ -26,8 +26,11 @@ import ( "fmt" "github.com/microsoft/frameworkcontroller/pkg/common" core "k8s.io/api/core/v1" + apiErrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/net" "reflect" "regexp" + "strings" "time" ) @@ -63,17 +66,19 @@ const ( // [-999, -1]: Predefined Framework Error // -1XX: Transient Error - CompletionCodeConfigMapExternalDeleted CompletionCode = -100 - CompletionCodePodExternalDeleted CompletionCode = -101 - CompletionCodeConfigMapCreationTimeout CompletionCode = -110 - CompletionCodePodCreationTimeout CompletionCode = -111 + CompletionCodeConfigMapExternalDeleted CompletionCode = -100 + CompletionCodePodExternalDeleted CompletionCode = -101 + CompletionCodeConfigMapLocalCacheCreationTimeout CompletionCode = -110 + CompletionCodePodLocalCacheCreationTimeout CompletionCode = -111 + CompletionCodePodCreationTransientError CompletionCode = -120 // -2XX: Permanent Error - CompletionCodePodSpecPermanentError CompletionCode = -200 + CompletionCodePodCreationPermanentError CompletionCode = -200 CompletionCodeStopFrameworkRequested CompletionCode = -210 CompletionCodeFrameworkAttemptCompletion CompletionCode = -220 CompletionCodeDeleteTaskRequested CompletionCode = -230 // -3XX: Unknown Error CompletionCodePodFailedWithoutFailedContainer CompletionCode = -300 + CompletionCodePodCreationUnknownError CompletionCode = -310 ) var completionCodeInfoList = []*CompletionCodeInfo{} @@ -152,20 +157,28 @@ func initCompletionCodeInfos() { []CompletionTypeAttribute{CompletionTypeAttributeTransient}}, }, { - Code: CompletionCodeConfigMapCreationTimeout.Ptr(), - Phrase: "ConfigMapCreationTimeout", + Code: CompletionCodeConfigMapLocalCacheCreationTimeout.Ptr(), + Phrase: 
"ConfigMapLocalCacheCreationTimeout", Type: CompletionType{CompletionTypeNameFailed, []CompletionTypeAttribute{CompletionTypeAttributeTransient}}, }, { - Code: CompletionCodePodCreationTimeout.Ptr(), - Phrase: "PodCreationTimeout", + Code: CompletionCodePodLocalCacheCreationTimeout.Ptr(), + Phrase: "PodLocalCacheCreationTimeout", Type: CompletionType{CompletionTypeNameFailed, []CompletionTypeAttribute{CompletionTypeAttributeTransient}}, }, { - Code: CompletionCodePodSpecPermanentError.Ptr(), - Phrase: "PodSpecPermanentError", + // Only used to distinguish it from others, and will never be used to complete + // a TaskAttempt. + Code: CompletionCodePodCreationTransientError.Ptr(), + Phrase: "PodCreationTransientError", + Type: CompletionType{CompletionTypeNameFailed, + []CompletionTypeAttribute{CompletionTypeAttributeTransient}}, + }, + { + Code: CompletionCodePodCreationPermanentError.Ptr(), + Phrase: "PodCreationPermanentError", Type: CompletionType{CompletionTypeNameFailed, []CompletionTypeAttribute{CompletionTypeAttributePermanent}}, }, @@ -193,6 +206,12 @@ func initCompletionCodeInfos() { Type: CompletionType{CompletionTypeNameFailed, []CompletionTypeAttribute{}}, }, + { + Code: CompletionCodePodCreationUnknownError.Ptr(), + Phrase: "PodCreationUnknownError", + Type: CompletionType{CompletionTypeNameFailed, + []CompletionTypeAttribute{}}, + }, }) } @@ -238,6 +257,9 @@ type MatchedContainer struct { } // Match ANY CompletionCodeInfo +// The returned CompletionCode may not be within CompletionCodeInfos, such as for +// the ContainerUnrecognizedFailed, so it should not be used to call +// NewTaskAttemptCompletionStatus or NewFrameworkAttemptCompletionStatus later. 
func MatchCompletionCodeInfos(pod *core.Pod) PodMatchResult { for _, codeInfo := range completionCodeInfoList { for _, podPattern := range codeInfo.PodPatterns { @@ -404,6 +426,55 @@ func generatePodUnmatchedResult(pod *core.Pod) PodMatchResult { } } +// ClassifyPodCreationError classifies a Pod creation error as Transient, Unknown or Permanent; the returned CompletionCode must be within CompletionCodeInfos. +func ClassifyPodCreationError(apiErr error) PodMatchResult { + diag := fmt.Sprintf("Failed to create Pod: %v", common.ToJson(apiErr)) + + // Treat Platform Error as Transient Error, such as Pod decoding error or exceeded quota. + if strings.Contains(apiErr.Error(), "object provided is unrecognized") || + strings.Contains(apiErr.Error(), "exceeded quota") { + return PodMatchResult{ + CodeInfo: completionCodeInfoMap[CompletionCodePodCreationTransientError], + Diagnostics: diag, + } + } + + // Treat General Framework Error as Unknown Error for safety. + if apiErrors.IsBadRequest(apiErr) || + apiErrors.IsForbidden(apiErr) { + return PodMatchResult{ + CodeInfo: completionCodeInfoMap[CompletionCodePodCreationUnknownError], + Diagnostics: diag, + } + } + + // Treat Permanent Framework Error as Permanent Error only if it must be + // Permanent Error. + if apiErrors.IsInvalid(apiErr) || + apiErrors.IsRequestEntityTooLargeError(apiErr) { + // TODO: Also check net.IsConnectionRefused + if net.IsConnectionReset(apiErr) || net.IsProbableEOF(apiErr) { + // The ApiServer Permanent Error may be caused by Network Transient Error, + // so treat it as Unknown Error for safety. + return PodMatchResult{ + CodeInfo: completionCodeInfoMap[CompletionCodePodCreationUnknownError], + Diagnostics: diag, + } + } else { + return PodMatchResult{ + CodeInfo: completionCodeInfoMap[CompletionCodePodCreationPermanentError], + Diagnostics: diag, + } + } + } + + // Treat all other errors as Transient Error, including all non-APIStatus errors. 
+ return PodMatchResult{ + CodeInfo: completionCodeInfoMap[CompletionCodePodCreationTransientError], + Diagnostics: diag, + } +} + /////////////////////////////////////////////////////////////////////////////////////// // Completion Utils /////////////////////////////////////////////////////////////////////////////////////// diff --git a/pkg/apis/frameworkcontroller/v1/config.go b/pkg/apis/frameworkcontroller/v1/config.go index 33c91697..a47df52c 100644 --- a/pkg/apis/frameworkcontroller/v1/config.go +++ b/pkg/apis/frameworkcontroller/v1/config.go @@ -60,7 +60,7 @@ type Config struct { // Generally, it should be proportional to the cluster Framework workload, and within the ApiServer // serving capacity/limit such as the --max-mutating-requests-inflight. KubeClientQps *float32 `yaml:"kubeClientQps"` - KubeClientBurst *int32 `yaml:"kubeClientBurst"` + KubeClientBurst *int32 `yaml:"kubeClientBurst"` // Number of concurrent workers to process each different Frameworks. // Generally, it should be proportional to the above rate limits of requests. diff --git a/pkg/apis/frameworkcontroller/v1/funcs.go b/pkg/apis/frameworkcontroller/v1/funcs.go index 1e1b56f5..0baa209e 100644 --- a/pkg/apis/frameworkcontroller/v1/funcs.go +++ b/pkg/apis/frameworkcontroller/v1/funcs.go @@ -195,15 +195,17 @@ func NewCompletedTaskTriggeredCompletionStatus( "conditions in FrameworkAttemptCompletionPolicy have ever been triggered", completedTaskCount, totalTaskCount) if triggerTaskStatus == nil { - return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag, nil) + return CompletionCodeSucceeded. + NewFrameworkAttemptCompletionStatus(diag, nil) } else { - return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag, - &CompletionPolicyTriggerStatus{ - Message: diag, - TaskRoleName: triggerTaskRoleName, - TaskIndex: triggerTaskStatus.Index, - }, - ) + return CompletionCodeSucceeded. 
+ NewFrameworkAttemptCompletionStatus(diag, + &CompletionPolicyTriggerStatus{ + Message: diag, + TaskRoleName: triggerTaskRoleName, + TaskIndex: triggerTaskStatus.Index, + }, + ) } } diff --git a/pkg/common/utils.go b/pkg/common/utils.go index 629e3ff6..ff21e860 100644 --- a/pkg/common/utils.go +++ b/pkg/common/utils.go @@ -28,6 +28,7 @@ import ( "encoding/json" "flag" "fmt" + errorWrap "github.com/pkg/errors" "gopkg.in/yaml.v2" "io/ioutil" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -287,3 +288,12 @@ func Decompress(compressedBytes []byte) (string, error) { } } } + +func GetErrorCause(err error) error { + causeErr := errorWrap.Cause(err) + if causeErr == nil { + return err + } else { + return causeErr + } +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5c51eef3..0f9054e4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -1053,7 +1053,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { "ConfigMap does not appear in the local cache within timeout %v, "+ "so consider it was deleted and explicitly delete it", common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec)) - code = ci.CompletionCodeConfigMapCreationTimeout + code = ci.CompletionCodeConfigMapLocalCacheCreationTimeout klog.Warning(logPfx + diag) } @@ -1670,7 +1670,7 @@ func (c *FrameworkController) syncTaskState( "Pod does not appear in the local cache within timeout %v, "+ "so consider it was deleted and explicitly delete it", common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec)) - code = ci.CompletionCodePodCreationTimeout + code = ci.CompletionCodePodLocalCacheCreationTimeout klog.Warning(logPfx + diag) } @@ -1752,8 +1752,9 @@ func (c *FrameworkController) syncTaskState( diag := fmt.Sprintf("Pod succeeded") klog.Info(logPfx + diag) c.completeTaskAttempt(f, taskRoleName, taskIndex, false, - ci.CompletionCodeSucceeded.NewTaskAttemptCompletionStatus( - diag, 
ci.ExtractPodCompletionStatus(pod))) + ci.CompletionCodeSucceeded. + NewTaskAttemptCompletionStatus( + diag, ci.ExtractPodCompletionStatus(pod))) return nil } else if pod.Status.Phase == core.PodFailed { result := ci.MatchCompletionCodeInfos(pod) @@ -1910,26 +1911,26 @@ func (c *FrameworkController) syncTaskState( // createTaskAttempt pod, err = c.createPod(f, cm, taskRoleName, taskIndex) if err != nil { - apiErr := errorWrap.Cause(err) - if internal.IsPodSpecPermanentError(apiErr) { - // Should be Framework Error instead of Platform Transient Error. - diag := fmt.Sprintf("Failed to create Pod: %v", common.ToJson(apiErr)) - klog.Info(logPfx + diag) - - // Ensure pod is deleted in remote to avoid managed pod leak after - // TaskAttemptCompleted. - _, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true) - if err != nil { - return err - } + apiErr := common.GetErrorCause(err) + result := ci.ClassifyPodCreationError(apiErr) + if *result.CodeInfo.Code == ci.CompletionCodePodCreationTransientError { + // Do not complete the TaskAttempt, as generally, the user does not need to be + // aware of the Transient Error during Pod creation. + return err + } - c.completeTaskAttempt(f, taskRoleName, taskIndex, true, - ci.CompletionCodePodSpecPermanentError. - NewTaskAttemptCompletionStatus(diag, nil)) - return nil - } else { + klog.Info(logPfx + result.Diagnostics) + // Ensure pod is deleted in remote to avoid managed pod leak after + // TaskAttemptCompleted. + _, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true) + if err != nil { return err } + + c.completeTaskAttempt(f, taskRoleName, taskIndex, true, + result.CodeInfo.Code. 
+ NewTaskAttemptCompletionStatus(result.Diagnostics, nil)) + return nil } taskStatus.AttemptStatus.PodUID = &pod.UID diff --git a/pkg/internal/utils.go b/pkg/internal/utils.go index 381703d5..a3c05ea6 100644 --- a/pkg/internal/utils.go +++ b/pkg/internal/utils.go @@ -38,7 +38,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/klog" "reflect" - "strings" "time" ) @@ -222,11 +221,3 @@ func GetPodDeletionStartTime(pod *core.Pod) *meta.Time { } return common.PtrTime(meta.NewTime(pod.DeletionTimestamp.Add(-gracePeriod))) } - -func IsPodSpecPermanentError(apiErr error) bool { - return apiErrors.IsBadRequest(apiErr) || - apiErrors.IsInvalid(apiErr) || - apiErrors.IsRequestEntityTooLargeError(apiErr) || - (apiErrors.IsForbidden(apiErr) && - !strings.Contains(apiErr.Error(), "exceeded quota")) -}