1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -25,6 +25,7 @@
* [BUGFIX] Query Frontend: Fix panic caused by nil pointer dereference. #6609
* [BUGFIX] Ingester: Add check to avoid query 5xx when closing tsdb. #6616
* [BUGFIX] Querier: Fix panic when marshaling QueryResultRequest. #6601
* [BUGFIX] Ingester: Avoid resharding for queries when restarting READONLY ingesters. #6642

## 1.19.0 2025-02-27

13 changes: 9 additions & 4 deletions pkg/ring/lifecycler.go
@@ -655,10 +655,15 @@ func (i *Lifecycler) stopping(runningError error) error {
i.setPreviousState(currentState)
}

// Mark ourselves as LEAVING so no more samples are sent to us.
err := i.changeState(context.Background(), LEAVING)
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err)
// We don't need to mark ourselves as LEAVING if we are READONLY, since no requests are sent to us.
// Skipping this transition also avoids resharding for queriers when a READONLY ingester restarts,
// because shards are extended for READONLY instances but not for LEAVING ones.
// Queriers keep calling pods in the LEAVING or JOINING state anyway, so staying in READONLY makes no difference there.
if i.GetState() != READONLY {
// Mark ourselves as LEAVING so no more samples are sent to us.
err := i.changeState(context.Background(), LEAVING)
if err != nil {
level.Error(i.logger).Log("msg", "failed to set state to LEAVING", "ring", i.RingName, "err", err)
}
}

// Do the transferring / flushing on a background goroutine so we can continue
86 changes: 86 additions & 0 deletions pkg/ring/lifecycler_test.go
@@ -743,6 +743,92 @@ func TestRestartIngester_DisabledHeartbeat_unregister_on_shutdown_false(t *testing.T) {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))
}

func TestRestartIngester_READONLY(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = ringStore

r, err := New(ringConfig, "ingester", ringKey, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))

// poll waits for the given condition to hold and returns the state of the ingesters once it does.
poll := func(condition func(*Desc) bool) map[string]InstanceDesc {
var ingesters map[string]InstanceDesc
test.Poll(t, 5*time.Second, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ringKey)
require.NoError(t, err)

desc, ok := d.(*Desc)

if ok {
ingesters = desc.Ingesters
}
return ok && condition(desc)
})

return ingesters
}

// startIngesterAndWaitState starts an ingester and waits until it reaches the expected state in the ring.
startIngesterAndWaitState := func(ingId string, addr string, expectedState InstanceState) *Lifecycler {
lifecyclerConfig := testLifecyclerConfigWithAddr(ringConfig, ingId, addr)
lifecyclerConfig.UnregisterOnShutdown = false
lifecycler, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "lifecycler", ringKey, true, true, log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), lifecycler))
poll(func(desc *Desc) bool {
return desc.Ingesters[ingId].State == expectedState
})
return lifecycler
}

l1 := startIngesterAndWaitState("ing1", "0.0.0.0", ACTIVE)
l2 := startIngesterAndWaitState("ing2", "0.0.0.0", ACTIVE)

err = l2.ChangeState(context.Background(), READONLY)
require.NoError(t, err)
poll(func(desc *Desc) bool {
return desc.Ingesters["ing2"].State == READONLY
})

ingesters := poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE && desc.Ingesters["ing2"].State == READONLY
})

// Ingester 1 should be ACTIVE and ingester 2 should be READONLY.
assert.Equal(t, ACTIVE, ingesters["ing1"].State)
assert.Equal(t, READONLY, ingesters["ing2"].State)

// Stopping ingester 1 gracefully should leave it in the LEAVING state in the ring.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l1))
// Stopping ingester 2 gracefully should keep it in READONLY.
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), l2))

ingesters = poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == LEAVING
})

assert.Equal(t, LEAVING, ingesters["ing1"].State)
assert.Equal(t, READONLY, ingesters["ing2"].State)

// Start ingester 1 again - it should flip back to ACTIVE in the ring.
defer services.StopAndAwaitTerminated(context.Background(), startIngesterAndWaitState("ing1", "0.0.0.0", ACTIVE)) //nolint:errcheck

// Start ingester 2 again - it should stay in READONLY.
defer services.StopAndAwaitTerminated(context.Background(), startIngesterAndWaitState("ing2", "0.0.0.0", READONLY)) //nolint:errcheck

ingesters = poll(func(desc *Desc) bool {
return len(desc.Ingesters) == 2 && desc.Ingesters["ing1"].State == ACTIVE
})

assert.Equal(t, ACTIVE, ingesters["ing1"].State)
assert.Equal(t, READONLY, ingesters["ing2"].State)
}

func TestTokenFileOnDisk(t *testing.T) {
ringStore, closer := consul.NewInMemoryClient(GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })