diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go index ad766ffb30c..d0625b6deef 100644 --- a/pkg/storage/bucket/s3/bucket_client.go +++ b/pkg/storage/bucket/s3/bucket_client.go @@ -2,6 +2,7 @@ package s3 import ( "context" + "fmt" "io" "time" @@ -102,7 +103,7 @@ type BucketWithRetries struct { retryMaxBackoff time.Duration } -func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error { +func (b *BucketWithRetries) retry(ctx context.Context, f func() error, operationInfo string) error { var lastErr error retries := backoff.New(ctx, backoff.Config{ MinBackoff: b.retryMinBackoff, @@ -120,7 +121,8 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error { retries.Wait() } if lastErr != nil { - level.Error(b.logger).Log("msg", "bucket operation fail after retries", "err", lastErr) + level.Error(b.logger).Log("msg", "bucket operation fail after retries", "err", lastErr, "operation", operationInfo) + return lastErr } return nil } @@ -132,14 +134,14 @@ func (b *BucketWithRetries) Name() string { func (b *BucketWithRetries) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { return b.retry(ctx, func() error { return b.bucket.Iter(ctx, dir, f, options...) - }) + }, fmt.Sprintf("Iter %s", dir)) } func (b *BucketWithRetries) Get(ctx context.Context, name string) (reader io.ReadCloser, err error) { err = b.retry(ctx, func() error { reader, err = b.bucket.Get(ctx, name) return err - }) + }, fmt.Sprintf("Get %s", name)) return } @@ -147,7 +149,7 @@ func (b *BucketWithRetries) GetRange(ctx context.Context, name string, off, leng err = b.retry(ctx, func() error { closer, err = b.bucket.GetRange(ctx, name, off, length) return err - }) + }, fmt.Sprintf("GetRange %s (off: %d, length: %d)", name, off, length)) return } @@ -155,7 +157,7 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists boo err = b.retry(ctx, func() error { exists, err = b.bucket.Exists(ctx, name) return err - }) + }, fmt.Sprintf("Exists %s", name)) return } @@ -171,21 +173,21 @@ func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader return err } return b.bucket.Upload(ctx, name, rs) - }) + }, fmt.Sprintf("Upload %s", name)) } func (b *BucketWithRetries) Attributes(ctx context.Context, name string) (attributes objstore.ObjectAttributes, err error) { err = b.retry(ctx, func() error { attributes, err = b.bucket.Attributes(ctx, name) return err - }) + }, fmt.Sprintf("Attributes %s", name)) return } func (b *BucketWithRetries) Delete(ctx context.Context, name string) error { return b.retry(ctx, func() error { return b.bucket.Delete(ctx, name) - }) + }, fmt.Sprintf("Delete %s", name)) } func (b *BucketWithRetries) IsObjNotFoundErr(err error) bool { diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go index 38e020426bb..c62f9107093 100644 --- a/pkg/storage/bucket/s3/bucket_client_test.go +++ b/pkg/storage/bucket/s3/bucket_client_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" ) @@ -19,6 +20,7 @@ func TestBucketWithRetries_UploadSeekable(t *testing.T) { FailCount: 3, } b := BucketWithRetries{ + logger: log.NewNopLogger(), bucket: &m, operationRetries: 5, retryMinBackoff: 10 * time.Millisecond, @@ -39,6 +41,7 @@ func TestBucketWithRetries_UploadNonSeekable(t *testing.T) { FailCount: maxFailCount, } b := BucketWithRetries{ + logger: log.NewNopLogger(), bucket: &m, operationRetries: 5, retryMinBackoff: 10 * time.Millisecond, @@ -51,6 +54,25 @@ func TestBucketWithRetries_UploadNonSeekable(t *testing.T) { require.Equal(t, maxFailCount, m.FailCount) } +func TestBucketWithRetries_UploadFailed(t *testing.T) { + t.Parallel() + + m := mockBucket{ + FailCount: 6, + } + b := BucketWithRetries{ + logger: log.NewNopLogger(), + bucket: &m, + operationRetries: 5, + retryMinBackoff: 10 * time.Millisecond, + retryMaxBackoff: time.Second, + } + + input := []byte("test input") + err := b.Upload(context.Background(), "dummy", bytes.NewReader(input)) + require.ErrorContains(t, err, "failed upload: ") +} + type fakeReader struct { }