Skip to content

Commit e08fbbd

Browse files
authored
Fix retries on store-gateway initial sync failure (#4088)
* Fix retries on store-gateway initial sync failure
  Signed-off-by: Marco Pracucci <[email protected]>
* Added PR number to CHANGELOG
  Signed-off-by: Marco Pracucci <[email protected]>
1 parent 2d8477c commit e08fbbd

File tree

3 files changed

+83
-2
lines changed

3 files changed

+83
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@
4343
* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
4444
* [ENHANCEMENT] Alertmanager: validate configured `-alertmanager.web.external-url` and fail if it ends with `/`. #4081
4545
* [ENHANCEMENT] Allow configuration of Cassandra's host selection policy. #4069
46+
* [ENHANCEMENT] Store-gateway: retry synching blocks if a per-tenant sync fails. #3975 #4088
4647
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint returns `400` instead of `404`. #4013
4748
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
4849
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959

pkg/storegateway/bucket_stores.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -141,7 +141,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
141141
func (u *BucketStores) InitialSync(ctx context.Context) error {
142142
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
143143

144-
if err := u.syncUsersBlocks(ctx, func(ctx context.Context, s *store.BucketStore) error {
144+
if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error {
145145
return s.InitialSync(ctx)
146146
}); err != nil {
147147
level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
@@ -161,7 +161,7 @@ func (u *BucketStores) SyncBlocks(ctx context.Context) error {
161161

162162
func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error {
163163
retries := util.NewBackoff(ctx, util.BackoffConfig{
164-
MinBackoff: 100 * time.Millisecond,
164+
MinBackoff: 1 * time.Second,
165165
MaxBackoff: 10 * time.Second,
166166
MaxRetries: 3,
167167
})

pkg/storegateway/bucket_stores_test.go

Lines changed: 80 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,9 @@ package storegateway
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"io"
68
"io/ioutil"
79
"math"
810
"os"
@@ -24,6 +26,7 @@ import (
2426
"github.com/stretchr/testify/require"
2527
thanos_metadata "github.com/thanos-io/thanos/pkg/block/metadata"
2628
"github.com/thanos-io/thanos/pkg/extprom"
29+
"github.com/thanos-io/thanos/pkg/objstore"
2730
"github.com/thanos-io/thanos/pkg/store"
2831
"github.com/thanos-io/thanos/pkg/store/labelpb"
2932
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -118,6 +121,68 @@ func TestBucketStores_InitialSync(t *testing.T) {
118121
assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0))
119122
}
120123

124+
func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
125+
ctx := context.Background()
126+
cfg, cleanup := prepareStorageConfig(t)
127+
defer cleanup()
128+
129+
storageDir, err := ioutil.TempDir(os.TempDir(), "storage-*")
130+
require.NoError(t, err)
131+
132+
// Generate a block for the user in the storage.
133+
generateStorageBlock(t, storageDir, "user-1", "series_1", 10, 100, 15)
134+
135+
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
136+
require.NoError(t, err)
137+
138+
// Wrap the bucket to fail the 1st Get() request.
139+
bucket = &failFirstGetBucket{Bucket: bucket}
140+
141+
reg := prometheus.NewPedanticRegistry()
142+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
143+
require.NoError(t, err)
144+
145+
// Initial sync should succeed even if a transient error occurs.
146+
require.NoError(t, stores.InitialSync(ctx))
147+
148+
// Query series after the initial sync.
149+
seriesSet, warnings, err := querySeries(stores, "user-1", "series_1", 20, 40)
150+
require.NoError(t, err)
151+
assert.Empty(t, warnings)
152+
require.Len(t, seriesSet, 1)
153+
assert.Equal(t, []labelpb.ZLabel{{Name: labels.MetricName, Value: "series_1"}}, seriesSet[0].Labels)
154+
155+
assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
156+
# HELP cortex_blocks_meta_syncs_total Total blocks metadata synchronization attempts
157+
# TYPE cortex_blocks_meta_syncs_total counter
158+
cortex_blocks_meta_syncs_total 2
159+
160+
# HELP cortex_blocks_meta_sync_failures_total Total blocks metadata synchronization failures
161+
# TYPE cortex_blocks_meta_sync_failures_total counter
162+
cortex_blocks_meta_sync_failures_total 1
163+
164+
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
165+
# TYPE cortex_bucket_store_blocks_loaded gauge
166+
cortex_bucket_store_blocks_loaded 1
167+
168+
# HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts.
169+
# TYPE cortex_bucket_store_block_loads_total counter
170+
cortex_bucket_store_block_loads_total 1
171+
172+
# HELP cortex_bucket_store_block_load_failures_total Total number of failed remote block loading attempts.
173+
# TYPE cortex_bucket_store_block_load_failures_total counter
174+
cortex_bucket_store_block_load_failures_total 0
175+
`),
176+
"cortex_blocks_meta_syncs_total",
177+
"cortex_blocks_meta_sync_failures_total",
178+
"cortex_bucket_store_block_loads_total",
179+
"cortex_bucket_store_block_load_failures_total",
180+
"cortex_bucket_store_blocks_loaded",
181+
))
182+
183+
assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0))
184+
}
185+
121186
func TestBucketStores_SyncBlocks(t *testing.T) {
122187
const (
123188
userID = "user-1"
@@ -534,3 +599,18 @@ func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string,
534599
}
535600
return nil
536601
}
602+
603+
// failFirstGetBucket is an objstore.Bucket wrapper which fails the first Get() request with a mocked error.
604+
type failFirstGetBucket struct {
605+
objstore.Bucket
606+
607+
firstGet atomic.Bool
608+
}
609+
610+
func (f *failFirstGetBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
611+
if f.firstGet.CAS(false, true) {
612+
return nil, errors.New("Get() request mocked error")
613+
}
614+
615+
return f.Bucket.Get(ctx, name)
616+
}

0 commit comments

Comments
 (0)