diff --git a/CHANGELOG.md b/CHANGELOG.md index 771bb0554a5..7744fdb6387 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## master / unreleased * [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526 +* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517 ## 1.19.0 in progress diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 4c41a90a991..c15f82ce354 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -3122,7 +3122,7 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -// ModeHandler Change mode of ingester. It will also update set unregisterOnShutdown to true if READONLY mode +// ModeHandler Change mode of ingester. func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) { err := r.ParseForm() if err != nil { diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 7e8b033e83f..c34d3464019 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -1005,6 +1005,12 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error level.Info(i.logger).Log("msg", "changing instance state from", "old_state", currState, "new_state", state, "ring", i.RingName) i.setState(state) + + //The instances is rejoining the ring. It should reset its registered time. + if currState == READONLY && state == ACTIVE { + registeredAt := time.Now() + i.setRegisteredAt(registeredAt) + } return i.updateConsul(ctx) } diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 035cfc8f1b8..e0756765379 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -827,6 +827,86 @@ func TestTokenFileOnDisk(t *testing.T) { } } +func TestRegisteredAtOnBackToActive(t *testing.T) { + ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + var ringConfig Config + flagext.DefaultValues(&ringConfig) + ringConfig.KVStore.Mock = ringStore + + r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), r)) + defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck + + tokenDir := t.TempDir() + + lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") + lifecyclerConfig.NumTokens = 512 + lifecyclerConfig.TokensFilePath = tokenDir + "/tokens" + + // Start first ingester. + l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, true, log.NewNopLogger(), nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), l1)) + + // Check this ingester joined, is active. + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + return ok && + len(desc.Ingesters) == 1 && + desc.Ingesters["ing1"].State == ACTIVE + }) + + //Get original registeredTime + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + desc, ok := d.(*Desc) + require.True(t, ok) + originalRegisterTime := desc.Ingesters["ing1"].RegisteredTimestamp + + // Change state from ACTIVE to READONLY + err = l1.ChangeState(context.Background(), READONLY) + require.NoError(t, err) + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + return ok && + desc.Ingesters["ing1"].State == READONLY + }) + + //Guarantee 1s diff for RegisteredTimestamp + time.Sleep(1 * time.Second) + + // Change state from READONLY to ACTIVE + err = l1.ChangeState(context.Background(), ACTIVE) + require.NoError(t, err) + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok := d.(*Desc) + return ok && + desc.Ingesters["ing1"].State == ACTIVE + }) + + d, err = r.KVClient.Get(context.Background(), ringKey) + require.NoError(t, err) + + desc, ok = d.(*Desc) + require.True(t, ok) + ing := desc.Ingesters["ing1"] + require.True(t, ing.RegisteredTimestamp > originalRegisterTime) + + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1)) +} + func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) { ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 3f0e6944e2f..a465bf0fa91 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -543,10 +543,11 @@ type CompareResult int const ( Equal CompareResult = iota // Both rings contain same exact instances. EqualButStatesAndTimestamps // Both rings contain the same instances with the same data except states and timestamps (may differ). + EqualButReadOnly // Both rings contain the same instances but Write ring can change due to ReadOnly update Different // Rings have different set of instances, or their information don't match. ) -// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps or Different. +// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps, EqualButReadOnly or Different. func (d *Desc) RingCompare(o *Desc) CompareResult { if d == nil { if o == nil || len(o.Ingesters) == 0 { @@ -566,6 +567,7 @@ func (d *Desc) RingCompare(o *Desc) CompareResult { } equalStatesAndTimestamps := true + equalReadOnly := true for name, ing := range d.Ingesters { oing, ok := o.Ingesters[name] @@ -600,14 +602,21 @@ func (d *Desc) RingCompare(o *Desc) CompareResult { } if ing.State != oing.State { - equalStatesAndTimestamps = false + if ing.State == READONLY || oing.State == READONLY { + equalReadOnly = false + } else { + equalStatesAndTimestamps = false + } } } - if equalStatesAndTimestamps { - return Equal + if !equalReadOnly { + return EqualButReadOnly + } + if !equalStatesAndTimestamps { + return EqualButStatesAndTimestamps } - return EqualButStatesAndTimestamps + return Equal } func GetOrCreateRingDesc(d interface{}) *Desc { diff --git a/pkg/ring/model_test.go b/pkg/ring/model_test.go index 896aef56897..f34b6e566d2 100644 --- a/pkg/ring/model_test.go +++ b/pkg/ring/model_test.go @@ -395,6 +395,21 @@ func TestDesc_RingsCompare(t *testing.T) { r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}}, expected: Equal, }, + "same number of instances, from active to readOnly": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}}, + expected: EqualButReadOnly, + }, + "same number of instances, from readOnly to active": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}}, + expected: EqualButReadOnly, + }, + "same number of instances, prioritize readOnly than timestamp changes": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, Timestamp: 123456}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY, Timestamp: 789012}}}, + expected: EqualButReadOnly, + }, "same single instance, different timestamp": { r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 123456}}}, r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 789012}}}, @@ -440,6 +455,11 @@ func TestDesc_RingsCompare(t *testing.T) { r2: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}}, expected: Different, }, + "same number of instances, prioritize diff than ReadOnly": { + r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one", State: ACTIVE}}}, + r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", State: READONLY}}}, + expected: Different, + }, } for testName, testData := range tests { diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 7377cbcccd4..f4557ec5436 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -333,12 +333,16 @@ func (r *Ring) updateRingState(ringDesc *Desc) { } rc := prevRing.RingCompare(ringDesc) - if rc == Equal || rc == EqualButStatesAndTimestamps { + if rc == Equal || rc == EqualButStatesAndTimestamps || rc == EqualButReadOnly { // No need to update tokens or zones. Only states and timestamps // have changed. (If Equal, nothing has changed, but that doesn't happen // when watching the ring for updates). r.mtx.Lock() r.ringDesc = ringDesc + if rc == EqualButReadOnly && r.shuffledSubringCache != nil { + // Invalidate all cached subrings. + r.shuffledSubringCache = make(map[subringCacheKey]*Ring) + } r.updateRingMetrics(rc) r.mtx.Unlock() return @@ -852,7 +856,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // If the lookback is enabled and this instance has been registered within the lookback period // then we should include it in the subring but continuing selecting instances. - if lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil { + // If an instance is in READONLY we should always extend. The write path will filter it out when GetRing. + // The read path should extend to get new ingester used on write + if (lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil) || instance.State == READONLY { continue } diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index a5937e2e8ec..ff51e9a4e6c 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -2523,6 +2523,111 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { } } +func TestRing_ShuffleShardWithReadOnlyIngesters(t *testing.T) { + g := NewRandomTokenGenerator() + + const ( + userID = "user-1" + ) + + tests := map[string]struct { + ringInstances map[string]InstanceDesc + ringReplicationFactor int + shardSize int + expectedSize int + op Operation + expectedToBePresent []string + }{ + "single zone, shard size = 1, default scenario": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, + }, + ringReplicationFactor: 1, + shardSize: 1, + expectedSize: 1, + }, + "single zone, shard size = 1, not filter ReadOnly": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, + }, + ringReplicationFactor: 1, + shardSize: 2, + expectedSize: 2, + }, + "single zone, shard size = 4, do not filter other states": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: JOINING, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-a", 128, true)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: PENDING, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)}, + }, + ringReplicationFactor: 1, + shardSize: 4, + expectedSize: 4, + }, + "single zone, shard size = 4, extend on readOnly": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{6}}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3, 5}}, + }, + ringReplicationFactor: 1, + shardSize: 2, + expectedSize: 3, + expectedToBePresent: []string{"instance-4"}, + }, + "rf = 3, shard size = 4, extend readOnly from different zones": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{12}}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{22}}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{14}}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{24}}, + "instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3}}, + "instance-8": {Addr: "127.0.0.8", Zone: "zone-b", State: READONLY, Tokens: []uint32{11, 13}}, + "instance-9": {Addr: "127.0.0.9", Zone: "zone-c", State: READONLY, Tokens: []uint32{21, 23}}, + }, + ringReplicationFactor: 3, + shardSize: 6, + expectedSize: 9, + expectedToBePresent: []string{"instance-7", "instance-8", "instance-9"}, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Init the ring. + ringDesc := &Desc{Ingesters: testData.ringInstances} + for id, instance := range ringDesc.Ingesters { + ringDesc.Ingesters[id] = instance + } + + ring := Ring{ + cfg: Config{ + ReplicationFactor: testData.ringReplicationFactor, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(), + KVClient: &MockClient{}, + } + + shardRing := ring.ShuffleShard(userID, testData.shardSize) + assert.Equal(t, testData.expectedSize, shardRing.InstancesCount()) + for _, expectedInstance := range testData.expectedToBePresent { + assert.True(t, shardRing.HasInstance(expectedInstance)) + } + }) + } +} + func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { // The goal of this test is NOT to ensure that the minimum required number of instances // are returned at any given time, BUT at least all required instances are returned.