Skip to content

Commit d344999

Browse files
committed
Filter readOnly ingesters when sharding
Signed-off-by: Daniel Deluiggi <[email protected]>
1 parent 7ed0c41 commit d344999

File tree

8 files changed

+243
-51
lines changed

8 files changed

+243
-51
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
* [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409
6969
* [BUGFIX] Query Frontend: Fix @ modifier not being applied correctly on sub queries. #6450
7070
* [BUGFIX] Cortex Redis flags with multiple dots #6476
71+
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
7172

7273
## 1.18.1 2024-10-14
7374

pkg/compactor/shuffle_sharding_grouper_test.go

+5
Original file line numberDiff line numberDiff line change
@@ -794,6 +794,11 @@ func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ri
794794
return args.Get(0).(ring.ReadRing)
795795
}
796796

797+
func (r *RingMock) ShuffleShardWithOperation(identifier string, size int, op ring.Operation) ring.ReadRing {
798+
args := r.Called(identifier, size, op)
799+
return args.Get(0).(ring.ReadRing)
800+
}
801+
797802
func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) {
798803
args := r.Called(instanceID)
799804
return args.Get(0).(ring.InstanceState), args.Error(1)

pkg/distributor/distributor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -787,7 +787,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
787787

788788
// Obtain a subring if required.
789789
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
790-
subRing = d.ingestersRing.ShuffleShard(userID, limits.IngestionTenantShardSize)
790+
subRing = d.ingestersRing.ShuffleShardWithOperation(userID, limits.IngestionTenantShardSize, ring.WriteShard)
791791
}
792792

793793
keys := append(seriesKeys, metadataKeys...)

pkg/ring/model.go

+13-4
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,7 @@ type CompareResult int
543543
const (
544544
Equal CompareResult = iota // Both rings contain same exact instances.
545545
EqualButStatesAndTimestamps // Both rings contain the same instances with the same data except states and timestamps (may differ).
546+
EqualButReadOnly // Both rings contain the same instances, but at least one instance entered or left the READONLY state, so write subrings may change.
546547
Different // Rings have different set of instances, or their information don't match.
547548
)
548549

@@ -566,6 +567,7 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
566567
}
567568

568569
equalStatesAndTimestamps := true
570+
equalReadOnly := true
569571

570572
for name, ing := range d.Ingesters {
571573
oing, ok := o.Ingesters[name]
@@ -600,14 +602,21 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
600602
}
601603

602604
if ing.State != oing.State {
603-
equalStatesAndTimestamps = false
605+
if ing.State == READONLY || oing.State == READONLY {
606+
equalReadOnly = false
607+
} else {
608+
equalStatesAndTimestamps = false
609+
}
604610
}
605611
}
606612

607-
if equalStatesAndTimestamps {
608-
return Equal
613+
if !equalReadOnly {
614+
return EqualButReadOnly
615+
}
616+
if !equalStatesAndTimestamps {
617+
return EqualButStatesAndTimestamps
609618
}
610-
return EqualButStatesAndTimestamps
619+
return Equal
611620
}
612621

613622
func GetOrCreateRingDesc(d interface{}) *Desc {

pkg/ring/model_test.go

+20
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,21 @@ func TestDesc_RingsCompare(t *testing.T) {
395395
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}},
396396
expected: Equal,
397397
},
398+
"same number of instances, from active to readOnly": {
399+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
400+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
401+
expected: EqualButReadOnly,
402+
},
403+
"same number of instances, from readOnly to active": {
404+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
405+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
406+
expected: EqualButReadOnly,
407+
},
408+
"same number of instances, prioritize readOnly than timestamp changes": {
409+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, Timestamp: 123456}}},
410+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY, Timestamp: 789012}}},
411+
expected: EqualButReadOnly,
412+
},
398413
"same single instance, different timestamp": {
399414
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 123456}}},
400415
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 789012}}},
@@ -440,6 +455,11 @@ func TestDesc_RingsCompare(t *testing.T) {
440455
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
441456
expected: Different,
442457
},
458+
"same number of instances, prioritize diff than ReadOnly": {
459+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one", State: ACTIVE}}},
460+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", State: READONLY}}},
461+
expected: Different,
462+
},
443463
}
444464

445465
for testName, testData := range tests {

pkg/ring/ring.go

+83-44
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ type ReadRing interface {
7070
// and size (number of instances).
7171
ShuffleShard(identifier string, size int) ReadRing
7272

73+
// ShuffleShardWithOperation returns a subring for the provided identifier (eg. a tenant ID)
74+
// and size (number of instances) filtered for a given operation.
75+
ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing
76+
7377
// ShuffleShardWithZoneStability does the same as ShuffleShard but using a different shuffle sharding algorithm.
7478
// It doesn't round up shard size to be divisible to number of zones and make sure when scaling up/down one
7579
// shard size at a time, at most 1 instance can be changed.
@@ -112,6 +116,8 @@ var (
112116
return s == READONLY
113117
})
114118

119+
WriteShard = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING}, func(s InstanceState) bool { return false })
120+
115121
// Read operation that extends the replica set if an instance is not ACTIVE, PENDING, LEAVING, JOINING OR READONLY
116122
Read = NewOp([]InstanceState{ACTIVE, PENDING, LEAVING, JOINING, READONLY}, func(s InstanceState) bool {
117123
// To match Write with extended replica set we have to also increase the
@@ -222,6 +228,7 @@ type subringCacheKey struct {
222228
shardSize int
223229

224230
zoneStableSharding bool
231+
operation Operation
225232
}
226233

227234
// New creates a new Ring. Being a service, Ring needs to be started to do anything.
@@ -333,12 +340,16 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
333340
}
334341

335342
rc := prevRing.RingCompare(ringDesc)
336-
if rc == Equal || rc == EqualButStatesAndTimestamps {
343+
if rc == Equal || rc == EqualButStatesAndTimestamps || rc == EqualButReadOnly {
337344
// No need to update tokens or zones. Only states and timestamps
338345
// have changed. (If Equal, nothing has changed, but that doesn't happen
339346
// when watching the ring for updates).
340347
r.mtx.Lock()
341348
r.ringDesc = ringDesc
349+
if rc == EqualButReadOnly && r.shuffledSubringCache != nil {
350+
// Invalidate all cached subrings.
351+
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
352+
}
342353
r.updateRingMetrics(rc)
343354
r.mtx.Unlock()
344355
return
@@ -732,11 +743,15 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
732743
// - Shuffling: probabilistically, for a large enough cluster each identifier gets a different
733744
// set of instances, with a reduced number of overlapping instances between two identifiers.
734745
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
735-
return r.shuffleShardWithCache(identifier, size, false)
746+
return r.shuffleShardWithCache(identifier, size, false, Reporting)
747+
}
748+
749+
func (r *Ring) ShuffleShardWithOperation(identifier string, size int, op Operation) ReadRing {
750+
return r.shuffleShardWithCache(identifier, size, false, op)
736751
}
737752

738753
func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing {
739-
return r.shuffleShardWithCache(identifier, size, true)
754+
return r.shuffleShardWithCache(identifier, size, true, Reporting)
740755
}
741756

742757
// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances
@@ -752,26 +767,26 @@ func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPer
752767
return r
753768
}
754769

755-
return r.shuffleShard(identifier, size, lookbackPeriod, now, false)
770+
return r.shuffleShard(identifier, size, lookbackPeriod, now, false, Reporting)
756771
}
757772

758-
func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing {
773+
func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool, op Operation) ReadRing {
759774
// Nothing to do if the shard size is not smaller than the actual ring.
760-
if size <= 0 || r.InstancesCount() <= size {
775+
if size <= 0 || (op == Reporting && r.InstancesCount() <= size) {
761776
return r
762777
}
763778

764-
if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil {
779+
if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding, op); cached != nil {
765780
return cached
766781
}
767782

768-
result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding)
783+
result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding, op)
769784

770-
r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result)
785+
r.setCachedShuffledSubring(identifier, size, zoneStableSharding, op, result)
771786
return result
772787
}
773788

774-
func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
789+
func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool, op Operation) *Ring {
775790
lookbackUntil := now.Add(-lookbackPeriod).Unix()
776791

777792
r.mtx.RLock()
@@ -783,14 +798,16 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
783798
zonesWithExtraInstance int
784799
)
785800

801+
ro := r.getRingForOperation(op)
802+
786803
if r.cfg.ZoneAwarenessEnabled {
787804
if zoneStableSharding {
788-
numInstancesPerZone = size / len(r.ringZones)
789-
zonesWithExtraInstance = size % len(r.ringZones)
805+
numInstancesPerZone = size / len(ro.ringZones)
806+
zonesWithExtraInstance = size % len(ro.ringZones)
790807
} else {
791-
numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones))
808+
numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(ro.ringZones))
792809
}
793-
actualZones = r.ringZones
810+
actualZones = ro.ringZones
794811
} else {
795812
numInstancesPerZone = size
796813
actualZones = []string{""}
@@ -802,12 +819,12 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
802819
for _, zone := range actualZones {
803820
var tokens []uint32
804821

805-
if r.cfg.ZoneAwarenessEnabled {
806-
tokens = r.ringTokensByZone[zone]
822+
if ro.cfg.ZoneAwarenessEnabled {
823+
tokens = ro.ringTokensByZone[zone]
807824
} else {
808825
// When zone-awareness is disabled, we just iterate over 1 single fake zone
809826
// and use all tokens in the ring.
810-
tokens = r.ringTokens
827+
tokens = ro.ringTokens
811828
}
812829

813830
// Initialise the random generator used to select instances in the ring.
@@ -835,7 +852,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
835852
// Wrap p around in the ring.
836853
p %= len(tokens)
837854

838-
info, ok := r.ringInstanceByToken[tokens[p]]
855+
info, ok := ro.ringInstanceByToken[tokens[p]]
839856
if !ok {
840857
// This should never happen unless a bug in the ring code.
841858
panic(ErrInconsistentTokensInfo)
@@ -847,7 +864,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
847864
}
848865

849866
instanceID := info.InstanceID
850-
instance := r.ringDesc.Ingesters[instanceID]
867+
instance := ro.ringDesc.Ingesters[instanceID]
851868
shard[instanceID] = instance
852869

853870
// If the lookback is enabled and this instance has been registered within the lookback period
@@ -869,27 +886,7 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
869886
}
870887
}
871888

872-
// Build a read-only ring for the shard.
873-
shardDesc := &Desc{Ingesters: shard}
874-
shardTokensByZone := shardDesc.getTokensByZone()
875-
876-
return &Ring{
877-
cfg: r.cfg,
878-
strategy: r.strategy,
879-
ringDesc: shardDesc,
880-
ringTokens: shardDesc.GetTokens(),
881-
ringTokensByZone: shardTokensByZone,
882-
ringZones: getZones(shardTokensByZone),
883-
KVClient: r.KVClient,
884-
885-
// We reference the original map as is in order to avoid copying. It's safe to do
886-
// because this map is immutable by design and it's a superset of the actual instances
887-
// with the subring.
888-
ringInstanceByToken: r.ringInstanceByToken,
889-
890-
// For caching to work, remember these values.
891-
lastTopologyChange: r.lastTopologyChange,
892-
}
889+
return r.copyWithNewDesc(shard)
893890
}
894891

895892
// GetInstanceState returns the current state of an instance or an error if the
@@ -926,7 +923,7 @@ func (r *Ring) HasInstance(instanceID string) bool {
926923
return ok
927924
}
928925

929-
func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring {
926+
func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation) *Ring {
930927
if r.cfg.SubringCacheDisabled {
931928
return nil
932929
}
@@ -935,7 +932,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS
935932
defer r.mtx.RUnlock()
936933

937934
// if shuffledSubringCache map is nil, reading it returns default value (nil pointer).
938-
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}]
935+
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}]
939936
if cached == nil {
940937
return nil
941938
}
@@ -954,7 +951,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS
954951
return cached
955952
}
956953

957-
func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) {
954+
func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, op Operation, subring *Ring) {
958955
if subring == nil || r.cfg.SubringCacheDisabled {
959956
return
960957
}
@@ -966,7 +963,49 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS
966963
// (which can happen between releasing the read lock and getting read-write lock).
967964
// Note that shuffledSubringCache can be only nil when set by test.
968965
if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) {
969-
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring
966+
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, operation: op}] = subring
967+
}
968+
}
969+
970+
// getRingForOperation returns a ring restricted to the instances whose state is healthy for op; the ring itself is returned when no filtering is needed.
971+
// The ring read lock must be already taken when calling this function.
972+
func (r *Ring) getRingForOperation(op Operation) *Ring {
973+
// Avoid filtering when the operation is Reporting (the default used by ShuffleShard) or the ring has no instances.
974+
if r.ringDesc == nil || len(r.ringDesc.Ingesters) == 0 || op == Reporting {
975+
return r
976+
}
977+
978+
instanceDescs := make(map[string]InstanceDesc)
979+
for id, instance := range r.ringDesc.Ingesters {
980+
if op.IsInstanceInStateHealthy(instance.State) {
981+
instanceDescs[id] = instance
982+
}
983+
}
984+
985+
return r.copyWithNewDesc(instanceDescs)
986+
}
987+
988+
// copyWithNewDesc returns a new ring built from the given instances that shares this ring's config, strategy, KV client, token-to-instance map and last topology change.
989+
func (r *Ring) copyWithNewDesc(desc map[string]InstanceDesc) *Ring {
990+
shardDesc := &Desc{Ingesters: desc}
991+
shardTokensByZone := shardDesc.getTokensByZone()
992+
993+
return &Ring{
994+
cfg: r.cfg,
995+
strategy: r.strategy,
996+
ringDesc: shardDesc,
997+
ringTokens: shardDesc.GetTokens(),
998+
ringTokensByZone: shardTokensByZone,
999+
ringZones: getZones(shardTokensByZone),
1000+
KVClient: r.KVClient,
1001+
1002+
// We reference the original map as is in order to avoid copying. It's safe to do
1003+
// because this map is immutable by design and it's a superset of the actual instances
1004+
// with the subring.
1005+
ringInstanceByToken: r.ringInstanceByToken,
1006+
1007+
// For caching to work, remember these values.
1008+
lastTopologyChange: r.lastTopologyChange,
9701009
}
9711010
}
9721011

0 commit comments

Comments
 (0)