
Commit 4db84b5

Merge branch 'cortexproject:master' into list-rules-filter
2 parents f2f00f0 + b98dee6 commit 4db84b5

13 files changed (+86 −24 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -38,6 +38,7 @@
 * [BUGFIX] Ring: Add JOINING state to read operation. #5346
 * [BUGFIX] Compactor: Partial block with only visit marker should be deleted even there is no deletion marker. #5342
 * [BUGFIX] KV: Etcd calls will no longer block indefinitely and will now time out after the DialTimeout period. #5392
+* [BUGFIX] Ring: Allow RF greater than number of zones to select more than one instance per zone #5411

 ## 1.15.1 2023-04-26

pkg/ring/ring.go

Lines changed: 11 additions & 6 deletions

@@ -345,17 +345,17 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
 	}

 	var (
-		n          = r.cfg.ReplicationFactor
-		instances  = bufDescs[:0]
-		start      = searchToken(r.ringTokens, key)
-		iterations = 0
+		replicationFactor = r.cfg.ReplicationFactor
+		instances         = bufDescs[:0]
+		start             = searchToken(r.ringTokens, key)
+		iterations        = 0

 		// We use a slice instead of a map because it's faster to search within a
 		// slice than lookup a map for a very low number of items.
 		distinctHosts = bufHosts[:0]
 		distinctZones = bufZones[:0]
 	)
-	for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
+	for i := start; len(distinctHosts) < replicationFactor && iterations < len(r.ringTokens); i++ {
 		iterations++
 		// Wrap i around in the ring.
 		i %= len(r.ringTokens)
@@ -385,11 +385,16 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
 		// Check whether the replica set should be extended given we're including
 		// this instance.
 		if op.ShouldExtendReplicaSetOnState(instance.State) {
-			n++
+			replicationFactor++
 		} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
 			// We should only add the zone if we are not going to extend,
 			// as we want to extend the instance in the same AZ.
 			distinctZones = append(distinctZones, info.Zone)
+
+			if len(distinctZones) == len(r.ringZones) {
+				// reset the zones to repeatedly get hosts from distinct zones
+				distinctZones = distinctZones[:0]
+			}
 		}

 		instances = append(instances, instance)
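
The added reset is the heart of the fix: previously, once every zone appeared in distinctZones, no further instance could qualify, so a replication factor above the zone count stalled. Clearing the slice starts a new "round" in which each zone may contribute one more host. A minimal standalone sketch of that round-robin behavior (illustrative names only, not the actual Cortex API; the real loop also tracks distinct hosts and walks ring tokens):

package main

import "fmt"

// pickZones sketches the patched selection loop: walk candidate instances in
// ring order, skip zones already used in the current round, and reset the
// round once every zone has been used, so RF > number of zones still spreads
// replicas evenly across zones.
func pickZones(candidates, ringZones []string, rf int) []string {
	var picked, distinctZones []string
	for _, zone := range candidates {
		if len(picked) == rf {
			break
		}
		alreadyUsed := false
		for _, z := range distinctZones {
			if z == zone {
				alreadyUsed = true
				break
			}
		}
		if alreadyUsed {
			continue // this zone already has a replica in the current round
		}
		distinctZones = append(distinctZones, zone)
		if len(distinctZones) == len(ringZones) {
			// Mirror of the new code: reset the zones to repeatedly get
			// hosts from distinct zones.
			distinctZones = distinctZones[:0]
		}
		picked = append(picked, zone)
	}
	return picked
}

func main() {
	ringZones := []string{"zone-1", "zone-2", "zone-3"}
	// Zones of candidate instances, in the order the ring walk finds them.
	candidates := []string{"zone-1", "zone-1", "zone-2", "zone-3", "zone-2", "zone-3"}
	fmt.Println(pickZones(candidates, ringZones, 4)) // [zone-1 zone-2 zone-3 zone-2]
}

Running it prints [zone-1 zone-2 zone-3 zone-2]: the fourth replica lands in a zone that already holds one, which the pre-fix logic could never produce.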

pkg/ring/ring_test.go

Lines changed: 31 additions & 17 deletions

@@ -378,19 +378,26 @@ func TestRing_Get_ZoneAwareness(t *testing.T) {
 			zoneAwarenessEnabled: true,
 			expectedInstances:    3,
 		},
-		"should fail if there are instances in 1 zone only on RF = 3": {
-			numInstances: 16,
-			numZones:     1,
+		"should fail if there are not enough instances": {
+			numInstances: 1,
+			numZones:     3,
 			replicationFactor:    3,
 			zoneAwarenessEnabled: true,
 			expectedErr:          "at least 2 live replicas required across different availability zones, could only find 1",
 		},
-		"should succeed if there are instances in 2 zones on RF = 3": {
+		"should succeed if there are instances in 3 zones on RF = 4": {
 			numInstances: 16,
-			numZones:          2,
-			replicationFactor: 3,
+			numZones:          3,
+			replicationFactor: 4,
+			zoneAwarenessEnabled: true,
+			expectedInstances:    4,
+		},
+		"should succeed if there are instances in 3 zones on RF = 9": {
+			numInstances:      16,
+			numZones:          3,
+			replicationFactor: 9,
 			zoneAwarenessEnabled: true,
-			expectedInstances:    2,
+			expectedInstances:    9,
 		},
 		"should succeed if there are instances in 1 zone only on RF = 3 but zone-awareness is disabled": {
 			numInstances: 16,
@@ -426,7 +433,7 @@
 				ringTokens:          r.GetTokens(),
 				ringTokensByZone:    r.getTokensByZone(),
 				ringInstanceByToken: r.getTokensInfo(),
-				ringZones:           getZones(r.getTokensByZone()),
+				ringZones:           []string{"zone-1", "zone-2", "zone-3"},
 				strategy:            NewDefaultReplicationStrategy(),
 				KVClient:            &MockClient{},
 			}
@@ -447,26 +454,33 @@
 			set, err = ring.Get(testValues[i], Write, instances, bufHosts, bufZones)
 			if testData.expectedErr != "" {
 				require.EqualError(t, err, testData.expectedErr)
+				continue // Skip the rest of the assertions if we were expecting an error.
 			} else {
 				require.NoError(t, err)
 			}

-			// Skip the rest of the assertions if we were expecting an error.
-			if testData.expectedErr != "" {
-				continue
-			}
-
 			// Check that we have the expected number of instances for replication.
 			assert.Equal(t, testData.expectedInstances, len(set.Instances))

 			// Ensure all instances are in a different zone (only if zone-awareness is enabled).
 			if testData.zoneAwarenessEnabled {
-				zones := make(map[string]struct{})
+				zones := make(map[string]int)
+				maxNumOfHostsPerZone := math.Ceil(float64(testData.replicationFactor) / float64(testData.numZones))
+
 				for i := 0; i < len(set.Instances); i++ {
-					if _, ok := zones[set.Instances[i].Zone]; ok {
-						t.Fatal("found multiple instances in the same zone")
+					if _, ok := zones[set.Instances[i].Zone]; !ok {
+						zones[set.Instances[i].Zone] = 1
+					} else {
+						zones[set.Instances[i].Zone]++
+
+						if zones[set.Instances[i].Zone] > int(maxNumOfHostsPerZone) {
+							t.Fatal("instances not spread across zones evenly")
+						}
 					}
-					zones[set.Instances[i].Zone] = struct{}{}
+				}
+
+				if testData.replicationFactor >= testData.numZones && len(zones) != testData.numZones {
+					t.Fatalf("each zone must have at least one instance")
 				}
 			}
 		}
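
A quick sanity check of the new assertion's arithmetic: the per-zone cap is ceil(replicationFactor / numZones), so the RF = 4 / 3-zone case tolerates at most ceil(4/3) = 2 instances in any zone, while the RF = 9 / 3-zone case allows exactly ceil(9/3) = 3. And whenever RF >= numZones, the final check requires every zone to contribute at least one instance.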

pkg/storegateway/bucket_index_metadata_fetcher_test.go

Lines changed: 4 additions & 0 deletions

@@ -25,6 +25,7 @@ import (
 )

 func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
+	t.Parallel()
 	const userID = "user-1"

 	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
@@ -99,6 +100,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
 }

 func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
+	t.Parallel()
 	const userID = "user-1"

 	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
@@ -150,6 +152,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
 }

 func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
+	t.Parallel()
 	const userID = "user-1"

 	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
@@ -204,6 +207,7 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
 }

 func TestBucketIndexMetadataFetcher_Fetch_ShouldResetGaugeMetrics(t *testing.T) {
+	t.Parallel()
 	const userID = "user-1"

 	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)

pkg/storegateway/bucket_store_metrics_test.go

Lines changed: 1 addition & 0 deletions

@@ -12,6 +12,7 @@ import (
 )

 func TestBucketStoreMetrics(t *testing.T) {
+	t.Parallel()
 	mainReg := prometheus.NewPedanticRegistry()

 	tsdbMetrics := NewBucketStoreMetrics()

pkg/storegateway/bucket_stores_test.go

Lines changed: 3 additions & 0 deletions

@@ -41,6 +41,7 @@ import (
 )

 func TestBucketStores_InitialSync(t *testing.T) {
+	t.Parallel()
 	userToMetric := map[string]string{
 		"user-1": "series_1",
 		"user-2": "series_2",
@@ -180,6 +181,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
 }

 func TestBucketStores_SyncBlocks(t *testing.T) {
+	t.Parallel()
 	const (
 		userID     = "user-1"
 		metricName = "series_1"
@@ -249,6 +251,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
 }

 func TestBucketStores_syncUsersBlocks(t *testing.T) {
+	t.Parallel()
 	allUsers := []string{"user-1", "user-2", "user-3"}

 	tests := map[string]struct {

pkg/storegateway/chunk_bytes_pool_test.go

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ import (
 )

 func TestChunkBytesPool_Get(t *testing.T) {
+	t.Parallel()
 	reg := prometheus.NewPedanticRegistry()
 	p, err := newChunkBytesPool(cortex_tsdb.ChunkPoolDefaultMinBucketSize, cortex_tsdb.ChunkPoolDefaultMaxBucketSize, 0, reg)
 	require.NoError(t, err)

pkg/storegateway/gateway_ring_test.go

Lines changed: 2 additions & 1 deletion

@@ -10,7 +10,7 @@ import (
 )

 func TestIsHealthyForStoreGatewayOperations(t *testing.T) {
-	t.Parallel()
+	//parallel testing causes data race

 	tests := map[string]struct {
 		instance *ring.InstanceDesc
@@ -60,6 +60,7 @@ func TestIsHealthyForStoreGatewayOperations(t *testing.T) {
 		testData := testData

 		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
 			actual := testData.instance.IsHealthy(BlocksOwnerSync, testData.timeout, time.Now())
 			assert.Equal(t, testData.ownerSyncExpected, actual)
pkg/storegateway/gateway_test.go

Lines changed: 24 additions & 0 deletions

@@ -47,6 +47,7 @@ import (
 )

 func TestConfig_Validate(t *testing.T) {
+	t.Parallel()
 	tests := map[string]struct {
 		setup    func(cfg *Config, limits *validation.Limits)
 		expected error
@@ -80,7 +81,9 @@
 	}

 	for testName, testData := range tests {
+		testData := testData
 		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
 			cfg := &Config{}
 			limits := &validation.Limits{}
 			flagext.DefaultValues(cfg, limits)
@@ -92,6 +95,7 @@
 }

 func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
+	t.Parallel()
 	tests := map[string]struct {
 		initialExists bool
 		initialState  ring.InstanceState
@@ -123,7 +127,9 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
 	}

 	for testName, testData := range tests {
+		testData := testData
 		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
 			ctx := context.Background()
 			gatewayCfg := mockGatewayConfig()
 			gatewayCfg.ShardingEnabled = true
@@ -174,6 +180,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
 }

 func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) {
+	t.Parallel()
 	ctx := context.Background()
 	gatewayCfg := mockGatewayConfig()
 	gatewayCfg.ShardingEnabled = false
@@ -195,6 +202,7 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) {
 }

 func TestStoreGateway_InitialSyncFailure(t *testing.T) {
+	t.Parallel()
 	ctx := context.Background()
 	gatewayCfg := mockGatewayConfig()
 	gatewayCfg.ShardingEnabled = true
@@ -223,6 +231,7 @@
 // their own blocks, regardless which store-gateway joined the ring first or last (even if starting
 // at the same time, they will join the ring at a slightly different time).
 func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
+	//parallel testing causes data race
 	bucketClient, storageDir := cortex_testutil.PrepareFilesystemBucket(t)

 	// This tests uses real TSDB blocks. 24h time range, 2h block range period,
@@ -302,6 +311,7 @@
 	for testName, testData := range tests {
 		for _, bucketIndexEnabled := range []bool{true, false} {
 			t.Run(fmt.Sprintf("%s (bucket index enabled = %v)", testName, bucketIndexEnabled), func(t *testing.T) {
+				//parallel testing causes data race
 				// Randomise the seed but log it in case we need to reproduce the test on failure.
 				seed := time.Now().UnixNano()
 				rand.Seed(seed)
@@ -383,6 +393,7 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
 }

 func TestStoreGateway_BlocksSyncWithDefaultSharding_RingTopologyChangedAfterScaleUp(t *testing.T) {
+	t.Parallel()
 	const (
 		numUsers  = 2
 		numBlocks = numUsers * 12
@@ -542,6 +553,7 @@ func TestStoreGateway_BlocksSyncWithDefaultSharding_RingTopologyChangedAfterScal
 }

 func TestStoreGateway_ShouldSupportLoadRingTokensFromFile(t *testing.T) {
+	t.Parallel()
 	tests := map[string]struct {
 		storedTokens      ring.Tokens
 		expectedNumTokens int
@@ -561,7 +573,9 @@ func TestStoreGateway_ShouldSupportLoadRingTokensFromFile(t *testing.T) {
 	}

 	for testName, testData := range tests {
+		testData := testData
 		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
 			tokensFile, err := os.CreateTemp(os.TempDir(), "tokens-*")
 			require.NoError(t, err)
 			defer os.Remove(tokensFile.Name()) //nolint:errcheck
@@ -596,6 +610,7 @@
 }

 func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) {
+	t.Parallel()
 	registeredAt := time.Now()

 	tests := map[string]struct {
@@ -704,7 +719,9 @@
 	}

 	for testName, testData := range tests {
+		testData := testData
 		t.Run(testName, func(t *testing.T) {
+			t.Parallel()
 			ctx := context.Background()
 			gatewayCfg := mockGatewayConfig()
 			gatewayCfg.ShardingEnabled = true
@@ -764,6 +781,7 @@ func TestStoreGateway_SyncOnRingTopologyChanged(t *testing.T) {
 }

 func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testing.T) {
+	t.Parallel()
 	const unhealthyInstanceID = "unhealthy-id"
 	const heartbeatTimeout = time.Minute

@@ -810,6 +828,7 @@ func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testin
 }

 func TestStoreGateway_SeriesQueryingShouldRemoveExternalLabels(t *testing.T) {
+	t.Parallel()
 	ctx := context.Background()
 	logger := log.NewNopLogger()
 	userID := "user-1"
@@ -855,7 +874,9 @@
 	}

 	for _, bucketIndexEnabled := range []bool{true, false} {
+		bucketIndexEnabled := bucketIndexEnabled
 		t.Run(fmt.Sprintf("bucket index enabled = %v", bucketIndexEnabled), func(t *testing.T) {
+			t.Parallel()
 			// Create a store-gateway used to query back the series from the blocks.
 			gatewayCfg := mockGatewayConfig()
 			gatewayCfg.ShardingEnabled = false
@@ -901,6 +922,7 @@
 }

 func TestStoreGateway_SeriesQueryingShouldEnforceMaxChunksPerQueryLimit(t *testing.T) {
+	t.Parallel()
 	const chunksQueried = 10

 	tests := map[string]struct {
@@ -948,6 +970,7 @@

 	for testName, testData := range tests {
 		t.Run(testName, func(t *testing.T) {
+			//parallel testing causes data race
 			// Customise the limits.
 			limits := defaultLimitsConfig()
 			limits.MaxChunksPerQuery = testData.limit
@@ -1036,6 +1059,7 @@ func TestStoreGateway_SeriesQueryingShouldEnforceMaxSeriesPerQueryLimit(t *testi

 	for testName, testData := range tests {
 		t.Run(testName, func(t *testing.T) {
+			//parallel testing causes data race
 			// Customise the limits.
 			limits := defaultLimitsConfig()
 			limits.MaxFetchedSeriesPerQuery = testData.limit
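
A note on the recurring `testData := testData` rebinds added above: before Go 1.22, a `range` variable was a single variable reused across iterations, so a parallel subtest closing over it could observe a later iteration's value instead of its own case. A minimal self-contained sketch of the pattern (hypothetical test, not from this repo):

package example

import "testing"

func TestTableDriven(t *testing.T) {
	tests := map[string]struct {
		input, expected int
	}{
		"doubles one": {input: 1, expected: 2},
		"doubles two": {input: 2, expected: 4},
	}

	for testName, testData := range tests {
		testData := testData // rebind so each parallel subtest captures its own case
		t.Run(testName, func(t *testing.T) {
			t.Parallel() // run concurrently with sibling subtests
			if got := testData.input * 2; got != testData.expected {
				t.Fatalf("got %d, want %d", got, testData.expected)
			}
		})
	}
}

Without the rebind, the loop typically finishes before the parallel bodies run, and every subtest would see the map's final entry.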
