diff --git a/go.mod b/go.mod index 590c345675a..fc47d9b3736 100644 --- a/go.mod +++ b/go.mod @@ -53,7 +53,7 @@ require ( github.com/weaveworks/common v0.0.0-20220706100410-67d27ed40fae go.etcd.io/etcd/api/v3 v3.5.6 go.etcd.io/etcd/client/pkg/v3 v3.5.6 - go.etcd.io/etcd/client/v3 v3.5.4 + go.etcd.io/etcd/client/v3 v3.5.6 go.opentelemetry.io/contrib/propagators/aws v1.12.0 go.opentelemetry.io/otel v1.11.2 go.opentelemetry.io/otel/bridge/opentracing v1.11.1-0.20221013154440-84e28fd3bdb1 diff --git a/go.sum b/go.sum index 5d3536330d2..51c0be2d58e 100644 --- a/go.sum +++ b/go.sum @@ -1560,17 +1560,15 @@ go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd v0.5.0-alpha.5.0.20200910180754-dd1b699fc489/go.mod h1:yVHk9ub3CSBatqGNg7GRmsnfLWtoW60w4eDYfh7vHDg= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= -go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A= go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= -go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU= go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= go.etcd.io/etcd/client/v2 v2.305.0/go.mod h1:h9puh54ZTgAKtEbut2oe9P4L/oqKCVB6xsXlzd7alYQ= go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0= -go.etcd.io/etcd/client/v3 v3.5.4 h1:p83BUL3tAYS0OT/r0qglgc3M1JjhM0diV8DSWAhVXv4= -go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= +go.etcd.io/etcd/client/v3 v3.5.6 h1:coLs69PWCXE9G4FKquzNaSHrRyMCAXwF+IX1tAPVO8E= +go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk= go.etcd.io/etcd/pkg/v3 v3.5.0/go.mod h1:UzJGatBQ1lXChBkQF0AuAtkRQMYnHubxAEYIrC3MSsE= go.etcd.io/etcd/raft/v3 v3.5.0/go.mod h1:UFOHSIvO/nKwd4lhkwabrTD3cqW5yVyYYf/KlD00Szc= go.etcd.io/etcd/server/v3 v3.5.0/go.mod h1:3Ah5ruV+M+7RZr0+Y/5mNLwC+eQlni+mQmOVdCRJoS4= diff --git a/vendor/go.etcd.io/etcd/client/v3/client.go b/vendor/go.etcd.io/etcd/client/v3/client.go index 2990379ab9f..4dfae89c610 100644 --- a/vendor/go.etcd.io/etcd/client/v3/client.go +++ b/vendor/go.etcd.io/etcd/client/v3/client.go @@ -286,8 +286,7 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc. if err != nil { return nil, fmt.Errorf("failed to configure dialer: %v", err) } - if c.Username != "" && c.Password != "" { - c.authTokenBundle = credentials.NewBundle(credentials.Config{}) + if c.authTokenBundle != nil { opts = append(opts, grpc.WithPerRPCCredentials(c.authTokenBundle.PerRPCCredentials())) } @@ -383,6 +382,7 @@ func newClient(cfg *Config) (*Client, error) { if cfg.Username != "" && cfg.Password != "" { client.Username = cfg.Username client.Password = cfg.Password + client.authTokenBundle = credentials.NewBundle(credentials.Config{}) } if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 { if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize { diff --git a/vendor/go.etcd.io/etcd/client/v3/lease.go b/vendor/go.etcd.io/etcd/client/v3/lease.go index bd31e6b4a5b..9e1b704648b 100644 --- a/vendor/go.etcd.io/etcd/client/v3/lease.go +++ b/vendor/go.etcd.io/etcd/client/v3/lease.go @@ -397,7 +397,7 @@ func (l *lessor) closeRequireLeader() { } } -func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) { +func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (karesp *LeaseKeepAliveResponse, ferr error) { cctx, cancel := context.WithCancel(ctx) defer cancel() @@ -406,6 +406,15 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive return nil, toErr(ctx, err) } + defer func() { + if err := stream.CloseSend(); err != nil { + if ferr == nil { + ferr = toErr(ctx, err) + } + return + } + }() + err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)}) if err != nil { return nil, toErr(ctx, err) @@ -416,7 +425,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive return nil, toErr(ctx, rerr) } - karesp := &LeaseKeepAliveResponse{ + karesp = &LeaseKeepAliveResponse{ ResponseHeader: resp.GetHeader(), ID: LeaseID(resp.ID), TTL: resp.TTL, diff --git a/vendor/go.etcd.io/etcd/client/v3/logger.go b/vendor/go.etcd.io/etcd/client/v3/logger.go index ecac42730f6..eaa35f2d3ac 100644 --- a/vendor/go.etcd.io/etcd/client/v3/logger.go +++ b/vendor/go.etcd.io/etcd/client/v3/logger.go @@ -51,8 +51,8 @@ func etcdClientDebugLevel() zapcore.Level { return zapcore.InfoLevel } var l zapcore.Level - if err := l.Set(envLevel); err == nil { - log.Printf("Deprecated env ETCD_CLIENT_DEBUG value. Using default level: 'info'") + if err := l.Set(envLevel); err != nil { + log.Printf("Invalid value for environment variable 'ETCD_CLIENT_DEBUG'. Using default level: 'info'") return zapcore.InfoLevel } return l diff --git a/vendor/go.etcd.io/etcd/client/v3/maintenance.go b/vendor/go.etcd.io/etcd/client/v3/maintenance.go index dbea530e66a..a98b8ca51e1 100644 --- a/vendor/go.etcd.io/etcd/client/v3/maintenance.go +++ b/vendor/go.etcd.io/etcd/client/v3/maintenance.go @@ -92,6 +92,7 @@ func NewMaintenance(c *Client) Maintenance { err = c.getToken(dctx) cancel() if err != nil { + conn.Close() return nil, nil, fmt.Errorf("failed to getToken from endpoint %s with maintenance client: %v", endpoint, err) } cancel = func() { conn.Close() } diff --git a/vendor/go.etcd.io/etcd/client/v3/op.go b/vendor/go.etcd.io/etcd/client/v3/op.go index e8c0c1e08c9..5251906322c 100644 --- a/vendor/go.etcd.io/etcd/client/v3/op.go +++ b/vendor/go.etcd.io/etcd/client/v3/op.go @@ -389,12 +389,12 @@ func getPrefix(key []byte) []byte { // can return 'foo1', 'foo2', and so on. func WithPrefix() OpOption { return func(op *Op) { + op.isOptsWithPrefix = true if len(op.key) == 0 { op.key, op.end = []byte{0}, []byte{0} return } op.end = getPrefix(op.key) - op.isOptsWithPrefix = true } } diff --git a/vendor/go.etcd.io/etcd/client/v3/retry_interceptor.go b/vendor/go.etcd.io/etcd/client/v3/retry_interceptor.go index 04f157a1dcb..7dc5ddae0fd 100644 --- a/vendor/go.etcd.io/etcd/client/v3/retry_interceptor.go +++ b/vendor/go.etcd.io/etcd/client/v3/retry_interceptor.go @@ -74,13 +74,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien continue } if c.shouldRefreshToken(lastErr, callOpts) { - // clear auth token before refreshing it. - // call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side, - // if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165) - // and a rpctypes.ErrInvalidAuthToken will recursively call c.getToken until system run out of resource. - c.authTokenBundle.UpdateAuthToken("") - - gterr := c.getToken(ctx) + gterr := c.refreshToken(ctx) if gterr != nil { c.GetLogger().Warn( "retrying of unary invoker failed to fetch new auth token", @@ -161,6 +155,24 @@ func (c *Client) shouldRefreshToken(err error, callOpts *options) bool { (rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision) } +func (c *Client) refreshToken(ctx context.Context) error { + if c.authTokenBundle == nil { + // c.authTokenBundle will be initialized only when + // c.Username != "" && c.Password != "". + // + // When users use the TLS CommonName based authentication, the + // authTokenBundle is always nil. But it's possible for the clients + // to get `rpctypes.ErrAuthOldRevision` response when the clients + // concurrently modify auth data (e.g, addUser, deleteUser etc.). + // In this case, there is no need to refresh the token; instead the + // clients just need to retry the operations (e.g. Put, Delete etc). + return nil + } + // clear auth token before refreshing it. + c.authTokenBundle.UpdateAuthToken("") + return c.getToken(ctx) +} + // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish // a new ClientStream according to the retry policy. @@ -259,10 +271,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{} return true, err } if s.client.shouldRefreshToken(err, s.callOpts) { - // clear auth token to avoid failure when call getToken - s.client.authTokenBundle.UpdateAuthToken("") - - gterr := s.client.getToken(s.ctx) + gterr := s.client.refreshToken(s.ctx) if gterr != nil { s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr)) return false, err // return the original error for simplicity diff --git a/vendor/go.etcd.io/etcd/client/v3/watch.go b/vendor/go.etcd.io/etcd/client/v3/watch.go index b73925ba128..90f125ac466 100644 --- a/vendor/go.etcd.io/etcd/client/v3/watch.go +++ b/vendor/go.etcd.io/etcd/client/v3/watch.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "strings" "sync" "time" @@ -37,6 +38,13 @@ const ( EventTypePut = mvccpb.PUT closeSendErrTimeout = 250 * time.Millisecond + + // AutoWatchID is the watcher ID passed in WatchStream.Watch when no + // user-provided ID is available. If pass, an ID will automatically be assigned. + AutoWatchID = 0 + + // InvalidWatchID represents an invalid watch ID and prevents duplication with an existing watch. + InvalidWatchID = -1 ) type Event mvccpb.Event @@ -450,7 +458,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) { func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) { // check watch ID for backward compatibility (<= v3.3) - if resp.WatchId == -1 || (resp.Canceled && resp.CancelReason != "") { + if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") { w.closeErr = v3rpc.Error(errors.New(resp.CancelReason)) // failed; no channel close(ws.recvc) @@ -481,7 +489,7 @@ func (w *watchGrpcStream) closeSubstream(ws *watcherStream) { } else if ws.outc != nil { close(ws.outc) } - if ws.id != -1 { + if ws.id != InvalidWatchID { delete(w.substreams, ws.id) return } @@ -533,6 +541,7 @@ func (w *watchGrpcStream) run() { cancelSet := make(map[int64]struct{}) var cur *pb.WatchResponse + backoff := time.Millisecond for { select { // Watch() requested @@ -543,7 +552,7 @@ func (w *watchGrpcStream) run() { // TODO: pass custom watch ID? ws := &watcherStream{ initReq: *wreq, - id: -1, + id: InvalidWatchID, outc: outc, // unbuffered so resumes won't cause repeat events recvc: make(chan *WatchResponse), @@ -580,6 +589,26 @@ func (w *watchGrpcStream) run() { switch { case pbresp.Created: + cancelReasonError := v3rpc.Error(errors.New(pbresp.CancelReason)) + if shouldRetryWatch(cancelReasonError) { + var newErr error + if wc, newErr = w.newWatchClient(); newErr != nil { + w.lg.Error("failed to create a new watch client", zap.Error(newErr)) + return + } + + if len(w.resuming) != 0 { + if ws := w.resuming[0]; ws != nil { + if err := wc.Send(ws.initReq.toPB()); err != nil { + w.lg.Debug("error when sending request", zap.Error(err)) + } + } + } + + cur = nil + continue + } + // response to head of queue creation if len(w.resuming) != 0 { if ws := w.resuming[0]; ws != nil { @@ -649,6 +678,7 @@ func (w *watchGrpcStream) run() { closeErr = err return } + backoff = w.backoffIfUnavailable(backoff, err) if wc, closeErr = w.newWatchClient(); closeErr != nil { return } @@ -669,7 +699,7 @@ func (w *watchGrpcStream) run() { if len(w.substreams)+len(w.resuming) == 0 { return } - if ws.id != -1 { + if ws.id != InvalidWatchID { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives cancelSet[ws.id] = struct{}{} @@ -688,6 +718,11 @@ func (w *watchGrpcStream) run() { } } +func shouldRetryWatch(cancelReasonError error) bool { + return (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCInvalidAuthToken.Error()) == 0) || + (strings.Compare(cancelReasonError.Error(), v3rpc.ErrGRPCAuthOldRevision.Error()) == 0) +} + // nextResume chooses the next resuming to register with the grpc stream. Abandoned // streams are marked as nil in the queue since the head must wait for its inflight registration. func (w *watchGrpcStream) nextResume() *watcherStream { @@ -716,9 +751,9 @@ func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool { cancelReason: pbresp.CancelReason, } - // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of -1 to + // watch IDs are zero indexed, so request notify watch responses are assigned a watch ID of InvalidWatchID to // indicate they should be broadcast. - if wr.IsProgressNotify() && pbresp.WatchId == -1 { + if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID { return w.broadcastResponse(wr) } @@ -873,7 +908,7 @@ func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) { w.resumec = make(chan struct{}) w.joinSubstreams() for _, ws := range w.substreams { - ws.id = -1 + ws.id = InvalidWatchID w.resuming = append(w.resuming, ws) } // strip out nils, if any @@ -963,6 +998,21 @@ func (w *watchGrpcStream) joinSubstreams() { var maxBackoff = 100 * time.Millisecond +func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration { + if isUnavailableErr(w.ctx, err) { + // retry, but backoff + if backoff < maxBackoff { + // 25% backoff factor + backoff = backoff + backoff/4 + if backoff > maxBackoff { + backoff = maxBackoff + } + } + time.Sleep(backoff) + } + return backoff +} + // openWatchClient retries opening a watch client until success or halt. // manually retry in case "ws==nil && err==nil" // TODO: remove FailFast=false @@ -983,17 +1033,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) if isHaltErr(w.ctx, err) { return nil, v3rpc.Error(err) } - if isUnavailableErr(w.ctx, err) { - // retry, but backoff - if backoff < maxBackoff { - // 25% backoff factor - backoff = backoff + backoff/4 - if backoff > maxBackoff { - backoff = maxBackoff - } - } - time.Sleep(backoff) - } + backoff = w.backoffIfUnavailable(backoff, err) } return ws, nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 10e90aa85d6..5f560dfd8cd 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -893,7 +893,7 @@ go.etcd.io/etcd/client/pkg/v3/systemd go.etcd.io/etcd/client/pkg/v3/tlsutil go.etcd.io/etcd/client/pkg/v3/transport go.etcd.io/etcd/client/pkg/v3/types -# go.etcd.io/etcd/client/v3 v3.5.4 +# go.etcd.io/etcd/client/v3 v3.5.6 ## explicit; go 1.16 go.etcd.io/etcd/client/v3 go.etcd.io/etcd/client/v3/credentials