diff --git a/CHANGELOG.md b/CHANGELOG.md
index 49ef11c3eb8..f682e74cd9a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -48,6 +48,7 @@
 * [BUGFIX] Query-frontend: Fix shardable instant queries do not produce sorted results for `sort`, `sort_desc`, `topk`, `bottomk` functions. #5148, #5170
 * [BUGFIX] Querier: Fix `/api/v1/series` returning 5XX instead of 4XX when limits are hit. #5169
 * [BUGFIX] Compactor: Fix issue that shuffle sharding planner return error if block is under visit by other compactor. #5188
+* [BUGFIX] Fix S3 `BucketWithRetries` uploading empty content when a failed upload is retried. #5217
 * [FEATURE] Alertmanager: Add support for time_intervals. #5102
 
 ## 1.14.0 2022-12-02
diff --git a/pkg/storage/bucket/s3/bucket_client.go b/pkg/storage/bucket/s3/bucket_client.go
index 68a8ee714f2..ad766ffb30c 100644
--- a/pkg/storage/bucket/s3/bucket_client.go
+++ b/pkg/storage/bucket/s3/bucket_client.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/prometheus/common/model"
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/providers/s3"
@@ -29,6 +30,7 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke
 		return nil, err
 	}
 	return &BucketWithRetries{
+		logger:           logger,
 		bucket:           bucket,
 		operationRetries: defaultOperationRetries,
 		retryMinBackoff:  defaultRetryMinBackoff,
@@ -48,6 +50,7 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore
 		return nil, err
 	}
 	return &BucketWithRetries{
+		logger:           logger,
 		bucket:           bucket,
 		operationRetries: defaultOperationRetries,
 		retryMinBackoff:  defaultRetryMinBackoff,
@@ -92,6 +95,7 @@ func newS3Config(cfg Config) (s3.Config, error) {
 }
 
 type BucketWithRetries struct {
+	logger           log.Logger
 	bucket           objstore.Bucket
 	operationRetries int
 	retryMinBackoff  time.Duration
@@ -115,7 +119,10 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error {
 		}
 		retries.Wait()
 	}
-	return lastErr
+	if lastErr != nil {
+		level.Error(b.logger).Log("msg", "bucket operation failed after retries", "err", lastErr)
+	}
+	return lastErr
 }
 
 func (b *BucketWithRetries) Name() string {
@@ -153,8 +160,19 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists boo
 }
 
 func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
-	return b.retry(ctx, func() error {
+	rs, ok := r.(io.ReadSeeker)
+	if !ok {
+		// Skip retries if the incoming Reader is not seekable: replaying the
+		// upload would require buffering its entire content in memory.
 		return b.bucket.Upload(ctx, name, r)
+	}
+	return b.retry(ctx, func() error {
+		// Rewind before every attempt so a retry re-uploads the full content
+		// instead of the empty remainder left by a previous failed attempt.
+		if _, err := rs.Seek(0, io.SeekStart); err != nil {
+			return err
+		}
+		return b.bucket.Upload(ctx, name, rs)
 	})
 }
diff --git a/pkg/storage/bucket/s3/bucket_client_test.go b/pkg/storage/bucket/s3/bucket_client_test.go
new file mode 100644
index 00000000000..38e020426bb
--- /dev/null
+++ b/pkg/storage/bucket/s3/bucket_client_test.go
@@ -0,0 +1,128 @@
+package s3
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/go-kit/log"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+)
+
+func TestBucketWithRetries_UploadSeekable(t *testing.T) {
+	t.Parallel()
+
+	m := mockBucket{
+		FailCount: 3,
+	}
+	b := BucketWithRetries{
+		logger:           log.NewNopLogger(),
+		bucket:           &m,
+		operationRetries: 5,
+		retryMinBackoff:  10 * time.Millisecond,
+		retryMaxBackoff:  time.Second,
+	}
+
+	input := []byte("test input")
+	err := b.Upload(context.Background(), "dummy", bytes.NewReader(input))
+	require.NoError(t, err)
+	require.Equal(t, input, m.uploadedContent)
+}
+
+func TestBucketWithRetries_UploadNonSeekable(t *testing.T) {
+	t.Parallel()
+
+	maxFailCount := 3
+	m := mockBucket{
+		FailCount: maxFailCount,
+	}
+	b := BucketWithRetries{
+		logger:           log.NewNopLogger(),
+		bucket:           &m,
+		operationRetries: 5,
+		retryMinBackoff:  10 * time.Millisecond,
+		retryMaxBackoff:  time.Second,
+	}
+
+	input := &fakeReader{}
+	err := b.Upload(context.Background(), "dummy", input)
+	require.ErrorContains(t, err, "empty byte slice")
+	require.Equal(t, maxFailCount, m.FailCount)
+}
+
+type fakeReader struct {
+}
+
+// Read always fails, so a fakeReader can never be uploaded successfully.
+func (f *fakeReader) Read(p []byte) (n int, err error) {
+	return 0, fmt.Errorf("empty byte slice")
+}
+
+type mockBucket struct {
+	FailCount       int
+	uploadedContent []byte
+}
+
+// Upload mocks objstore.Bucket.Upload(): it consumes the reader, records
+// the content it received, and fails the first FailCount attempts.
+func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+	var buf bytes.Buffer
+	if _, err := buf.ReadFrom(r); err != nil {
+		return err
+	}
+	m.uploadedContent = buf.Bytes()
+	if m.FailCount > 0 {
+		m.FailCount--
+		return fmt.Errorf("failed upload: %d", m.FailCount)
+	}
+	return nil
+}
+
+// Delete mocks objstore.Bucket.Delete()
+func (m *mockBucket) Delete(ctx context.Context, name string) error {
+	return nil
+}
+
+// Name mocks objstore.Bucket.Name()
+func (m *mockBucket) Name() string {
+	return "mock"
+}
+
+// Iter mocks objstore.Bucket.Iter()
+func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
+	return nil
+}
+
+// Get mocks objstore.Bucket.Get()
+func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	return nil, nil
+}
+
+// GetRange mocks objstore.Bucket.GetRange()
+func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
+	return nil, nil
+}
+
+// Exists mocks objstore.Bucket.Exists()
+func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) {
+	return false, nil
+}
+
+// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr()
+func (m *mockBucket) IsObjNotFoundErr(err error) bool {
+	return false
+}
+
+// Attributes mocks objstore.Bucket.Attributes()
+func (m *mockBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
+	return objstore.ObjectAttributes{Size: 0, LastModified: time.Now()}, nil
+}
+
+// Close mocks objstore.Bucket.Close()
+func (m *mockBucket) Close() error {
+	return nil
+}
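
For reviewers, a minimal standalone sketch of the failure mode being fixed (not part of the patch; standard library only, names are illustrative): a plain `io.Reader` is drained by the first upload attempt, so a naive retry sends empty content, while an `io.ReadSeeker` can be rewound to the start before each attempt, which is what `BucketWithRetries.Upload` now does.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// drain simulates an upload attempt that consumes the request body
// before failing server-side, as a flaky S3 endpoint would.
func drain(r io.Reader) int64 {
	n, _ := io.Copy(io.Discard, r)
	return n
}

func main() {
	payload := []byte("test input")

	// Plain io.Reader: the first (failed) attempt consumes it, so a
	// blind retry uploads zero bytes -- the empty-content bug.
	plain := io.Reader(bytes.NewBuffer(payload))
	fmt.Println("plain, attempt 1:", drain(plain), "bytes") // 10 bytes
	fmt.Println("plain, attempt 2:", drain(plain), "bytes") // 0 bytes

	// io.ReadSeeker: rewinding before each attempt restores the
	// full payload, so every retry re-uploads the whole content.
	seekable := bytes.NewReader(payload)
	fmt.Println("seekable, attempt 1:", drain(seekable), "bytes") // 10 bytes
	if _, err := seekable.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	fmt.Println("seekable, attempt 2:", drain(seekable), "bytes") // 10 bytes
}
```

Note the design trade-off stated in the patch comment: non-seekable readers deliberately bypass retries rather than being buffered, trading retry coverage for bounded memory use.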