Skip to content

🌱 add preWatch options for leader-elections runnables to reduce failover time when leader changes #2005

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,3 +358,19 @@ func (c *Controller) InjectFunc(f inject.Func) error {
// updateMetrics records reconcileTime, converted to seconds, as an observation
// on the ReconcileTime metric labelled with this controller's Name.
func (c *Controller) updateMetrics(reconcileTime time.Duration) {
ctrlmetrics.ReconcileTime.WithLabelValues(c.Name).Observe(reconcileTime.Seconds())
}

// PreWatch calls PreWatch on every queued watch whose source implements
// source.PreWatch; sources that do not implement it are skipped.
//
// The controller mutex is held for the whole call. Iteration stops at the
// first failure and that error is returned unchanged; nil is returned when
// every eligible source succeeded.
func (c *Controller) PreWatch() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, w := range c.startWatches {
		if pw, supported := w.src.(source.PreWatch); supported {
			if err := pw.PreWatch(); err != nil {
				return err
			}
		}
	}
	return nil
}
13 changes: 13 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ type Options struct {
newResourceLock func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error)
newMetricsListener func(addr string) (net.Listener, error)
newHealthProbeListener func(addr string) (net.Listener, error)

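// PreWatch, when true, makes the leader-election runnable group call PreWatch
// on runnables that implement RunnableWithPreWatch as they are added, before
// the group is started.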
PreWatch bool
}

// BaseContextFunc is a function used to provide a base Context to Runnables
Expand All @@ -316,6 +319,12 @@ type Runnable interface {
Start(context.Context) error
}

// RunnableWithPreWatch is a Runnable that also exposes a PreWatch method.
type RunnableWithPreWatch interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand why this would be needed. We already have the LeaderElectionRunnable interface to start things before the leader lock was acquired:

type LeaderElectionRunnable interface {

Moreover, we start the cache before we start anything else:

if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {

We don't start the Source itself, which might lead to us not getting an informer, which in turn might mean we don't actually start a watch. But changing that is probably a breaking change, as it will lead to increased memory consumption on non-leader instances.

Copy link
Author

@nicktming nicktming Oct 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alvaroaleman thanks for the discussion.

It has the obvious problem that it will lead to increased memory consumption on non-leader instances, so I provide an option for users who need this feature. Users can enable this feature when it is helpful for them.

In our environment (big clusters), when the leader changes it takes too long to list resources from the apiserver, so we want to reduce failover time at the cost of memory consumption.

Runnable
PreWatch() error
}

// RunnableFunc implements Runnable using a function.
// It's very important that the given function block
// until it's done running.
Expand Down Expand Up @@ -407,6 +416,10 @@ func New(config *rest.Config, options Options) (Manager, error) {
errChan := make(chan error)
runnables := newRunnables(options.BaseContext, errChan)

if options.PreWatch {
runnables.LeaderElection.preWatch = true
}

return &controllerManager{
stopProcedureEngaged: pointer.Int64(0),
cluster: cluster,
Expand Down
15 changes: 15 additions & 0 deletions pkg/manager/runnable_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ type runnableGroup struct {
// wg is an internal sync.WaitGroup that allows us to properly stop
// and wait for all the runnables to finish before returning.
wg *sync.WaitGroup

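// preWatch, when true, makes Add call PreWatch in a new goroutine on runnables
// implementing RunnableWithPreWatch that are added before the group has started.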
preWatch bool
}

func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
Expand Down Expand Up @@ -223,6 +226,15 @@ func (r *runnableGroup) reconcile() {
}
}

// preWatchWithRunnable calls PreWatch on rn when rn implements
// RunnableWithPreWatch, and forwards any resulting error to the group's
// error channel. Runnables that do not implement the interface are ignored.
func (r *runnableGroup) preWatchWithRunnable(rn Runnable) {
	pw, supported := rn.(RunnableWithPreWatch)
	if !supported {
		return
	}

	err := pw.PreWatch()
	if err == nil {
		return
	}
	// The send blocks until something receives from errChan.
	r.errChan <- err
}

// Add should be able to be called before and after Start, but not after StopAndWait.
// Add should return an error when called during StopAndWait.
func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
Expand Down Expand Up @@ -251,6 +263,9 @@ func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {

// Check if we're already started.
if !r.started {
if r.preWatch {
go r.preWatchWithRunnable(readyRunnable.Runnable)
}
// Store the runnable in the internal if not.
r.startQueue = append(r.startQueue, readyRunnable)
r.start.Unlock()
Expand Down
39 changes: 39 additions & 0 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ type Source interface {
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

// PreWatch is a Source that additionally exposes a PreWatch method.
// Controller.PreWatch invokes that method on every queued watch source
// that implements this interface.
type PreWatch interface {
Source
// PreWatch returns a non-nil error when the source could not be pre-watched.
PreWatch() error
}

// SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type SyncingSource interface {
Expand Down Expand Up @@ -87,6 +92,10 @@ func (ks *kindWithCache) WaitForSync(ctx context.Context) error {
return ks.kind.WaitForSync(ctx)
}

// PreWatch delegates to the wrapped Kind's PreWatch and returns its error unchanged.
func (ks *kindWithCache) PreWatch() error {
return ks.kind.PreWatch()
}

// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Expand All @@ -103,6 +112,36 @@ type Kind struct {

var _ SyncingSource = &Kind{}

// PreWatch asks the injected cache for an informer for ks.Type without
// registering any event handler.
//
// It returns an error when Type is unset, when no cache has been injected,
// or when the cache fails to provide an informer. In the last case the
// failure is also logged with a hint about the likely cause, and the
// original error is returned unchanged so callers can still inspect it.
func (ks *Kind) PreWatch() error {
	// Type should have been specified by the user.
	if ks.Type == nil {
		return fmt.Errorf("must specify Kind.Type")
	}

	// The cache should have been injected before PreWatch was called.
	if ks.cache == nil {
		return fmt.Errorf("must call CacheInto on Kind before calling PreWatch")
	}

	// NOTE(review): context.Background() cannot be cancelled; if GetInformer
	// blocks (presumably while the informer syncs; confirm), this call has
	// no way to be interrupted. The PreWatch() signature carries no context.
	if _, err := ks.cache.GetInformer(context.Background(), ks.Type); err != nil {
		kindMatchErr := &meta.NoKindMatchError{}
		switch {
		case errors.As(err, &kindMatchErr):
			log.Error(err, "if kind is a CRD, it should be installed before calling PreWatch",
				"kind", kindMatchErr.GroupKind)
		case runtime.IsNotRegisteredError(err):
			log.Error(err, "kind must be registered to the Scheme")
		default:
			log.Error(err, "failed to get informer from cache")
		}
		return err
	}

	log.Info("pre watch", "object", ks.Type)
	return nil
}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
Expand Down