From b640464ed744b79d6b3825ed901ca50f57522099 Mon Sep 17 00:00:00 2001 From: Yuqi Wang Date: Wed, 31 Jul 2019 14:44:25 +0800 Subject: [PATCH] Support LogObjectSnapshot: Expose Framework and Pod History --- pkg/apis/frameworkcontroller/v1/config.go | 41 ++++++++++++ pkg/apis/frameworkcontroller/v1/funcs.go | 8 +++ pkg/controller/controller.go | 77 +++++++++++++++++------ 3 files changed, 106 insertions(+), 20 deletions(-) diff --git a/pkg/apis/frameworkcontroller/v1/config.go b/pkg/apis/frameworkcontroller/v1/config.go index 44bcfdaf..becd9108 100644 --- a/pkg/apis/frameworkcontroller/v1/config.go +++ b/pkg/apis/frameworkcontroller/v1/config.go @@ -76,6 +76,35 @@ type Config struct { // all-or-nothing fashion in order to perform any useful work. FrameworkMinRetryDelaySecForTransientConflictFailed *int64 `yaml:"frameworkMinRetryDelaySecForTransientConflictFailed"` FrameworkMaxRetryDelaySecForTransientConflictFailed *int64 `yaml:"frameworkMaxRetryDelaySecForTransientConflictFailed"` + + // Specify when to log the snapshot of which managed object. + // This enables external systems to collect and process the history snapshots, + // such as persistence, metrics conversion, visualization, alerting, acting, + // analysis, etc. + // Notes: + // 1. The snapshot is logged to stderr. + // 2. Check GetFrameworkSnapshotLogTail and GetPodSnapshotLogTail to see how + // to extract the snapshot from stderr. + // 3. The same snapshot may be logged more than once in some rare cases, so + // external systems may need to deduplicate them by object.ResourceVersion. + // 4. The snapshot triggered by deletion may be missed to log during the + // FrameworkController downtime. + LogObjectSnapshot LogObjectSnapshot `yaml:"logObjectSnapshot"` +} + +type LogObjectSnapshot struct { + Framework LogFrameworkSnapshot `yaml:"framework"` + Pod LogPodSnapshot `yaml:"pod"` +} + +type LogFrameworkSnapshot struct { + OnTaskRetry *bool `yaml:"onTaskRetry"` + OnFrameworkRetry *bool `yaml:"onFrameworkRetry"` + OnFrameworkDeletion *bool `yaml:"onFrameworkDeletion"` +} + +type LogPodSnapshot struct { + OnPodDeletion *bool `yaml:"onPodDeletion"` } func NewConfig() *Config { @@ -107,6 +136,18 @@ func NewConfig() *Config { if c.FrameworkMaxRetryDelaySecForTransientConflictFailed == nil { c.FrameworkMaxRetryDelaySecForTransientConflictFailed = common.PtrInt64(15 * 60) } + if c.LogObjectSnapshot.Framework.OnTaskRetry == nil { + c.LogObjectSnapshot.Framework.OnTaskRetry = common.PtrBool(true) + } + if c.LogObjectSnapshot.Framework.OnFrameworkRetry == nil { + c.LogObjectSnapshot.Framework.OnFrameworkRetry = common.PtrBool(true) + } + if c.LogObjectSnapshot.Framework.OnFrameworkDeletion == nil { + c.LogObjectSnapshot.Framework.OnFrameworkDeletion = common.PtrBool(true) + } + if c.LogObjectSnapshot.Pod.OnPodDeletion == nil { + c.LogObjectSnapshot.Pod.OnPodDeletion = common.PtrBool(true) + } // Validation errPrefix := "Config Validation Failed: " diff --git a/pkg/apis/frameworkcontroller/v1/funcs.go b/pkg/apis/frameworkcontroller/v1/funcs.go index 8f2621a2..ca18cbac 100644 --- a/pkg/apis/frameworkcontroller/v1/funcs.go +++ b/pkg/apis/frameworkcontroller/v1/funcs.go @@ -90,6 +90,14 @@ func SplitTaskAttemptInstanceUID(taskAttemptInstanceUID *types.UID) ( return int32(i), common.PtrUIDStr(parts[1]) } +func GetFrameworkSnapshotLogTail(podPtr interface{}) string { + return "FrameworkSnapshot: " + common.ToJson(podPtr) +} + +func GetPodSnapshotLogTail(frameworkPtr interface{}) string { + return "PodSnapshot: " + common.ToJson(frameworkPtr) +} + /////////////////////////////////////////////////////////////////////////////////////// // Interfaces /////////////////////////////////////////////////////////////////////////////////////// diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index e276119a..e8099192 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -245,42 +245,54 @@ func NewFrameworkController() *FrameworkController { fInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - c.enqueueFrameworkObj(obj, "Framework Added") + c.enqueueFrameworkObj(obj, "Framework Added", nil) }, UpdateFunc: func(oldObj, newObj interface{}) { // FrameworkController only cares about Framework.Spec update oldF := oldObj.(*ci.Framework) newF := newObj.(*ci.Framework) if !reflect.DeepEqual(oldF.Spec, newF.Spec) { - c.enqueueFrameworkObj(newObj, "Framework.Spec Updated") + c.enqueueFrameworkObj(newObj, "Framework.Spec Updated", nil) } }, DeleteFunc: func(obj interface{}) { - c.enqueueFrameworkObj(obj, "Framework Deleted") + c.enqueueFrameworkObj(obj, "Framework Deleted", func() string { + if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkDeletion { + return ": " + ci.GetFrameworkSnapshotLogTail(obj) + } else { + return "" + } + }) }, }) cmInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Added") + c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Added", nil) }, UpdateFunc: func(oldObj, newObj interface{}) { - c.enqueueFrameworkConfigMapObj(newObj, "Framework ConfigMap Updated") + c.enqueueFrameworkConfigMapObj(newObj, "Framework ConfigMap Updated", nil) }, DeleteFunc: func(obj interface{}) { - c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Deleted") + c.enqueueFrameworkConfigMapObj(obj, "Framework ConfigMap Deleted", nil) }, }) podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - c.enqueueFrameworkPodObj(obj, "Framework Pod Added") + c.enqueueFrameworkPodObj(obj, "Framework Pod Added", nil) }, UpdateFunc: func(oldObj, newObj interface{}) { - c.enqueueFrameworkPodObj(newObj, "Framework Pod Updated") + c.enqueueFrameworkPodObj(newObj, "Framework Pod Updated", nil) }, DeleteFunc: func(obj interface{}) { - c.enqueueFrameworkPodObj(obj, "Framework Pod Deleted") + c.enqueueFrameworkPodObj(obj, "Framework Pod Deleted", func() string { + if *c.cConfig.LogObjectSnapshot.Pod.OnPodDeletion { + return ": " + ci.GetPodSnapshotLogTail(obj) + } else { + return "" + } + }) }, }) @@ -288,7 +300,8 @@ func NewFrameworkController() *FrameworkController { } // obj could be *ci.Framework or cache.DeletedFinalStateUnknown. -func (c *FrameworkController) enqueueFrameworkObj(obj interface{}, msg string) { +func (c *FrameworkController) enqueueFrameworkObj( + obj interface{}, logSfx string, logTailFunc func() string) { key, err := internal.GetKey(obj) if err != nil { klog.Errorf("Failed to get key for obj %#v, skip to enqueue: %v", obj, err) @@ -302,23 +315,29 @@ func (c *FrameworkController) enqueueFrameworkObj(obj interface{}, msg string) { } c.fQueue.Add(key) - klog.Infof("[%v]: enqueueFrameworkObj: %v", key, msg) + + if logTailFunc != nil { + logSfx += logTailFunc() + } + klog.Infof("[%v]: enqueueFrameworkObj: %v", key, logSfx) } // obj could be *core.ConfigMap or cache.DeletedFinalStateUnknown. -func (c *FrameworkController) enqueueFrameworkConfigMapObj(obj interface{}, msg string) { +func (c *FrameworkController) enqueueFrameworkConfigMapObj( + obj interface{}, logSfx string, logTailFunc func() string) { if cm := internal.ToConfigMap(obj); cm != nil { if f := c.getConfigMapOwner(cm); f != nil { - c.enqueueFrameworkObj(f, msg+": "+cm.Name) + c.enqueueFrameworkObj(f, logSfx+": "+cm.Name, logTailFunc) } } } // obj could be *core.Pod or cache.DeletedFinalStateUnknown. -func (c *FrameworkController) enqueueFrameworkPodObj(obj interface{}, msg string) { +func (c *FrameworkController) enqueueFrameworkPodObj( + obj interface{}, logSfx string, logTailFunc func() string) { if pod := internal.ToPod(obj); pod != nil { if cm := c.getPodOwner(pod); cm != nil { - c.enqueueFrameworkConfigMapObj(cm, msg+": "+pod.Name) + c.enqueueFrameworkConfigMapObj(cm, logSfx+": "+pod.Name, logTailFunc) } } } @@ -393,6 +412,8 @@ func (c *FrameworkController) Run(stopCh <-chan struct{}) { c.cConfig.CRDEstablishedCheckIntervalSec, c.cConfig.CRDEstablishedCheckTimeoutSec) + // The recovery order is not important, since all Frameworks will be enqueued + // to sync in any case. go c.fInformer.Run(stopCh) go c.cmInformer.Run(stopCh) go c.podInformer.Run(stopCh) @@ -669,9 +690,9 @@ func (c *FrameworkController) enqueueTaskRetryDelayTimeoutCheck( return true } -func (c *FrameworkController) enqueueFramework(f *ci.Framework, msg string) { +func (c *FrameworkController) enqueueFramework(f *ci.Framework, logSfx string) { c.fQueue.Add(f.Key()) - klog.Infof("[%v]: enqueueFramework: %v", f.Key(), msg) + klog.Infof("[%v]: enqueueFramework: %v", f.Key(), logSfx) } func (c *FrameworkController) syncFrameworkStatus(f *ci.Framework) error { @@ -847,6 +868,14 @@ func (c *FrameworkController) syncFrameworkState(f *ci.Framework) (err error) { // retryFramework klog.Infof(logPfx + "Retry Framework") + + // The completed FrameworkAttempt has been persisted, so it is safe to also + // expose it as one history snapshot. + if *c.cConfig.LogObjectSnapshot.Framework.OnFrameworkRetry { + klog.Infof(logPfx+ + "Framework will be retried: %v", ci.GetFrameworkSnapshotLogTail(f)) + } + f.Status.RetryPolicyStatus.TotalRetriedCount++ if retryDecision.IsAccountable { f.Status.RetryPolicyStatus.AccountableRetriedCount++ @@ -1232,9 +1261,9 @@ func (c *FrameworkController) syncTaskState( terminated := containerStatus.State.Terminated if terminated != nil && terminated.ExitCode != 0 { allContainerDiags = append(allContainerDiags, fmt.Sprintf( - "[Container %v, ExitCode: %v, Reason: %v, Message: %v]", - containerStatus.Name, terminated.ExitCode, terminated.Reason, - terminated.Message)) + "[Container: %v, ExitCode: %v, Signal: %v, Reason: %v, Message: %v]", + containerStatus.Name, terminated.ExitCode, terminated.Signal, + terminated.Reason, common.ToJson(terminated.Message))) if lastContainerExitCode == nil || lastContainerCompletionTime.Before(terminated.FinishedAt.Time) { @@ -1323,6 +1352,14 @@ func (c *FrameworkController) syncTaskState( // retryTask klog.Infof(logPfx + "Retry Task") + + // The completed TaskAttempt has been persisted, so it is safe to also + // expose it as one history snapshot. + if *c.cConfig.LogObjectSnapshot.Framework.OnTaskRetry { + klog.Infof(logPfx+ + "Task will be retried: %v", ci.GetFrameworkSnapshotLogTail(f)) + } + taskStatus.RetryPolicyStatus.TotalRetriedCount++ if retryDecision.IsAccountable { taskStatus.RetryPolicyStatus.AccountableRetriedCount++