diff --git a/CHANGELOG.md b/CHANGELOG.md
index d51a4ef054b..8be89ba1f61 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -18,6 +18,7 @@
 * [ENHANCEMENT] Push reduce one hash operation of Labels. #4945 #5114
 * [ENHANCEMENT] Alertmanager: Added `-alertmanager.enabled-tenants` and `-alertmanager.disabled-tenants` to explicitly enable or disable alertmanager for specific tenants. #5116
 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.17`. #5132
+* [ENHANCEMENT] Add retry logic to S3 bucket client. #5135
 * [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
 * [FEATURE] Ingester: Add active series to all_user_stats page. #4972
 * [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000
diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go
index 89d10101583..68a8ee714f2 100644
--- a/pkg/storage/bucket/s3/bucket_client.go
+++ b/pkg/storage/bucket/s3/bucket_client.go
@@ -1,12 +1,24 @@
 package s3
 
 import (
+	"context"
+	"io"
+	"time"
+
 	"github.com/go-kit/log"
 	"github.com/prometheus/common/model"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/s3"
+
+	"github.com/cortexproject/cortex/pkg/util/backoff"
 )
 
+const (
+	defaultOperationRetries = 5
+	defaultRetryMinBackoff  = 5 * time.Second
+	defaultRetryMaxBackoff  = 1 * time.Minute
+)
+
 // NewBucketClient creates a new S3 bucket client
 func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
 	s3Cfg, err := newS3Config(cfg)
@@ -14,7 +26,16 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke
 		return nil, err
 	}
 
-	return s3.NewBucketWithConfig(logger, s3Cfg, name)
+	bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name)
+	if err != nil {
+		return nil, err
+	}
+	return &BucketWithRetries{
+		bucket:           bucket,
+		operationRetries: defaultOperationRetries,
+		retryMinBackoff:  defaultRetryMinBackoff,
+		retryMaxBackoff:  defaultRetryMaxBackoff,
+	}, nil
 }
 
 // NewBucketReaderClient creates a new S3 bucket client
@@ -24,7 +45,16 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore
 		return nil, err
 	}
 
-	return s3.NewBucketWithConfig(logger, s3Cfg, name)
+	bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name)
+	if err != nil {
+		return nil, err
+	}
+	return &BucketWithRetries{
+		bucket:           bucket,
+		operationRetries: defaultOperationRetries,
+		retryMinBackoff:  defaultRetryMinBackoff,
+		retryMaxBackoff:  defaultRetryMaxBackoff,
+	}, nil
 }
 
 func newS3Config(cfg Config) (s3.Config, error) {
@@ -62,3 +92,115 @@ func newS3Config(cfg Config) (s3.Config, error) {
 		AWSSDKAuth: cfg.AccessKeyID == "",
 	}, nil
 }
+
+// BucketWithRetries wraps an objstore.Bucket and retries failed operations
+// with exponential backoff. "Object not found" errors are never retried.
+type BucketWithRetries struct {
+	bucket           objstore.Bucket
+	operationRetries int
+	retryMinBackoff  time.Duration
+	retryMaxBackoff  time.Duration
+}
+
+// retry runs f up to b.operationRetries times, backing off between attempts.
+// It returns nil on the first success and short-circuits on "object not
+// found" errors, because retrying those can never succeed.
+func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error {
+	var lastErr error
+	retries := backoff.New(ctx, backoff.Config{
+		MinBackoff: b.retryMinBackoff,
+		MaxBackoff: b.retryMaxBackoff,
+		MaxRetries: b.operationRetries,
+	})
+	for retries.Ongoing() {
+		lastErr = f()
+		if lastErr == nil {
+			return nil
+		}
+		if b.bucket.IsObjNotFoundErr(lastErr) {
+			return lastErr
+		}
+		retries.Wait()
+	}
+	if lastErr == nil {
+		// The loop body never ran: the context was already canceled or
+		// its deadline exceeded. Report that instead of a nil "success".
+		return retries.Err()
+	}
+	return lastErr
+}
+
+func (b *BucketWithRetries) Name() string {
+	return b.bucket.Name()
+}
+
+// Iter retries the whole listing; on a retry the callback f may be
+// invoked again for objects it has already seen.
+func (b *BucketWithRetries) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
+	return b.retry(ctx, func() error {
+		return b.bucket.Iter(ctx, dir, f, options...)
+	})
+}
+
+func (b *BucketWithRetries) Get(ctx context.Context, name string) (reader io.ReadCloser, err error) {
+	err = b.retry(ctx, func() error {
+		reader, err = b.bucket.Get(ctx, name)
+		return err
+	})
+	return
+}
+
+func (b *BucketWithRetries) GetRange(ctx context.Context, name string, off, length int64) (closer io.ReadCloser, err error) {
+	err = b.retry(ctx, func() error {
+		closer, err = b.bucket.GetRange(ctx, name, off, length)
+		return err
+	})
+	return
+}
+
+func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists bool, err error) {
+	err = b.retry(ctx, func() error {
+		exists, err = b.bucket.Exists(ctx, name)
+		return err
+	})
+	return
+}
+
+// Upload retries only when r is an io.ReadSeeker: a plain reader can be
+// consumed just once, so retrying it would upload truncated content.
+func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
+	rs, ok := r.(io.ReadSeeker)
+	if !ok {
+		// Single attempt only: a non-seekable reader cannot be rewound.
+		return b.bucket.Upload(ctx, name, r)
+	}
+	return b.retry(ctx, func() error {
+		// Rewind so each attempt uploads the full payload from the start.
+		if _, err := rs.Seek(0, io.SeekStart); err != nil {
+			return err
+		}
+		return b.bucket.Upload(ctx, name, rs)
+	})
+}
+
+func (b *BucketWithRetries) Attributes(ctx context.Context, name string) (attributes objstore.ObjectAttributes, err error) {
+	err = b.retry(ctx, func() error {
+		attributes, err = b.bucket.Attributes(ctx, name)
+		return err
+	})
+	return
+}
+
+func (b *BucketWithRetries) Delete(ctx context.Context, name string) error {
+	return b.retry(ctx, func() error {
+		return b.bucket.Delete(ctx, name)
+	})
+}
+
+func (b *BucketWithRetries) IsObjNotFoundErr(err error) bool {
+	return b.bucket.IsObjNotFoundErr(err)
+}
+
+func (b *BucketWithRetries) Close() error {
+	return b.bucket.Close()
+}