Skip to content
This repository was archived by the owner on Nov 16, 2023. It is now read-only.

Treat invalid Pod caused by network error as PodCreationUnknownError #61

Merged
merged 2 commits into from
Aug 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 82 additions & 11 deletions pkg/apis/frameworkcontroller/v1/completion.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"fmt"
"github.com/microsoft/frameworkcontroller/pkg/common"
core "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/net"
"reflect"
"regexp"
"strings"
"time"
)

Expand Down Expand Up @@ -63,17 +66,19 @@ const (

// [-999, -1]: Predefined Framework Error
// -1XX: Transient Error
CompletionCodeConfigMapExternalDeleted CompletionCode = -100
CompletionCodePodExternalDeleted CompletionCode = -101
CompletionCodeConfigMapCreationTimeout CompletionCode = -110
CompletionCodePodCreationTimeout CompletionCode = -111
CompletionCodeConfigMapExternalDeleted CompletionCode = -100
CompletionCodePodExternalDeleted CompletionCode = -101
CompletionCodeConfigMapLocalCacheCreationTimeout CompletionCode = -110
CompletionCodePodLocalCacheCreationTimeout CompletionCode = -111
CompletionCodePodCreationTransientError CompletionCode = -120
// -2XX: Permanent Error
CompletionCodePodSpecPermanentError CompletionCode = -200
CompletionCodePodCreationPermanentError CompletionCode = -200
CompletionCodeStopFrameworkRequested CompletionCode = -210
CompletionCodeFrameworkAttemptCompletion CompletionCode = -220
CompletionCodeDeleteTaskRequested CompletionCode = -230
// -3XX: Unknown Error
CompletionCodePodFailedWithoutFailedContainer CompletionCode = -300
CompletionCodePodCreationUnknownError CompletionCode = -310
)

var completionCodeInfoList = []*CompletionCodeInfo{}
Expand Down Expand Up @@ -152,20 +157,28 @@ func initCompletionCodeInfos() {
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
},
{
Code: CompletionCodeConfigMapCreationTimeout.Ptr(),
Phrase: "ConfigMapCreationTimeout",
Code: CompletionCodeConfigMapLocalCacheCreationTimeout.Ptr(),
Phrase: "ConfigMapLocalCacheCreationTimeout",
Type: CompletionType{CompletionTypeNameFailed,
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
},
{
Code: CompletionCodePodCreationTimeout.Ptr(),
Phrase: "PodCreationTimeout",
Code: CompletionCodePodLocalCacheCreationTimeout.Ptr(),
Phrase: "PodLocalCacheCreationTimeout",
Type: CompletionType{CompletionTypeNameFailed,
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
},
{
Code: CompletionCodePodSpecPermanentError.Ptr(),
Phrase: "PodSpecPermanentError",
// Only used to distinguish with others, and will never be used to complete
// a TaskAttempt.
Code: CompletionCodePodCreationTransientError.Ptr(),
Phrase: "PodCreationTransientError",
Type: CompletionType{CompletionTypeNameFailed,
[]CompletionTypeAttribute{CompletionTypeAttributeTransient}},
},
{
Code: CompletionCodePodCreationPermanentError.Ptr(),
Phrase: "PodCreationPermanentError",
Type: CompletionType{CompletionTypeNameFailed,
[]CompletionTypeAttribute{CompletionTypeAttributePermanent}},
},
Expand Down Expand Up @@ -193,6 +206,12 @@ func initCompletionCodeInfos() {
Type: CompletionType{CompletionTypeNameFailed,
[]CompletionTypeAttribute{}},
},
{
Code: CompletionCodePodCreationUnknownError.Ptr(),
Phrase: "PodCreationUnknownError",
Type: CompletionType{CompletionTypeNameFailed,
[]CompletionTypeAttribute{}},
},
})
}

Expand Down Expand Up @@ -238,6 +257,9 @@ type MatchedContainer struct {
}

// Match ANY CompletionCodeInfo
// The returned CompletionCode may not be within CompletionCodeInfos, such as
// the one for ContainerUnrecognizedFailed, so it should not be used to call
// NewTaskAttemptCompletionStatus or NewFrameworkAttemptCompletionStatus later.
func MatchCompletionCodeInfos(pod *core.Pod) PodMatchResult {
for _, codeInfo := range completionCodeInfoList {
for _, podPattern := range codeInfo.PodPatterns {
Expand Down Expand Up @@ -404,6 +426,55 @@ func generatePodUnmatchedResult(pod *core.Pod) PodMatchResult {
}
}

// ClassifyPodCreationError maps an error returned by a Pod creation request
// to a PodMatchResult whose Diagnostics carries the JSON form of the error.
// The returned CompletionCode must be within CompletionCodeInfos.
//
// The checks are ordered and the first one that matches wins:
//  1. message mentions an unrecognized object or an exceeded quota -> Transient
//  2. BadRequest or Forbidden                                      -> Unknown
//  3. Invalid or RequestEntityTooLarge                             -> Unknown if
//     the error also looks like a connection reset / EOF, otherwise Permanent
//  4. anything else, including all non-APIStatus errors            -> Transient
func ClassifyPodCreationError(apiErr error) PodMatchResult {
	diag := fmt.Sprintf("Failed to create Pod: %v", common.ToJson(apiErr))

	// Every outcome shares the same diagnostics and differs only by code.
	resultOf := func(code CompletionCode) PodMatchResult {
		return PodMatchResult{
			CodeInfo:    completionCodeInfoMap[code],
			Diagnostics: diag,
		}
	}

	errMsg := apiErr.Error()

	switch {
	case strings.Contains(errMsg, "object provided is unrecognized"),
		strings.Contains(errMsg, "exceeded quota"):
		// Treat Platform Error as Transient Error, such as Pod decoding error.
		return resultOf(CompletionCodePodCreationTransientError)

	case apiErrors.IsBadRequest(apiErr),
		apiErrors.IsForbidden(apiErr):
		// Treat General Framework Error as Unknown Error for safety.
		return resultOf(CompletionCodePodCreationUnknownError)

	case apiErrors.IsInvalid(apiErr),
		apiErrors.IsRequestEntityTooLargeError(apiErr):
		// Treat Permanent Framework Error as Permanent Error only if it must be
		// Permanent Error.
		// TODO: Also check net.IsConnectionRefused
		if net.IsConnectionReset(apiErr) || net.IsProbableEOF(apiErr) {
			// The ApiServer Permanent Error may be caused by Network Transient
			// Error, so treat it as Unknown Error for safety.
			return resultOf(CompletionCodePodCreationUnknownError)
		}
		return resultOf(CompletionCodePodCreationPermanentError)

	default:
		// Treat all other errors as Transient Error, including all
		// non-APIStatus errors.
		return resultOf(CompletionCodePodCreationTransientError)
	}
}

///////////////////////////////////////////////////////////////////////////////////////
// Completion Utils
///////////////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/frameworkcontroller/v1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Config struct {
// Generally, it should be proportional to the cluster Framework workload, and within the ApiServer
// serving capacity/limit such as the --max-mutating-requests-inflight.
KubeClientQps *float32 `yaml:"kubeClientQps"`
KubeClientBurst *int32 `yaml:"kubeClientBurst"`
KubeClientBurst *int32 `yaml:"kubeClientBurst"`

// Number of concurrent workers to process each different Frameworks.
// Generally, it should be proportional to the above rate limits of requests.
Expand Down
18 changes: 10 additions & 8 deletions pkg/apis/frameworkcontroller/v1/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,17 @@ func NewCompletedTaskTriggeredCompletionStatus(
"conditions in FrameworkAttemptCompletionPolicy have ever been triggered",
completedTaskCount, totalTaskCount)
if triggerTaskStatus == nil {
return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag, nil)
return CompletionCodeSucceeded.
NewFrameworkAttemptCompletionStatus(diag, nil)
} else {
return CompletionCodeSucceeded.NewFrameworkAttemptCompletionStatus(diag,
&CompletionPolicyTriggerStatus{
Message: diag,
TaskRoleName: triggerTaskRoleName,
TaskIndex: triggerTaskStatus.Index,
},
)
return CompletionCodeSucceeded.
NewFrameworkAttemptCompletionStatus(diag,
&CompletionPolicyTriggerStatus{
Message: diag,
TaskRoleName: triggerTaskRoleName,
TaskIndex: triggerTaskStatus.Index,
},
)
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"encoding/json"
"flag"
"fmt"
errorWrap "github.com/pkg/errors"
"gopkg.in/yaml.v2"
"io/ioutil"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -287,3 +288,12 @@ func Decompress(compressedBytes []byte) (string, error) {
}
}
}

// GetErrorCause returns the result of errorWrap.Cause(err) when that result
// is non-nil, and falls back to returning err itself otherwise.
func GetErrorCause(err error) error {
	if rootErr := errorWrap.Cause(err); rootErr != nil {
		return rootErr
	}
	return err
}
43 changes: 22 additions & 21 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1053,7 +1053,7 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) {
"ConfigMap does not appear in the local cache within timeout %v, "+
"so consider it was deleted and explicitly delete it",
common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
code = ci.CompletionCodeConfigMapCreationTimeout
code = ci.CompletionCodeConfigMapLocalCacheCreationTimeout
klog.Warning(logPfx + diag)
}

Expand Down Expand Up @@ -1670,7 +1670,7 @@ func (c *FrameworkController) syncTaskState(
"Pod does not appear in the local cache within timeout %v, "+
"so consider it was deleted and explicitly delete it",
common.SecToDuration(c.cConfig.ObjectLocalCacheCreationTimeoutSec))
code = ci.CompletionCodePodCreationTimeout
code = ci.CompletionCodePodLocalCacheCreationTimeout
klog.Warning(logPfx + diag)
}

Expand Down Expand Up @@ -1752,8 +1752,9 @@ func (c *FrameworkController) syncTaskState(
diag := fmt.Sprintf("Pod succeeded")
klog.Info(logPfx + diag)
c.completeTaskAttempt(f, taskRoleName, taskIndex, false,
ci.CompletionCodeSucceeded.NewTaskAttemptCompletionStatus(
diag, ci.ExtractPodCompletionStatus(pod)))
ci.CompletionCodeSucceeded.
NewTaskAttemptCompletionStatus(
diag, ci.ExtractPodCompletionStatus(pod)))
return nil
} else if pod.Status.Phase == core.PodFailed {
result := ci.MatchCompletionCodeInfos(pod)
Expand Down Expand Up @@ -1910,26 +1911,26 @@ func (c *FrameworkController) syncTaskState(
// createTaskAttempt
pod, err = c.createPod(f, cm, taskRoleName, taskIndex)
if err != nil {
apiErr := errorWrap.Cause(err)
if internal.IsPodSpecPermanentError(apiErr) {
// Should be Framework Error instead of Platform Transient Error.
diag := fmt.Sprintf("Failed to create Pod: %v", common.ToJson(apiErr))
klog.Info(logPfx + diag)

// Ensure pod is deleted in remote to avoid managed pod leak after
// TaskAttemptCompleted.
_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
if err != nil {
return err
}
apiErr := common.GetErrorCause(err)
result := ci.ClassifyPodCreationError(apiErr)
if *result.CodeInfo.Code == ci.CompletionCodePodCreationTransientError {
// Do not complete the TaskAttempt, as generally, the user does not need to
// be aware of the Transient Error during Pod creation.
return err
}

c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
ci.CompletionCodePodSpecPermanentError.
NewTaskAttemptCompletionStatus(diag, nil))
return nil
} else {
klog.Info(logPfx + result.Diagnostics)
// Ensure pod is deleted in remote to avoid managed pod leak after
// TaskAttemptCompleted.
_, err = c.getOrCleanupPod(f, cm, taskRoleName, taskIndex, true)
if err != nil {
return err
}

c.completeTaskAttempt(f, taskRoleName, taskIndex, true,
result.CodeInfo.Code.
NewTaskAttemptCompletionStatus(result.Diagnostics, nil))
return nil
}

taskStatus.AttemptStatus.PodUID = &pod.UID
Expand Down
9 changes: 0 additions & 9 deletions pkg/internal/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"reflect"
"strings"
"time"
)

Expand Down Expand Up @@ -222,11 +221,3 @@ func GetPodDeletionStartTime(pod *core.Pod) *meta.Time {
}
return common.PtrTime(meta.NewTime(pod.DeletionTimestamp.Add(-gracePeriod)))
}

// IsPodSpecPermanentError reports whether apiErr is a BadRequest, Invalid or
// RequestEntityTooLarge API error, or a Forbidden API error whose message does
// not contain "exceeded quota".
func IsPodSpecPermanentError(apiErr error) bool {
	if apiErrors.IsBadRequest(apiErr) ||
		apiErrors.IsInvalid(apiErr) ||
		apiErrors.IsRequestEntityTooLargeError(apiErr) {
		return true
	}

	if !apiErrors.IsForbidden(apiErr) {
		return false
	}

	// A Forbidden error caused by an exceeded quota is excluded.
	return !strings.Contains(apiErr.Error(), "exceeded quota")
}