Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased

* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
* [BUGFIX] Ingester: Avoid errors or early throttling when READONLY ingesters are present in the ring. #6517

## 1.19.0 in progress

Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3122,7 +3122,7 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// ModeHandler Change mode of ingester. It will also update set unregisterOnShutdown to true if READONLY mode
// ModeHandler Change mode of ingester.
func (i *Ingester) ModeHandler(w http.ResponseWriter, r *http.Request) {
err := r.ParseForm()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,12 @@ func (i *Lifecycler) changeState(ctx context.Context, state InstanceState) error

level.Info(i.logger).Log("msg", "changing instance state from", "old_state", currState, "new_state", state, "ring", i.RingName)
i.setState(state)

// The instance is rejoining the ring, so it should reset its registered time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question, what happens if we don't reset the registered time?

Copy link
Contributor Author

@danielblando danielblando Jan 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to catch the case where the ingester goes to READONLY and back to ACTIVE.
Queries still need to extend in these cases. Changing registeredTimestamp ensures that queries will continue to extend requests to these ingesters.

if currState == READONLY && state == ACTIVE {
registeredAt := time.Now()
i.setRegisteredAt(registeredAt)
}
return i.updateConsul(ctx)
}

Expand Down
80 changes: 80 additions & 0 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,86 @@ func TestTokenFileOnDisk(t *testing.T) {
}
}

// TestRegisteredAtOnBackToActive checks that an instance which goes from ACTIVE
// to READONLY and back to ACTIVE publishes a RegisteredTimestamp newer than the
// one it had when it first joined the ring.
func TestRegisteredAtOnBackToActive(t *testing.T) {
	const (
		instanceID  = "ing1"
		pollTimeout = 1000 * time.Millisecond
	)

	ctx := context.Background()

	ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })

	var ringConfig Config
	flagext.DefaultValues(&ringConfig)
	ringConfig.KVStore.Mock = ringStore

	r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(ctx, r))
	defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck

	tokenDir := t.TempDir()

	lifecyclerConfig := testLifecyclerConfig(ringConfig, instanceID)
	lifecyclerConfig.NumTokens = 512
	lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"

	// readRing fetches whatever is currently stored under ringKey and reports
	// whether it is a *Desc.
	readRing := func() (*Desc, bool) {
		d, err := r.KVClient.Get(ctx, ringKey)
		require.NoError(t, err)

		desc, ok := d.(*Desc)
		return desc, ok
	}

	// waitForState polls the KV store until the instance is in the wanted state.
	waitForState := func(want InstanceState) {
		test.Poll(t, pollTimeout, true, func() interface{} {
			desc, ok := readRing()
			return ok && desc.Ingesters[instanceID].State == want
		})
	}

	// Start first ingester.
	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", ringKey, true, true, log.NewNopLogger(), nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(ctx, l1))

	// Check this ingester joined, is active, and is the only one in the ring.
	test.Poll(t, pollTimeout, true, func() interface{} {
		desc, ok := readRing()
		return ok &&
			len(desc.Ingesters) == 1 &&
			desc.Ingesters[instanceID].State == ACTIVE
	})

	// Remember the registered time published on the first join.
	desc, ok := readRing()
	require.True(t, ok)
	originalRegisterTime := desc.Ingesters[instanceID].RegisteredTimestamp

	// Change state from ACTIVE to READONLY.
	require.NoError(t, l1.ChangeState(ctx, READONLY))
	waitForState(READONLY)

	// Guarantee 1s diff for RegisteredTimestamp.
	time.Sleep(1 * time.Second)

	// Change state from READONLY to ACTIVE.
	require.NoError(t, l1.ChangeState(ctx, ACTIVE))
	waitForState(ACTIVE)

	desc, ok = readRing()
	require.True(t, ok)
	// Greater (rather than True(a > b)) prints both timestamps on failure.
	require.Greater(t, desc.Ingesters[instanceID].RegisteredTimestamp, originalRegisterTime,
		"RegisteredTimestamp should be reset when going from READONLY back to ACTIVE")

	require.NoError(t, services.StopAndAwaitTerminated(ctx, l1))
}

func TestTokenFileOnDisk_WithoutAutoJoinOnStartup(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down
19 changes: 14 additions & 5 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,10 +543,11 @@ type CompareResult int
// Results returned by Desc.RingCompare.
const (
Equal CompareResult = iota // Both rings contain the exact same instances.
EqualButStatesAndTimestamps // Both rings contain the same instances with the same data except states and timestamps (may differ).
EqualButReadOnly // Both rings contain the same instances, but at least one instance moved to or from READONLY (timestamps may differ too), so the write ring can change.
Different // Rings have different sets of instances, or their information doesn't match.
)

// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps or Different.
// RingCompare compares this ring against another one and returns one of Equal, EqualButStatesAndTimestamps, EqualButReadOnly or Different.
func (d *Desc) RingCompare(o *Desc) CompareResult {
if d == nil {
if o == nil || len(o.Ingesters) == 0 {
Expand All @@ -566,6 +567,7 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
}

equalStatesAndTimestamps := true
equalReadOnly := true

for name, ing := range d.Ingesters {
oing, ok := o.Ingesters[name]
Expand Down Expand Up @@ -600,14 +602,21 @@ func (d *Desc) RingCompare(o *Desc) CompareResult {
}

if ing.State != oing.State {
equalStatesAndTimestamps = false
if ing.State == READONLY || oing.State == READONLY {
equalReadOnly = false
} else {
equalStatesAndTimestamps = false
}
}
}

if equalStatesAndTimestamps {
return Equal
if !equalReadOnly {
return EqualButReadOnly
}
if !equalStatesAndTimestamps {
return EqualButStatesAndTimestamps
}
return EqualButStatesAndTimestamps
return Equal
}

func GetOrCreateRingDesc(d interface{}) *Desc {
Expand Down
20 changes: 20 additions & 0 deletions pkg/ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,21 @@ func TestDesc_RingsCompare(t *testing.T) {
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}},
expected: Equal,
},
"same number of instances, from active to readOnly": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
expected: EqualButReadOnly,
},
"same number of instances, from readOnly to active": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
expected: EqualButReadOnly,
},
"same number of instances, prioritize readOnly than timestamp changes": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, Timestamp: 123456}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: READONLY, Timestamp: 789012}}},
expected: EqualButReadOnly,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused about the name. I understand the prioritization, but it is weird to call it equal when the timestamps are different.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, I get what you mean, but I don't have a better name. I think this is OK, as ReadOnly is less restrictive than EqualButStatesAndTimestamps.

},
"same single instance, different timestamp": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 123456}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 789012}}},
Expand Down Expand Up @@ -440,6 +455,11 @@ func TestDesc_RingsCompare(t *testing.T) {
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
expected: Different,
},
"same number of instances, prioritize diff than ReadOnly": {
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one", State: ACTIVE}}},
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", State: READONLY}}},
expected: Different,
},
}

for testName, testData := range tests {
Expand Down
10 changes: 8 additions & 2 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,12 +333,16 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
}

rc := prevRing.RingCompare(ringDesc)
if rc == Equal || rc == EqualButStatesAndTimestamps {
if rc == Equal || rc == EqualButStatesAndTimestamps || rc == EqualButReadOnly {
// No need to update tokens or zones. Only states and timestamps
// have changed. (If Equal, nothing has changed, but that doesn't happen
// when watching the ring for updates).
r.mtx.Lock()
r.ringDesc = ringDesc
if rc == EqualButReadOnly && r.shuffledSubringCache != nil {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
}
r.updateRingMetrics(rc)
r.mtx.Unlock()
return
Expand Down Expand Up @@ -852,7 +856,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur

// If the lookback is enabled and this instance has been registered within the lookback period
// then we should include it in the subring but continuing selecting instances.
if lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil {
// If an instance is in READONLY we should always extend. The write path will filter it out in GetRing.
// The read path should extend to get the new ingester used on writes.
if (lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil) || instance.State == READONLY {
continue
}

Expand Down
105 changes: 105 additions & 0 deletions pkg/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2523,6 +2523,111 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) {
}
}

// TestRing_ShuffleShardWithReadOnlyIngesters checks the size and membership of
// the subring returned by ShuffleShard when some instances are READONLY: those
// instances are kept in the shard and the shard is extended past them.
func TestRing_ShuffleShardWithReadOnlyIngesters(t *testing.T) {
	g := NewRandomTokenGenerator()

	const userID = "user-1"

	tests := map[string]struct {
		ringInstances         map[string]InstanceDesc
		ringReplicationFactor int
		shardSize             int
		expectedSize          int
		expectedToBePresent   []string
	}{
		"single zone, shard size = 1, default scenario": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)},
			},
			ringReplicationFactor: 1,
			shardSize:             1,
			expectedSize:          1,
		},
		"single zone, shard size = 2, do not filter ReadOnly": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: READONLY, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)},
			},
			ringReplicationFactor: 1,
			shardSize:             2,
			expectedSize:          2,
		},
		"single zone, shard size = 4, do not filter other states": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: g.GenerateTokens(NewDesc(), "instance-1", "zone-a", 128, true)},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: JOINING, Tokens: g.GenerateTokens(NewDesc(), "instance-2", "zone-a", 128, true)},
				"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: LEAVING, Tokens: g.GenerateTokens(NewDesc(), "instance-3", "zone-a", 128, true)},
				"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: PENDING, Tokens: g.GenerateTokens(NewDesc(), "instance-4", "zone-a", 128, true)},
			},
			ringReplicationFactor: 1,
			shardSize:             4,
			expectedSize:          4,
		},
		"single zone, shard size = 2, extend on readOnly": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}},
				"instance-3": {Addr: "127.0.0.3", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{6}},
				"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3, 5}},
			},
			ringReplicationFactor: 1,
			shardSize:             2,
			expectedSize:          3,
			expectedToBePresent:   []string{"instance-4"},
		},
		"rf = 3, shard size = 6, extend readOnly from different zones": {
			ringInstances: map[string]InstanceDesc{
				"instance-1": {Addr: "127.0.0.1", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{2}},
				"instance-2": {Addr: "127.0.0.2", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{12}},
				"instance-3": {Addr: "127.0.0.3", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{22}},
				"instance-4": {Addr: "127.0.0.4", Zone: "zone-a", State: ACTIVE, Tokens: []uint32{4}},
				"instance-5": {Addr: "127.0.0.5", Zone: "zone-b", State: ACTIVE, Tokens: []uint32{14}},
				"instance-6": {Addr: "127.0.0.6", Zone: "zone-c", State: ACTIVE, Tokens: []uint32{24}},
				"instance-7": {Addr: "127.0.0.7", Zone: "zone-a", State: READONLY, Tokens: []uint32{1, 3}},
				"instance-8": {Addr: "127.0.0.8", Zone: "zone-b", State: READONLY, Tokens: []uint32{11, 13}},
				"instance-9": {Addr: "127.0.0.9", Zone: "zone-c", State: READONLY, Tokens: []uint32{21, 23}},
			},
			ringReplicationFactor: 3,
			shardSize:             6,
			expectedSize:          9,
			expectedToBePresent:   []string{"instance-7", "instance-8", "instance-9"},
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			// Init the ring.
			ringDesc := &Desc{Ingesters: testData.ringInstances}

			ring := Ring{
				cfg: Config{
					ReplicationFactor: testData.ringReplicationFactor,
				},
				ringDesc:            ringDesc,
				ringTokens:          ringDesc.GetTokens(),
				ringTokensByZone:    ringDesc.getTokensByZone(),
				ringInstanceByToken: ringDesc.getTokensInfo(),
				ringZones:           getZones(ringDesc.getTokensByZone()),
				strategy:            NewDefaultReplicationStrategy(),
				KVClient:            &MockClient{},
			}

			shardRing := ring.ShuffleShard(userID, testData.shardSize)
			assert.Equal(t, testData.expectedSize, shardRing.InstancesCount())
			for _, expectedInstance := range testData.expectedToBePresent {
				assert.True(t, shardRing.HasInstance(expectedInstance), "expected %s to be part of the shard", expectedInstance)
			}
		})
	}
}

func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
// The goal of this test is NOT to ensure that the minimum required number of instances
// are returned at any given time, BUT at least all required instances are returned.
Expand Down
Loading