Skip to content

Fix S3 BucketWithRetries upload empty content issue #5217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* [BUGFIX] Query-frontend: Fix shardable instant queries do not produce sorted results for `sort`, `sort_desc`, `topk`, `bottomk` functions. #5148, #5170
* [BUGFIX] Querier: Fix `/api/v1/series` returning 5XX instead of 4XX when limits are hit. #5169
* [BUGFIX] Compactor: Fix issue that shuffle sharding planner return error if block is under visit by other compactor. #5188
* [BUGFIX] Fix S3 BucketWithRetries upload empty content issue #5217
* [FEATURE] Alertmanager: Add support for time_intervals. #5102

## 1.14.0 2022-12-02
Expand Down
20 changes: 18 additions & 2 deletions pkg/storage/bucket/s3/bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/s3"
Expand All @@ -29,6 +30,7 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke
return nil, err
}
return &BucketWithRetries{
logger: logger,
bucket: bucket,
operationRetries: defaultOperationRetries,
retryMinBackoff: defaultRetryMinBackoff,
Expand All @@ -48,6 +50,7 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore
return nil, err
}
return &BucketWithRetries{
logger: logger,
bucket: bucket,
operationRetries: defaultOperationRetries,
retryMinBackoff: defaultRetryMinBackoff,
Expand Down Expand Up @@ -92,6 +95,7 @@ func newS3Config(cfg Config) (s3.Config, error) {
}

type BucketWithRetries struct {
logger log.Logger
bucket objstore.Bucket
operationRetries int
retryMinBackoff time.Duration
Expand All @@ -115,7 +119,10 @@ func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error {
}
retries.Wait()
}
return lastErr
if lastErr != nil {
level.Error(b.logger).Log("msg", "bucket operation fail after retries", "err", lastErr)
}
return nil
}

func (b *BucketWithRetries) Name() string {
Expand Down Expand Up @@ -153,8 +160,17 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists boo
}

func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
return b.retry(ctx, func() error {
rs, ok := r.(io.ReadSeeker)
if !ok {
// Skip retry if incoming Reader is not seekable to avoid
// loading entire content into memory
return b.bucket.Upload(ctx, name, r)
}
return b.retry(ctx, func() error {
if _, err := rs.Seek(0, io.SeekStart); err != nil {
return err
}
return b.bucket.Upload(ctx, name, rs)
})
}

Expand Down
123 changes: 123 additions & 0 deletions pkg/storage/bucket/s3/bucket_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package s3

import (
"bytes"
"context"
"fmt"
"io"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
)

func TestBucketWithRetries_UploadSeekable(t *testing.T) {
t.Parallel()

m := mockBucket{
FailCount: 3,
}
b := BucketWithRetries{
bucket: &m,
operationRetries: 5,
retryMinBackoff: 10 * time.Millisecond,
retryMaxBackoff: time.Second,
}

input := []byte("test input")
err := b.Upload(context.Background(), "dummy", bytes.NewReader(input))
require.NoError(t, err)
require.Equal(t, input, m.uploadedContent)
}

func TestBucketWithRetries_UploadNonSeekable(t *testing.T) {
t.Parallel()

maxFailCount := 3
m := mockBucket{
FailCount: maxFailCount,
}
b := BucketWithRetries{
bucket: &m,
operationRetries: 5,
retryMinBackoff: 10 * time.Millisecond,
retryMaxBackoff: time.Second,
}

input := &fakeReader{}
err := b.Upload(context.Background(), "dummy", input)
require.Errorf(t, err, "empty byte slice")
require.Equal(t, maxFailCount, m.FailCount)
}

type fakeReader struct {
}

func (f *fakeReader) Read(p []byte) (n int, err error) {
return 0, fmt.Errorf("empty byte slice")
}

type mockBucket struct {
FailCount int
uploadedContent []byte
}

// Upload mocks objstore.Bucket.Upload()
func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
var buf bytes.Buffer
if _, err := buf.ReadFrom(r); err != nil {
return err
}
m.uploadedContent = buf.Bytes()
if m.FailCount > 0 {
m.FailCount--
return fmt.Errorf("failed upload: %d", m.FailCount)
}
return nil
}

// Delete mocks objstore.Bucket.Delete()
func (m *mockBucket) Delete(ctx context.Context, name string) error {
return nil
}

// Name mocks objstore.Bucket.Name()
func (m *mockBucket) Name() string {
return "mock"
}

// Iter mocks objstore.Bucket.Iter()
func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
return nil
}

// Get mocks objstore.Bucket.Get()
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return nil, nil
}

// GetRange mocks objstore.Bucket.GetRange()
func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return nil, nil
}

// Exists mocks objstore.Bucket.Exists()
func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) {
return false, nil
}

// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr()
func (m *mockBucket) IsObjNotFoundErr(err error) bool {
return false
}

// ObjectSize mocks objstore.Bucket.Attributes()
func (m *mockBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
return objstore.ObjectAttributes{Size: 0, LastModified: time.Now()}, nil
}

// Close mocks objstore.Bucket.Close()
func (m *mockBucket) Close() error {
return nil
}