diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index 03308e725e0..0f4457d5f92 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -24,6 +24,37 @@ func (f *flushTransferer) TransferOut(ctx context.Context) error { return f.lifecycler.ChangeState(ctx, ACTIVE) } +func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig { + var lifecyclerConfig LifecyclerConfig + flagext.DefaultValues(&lifecyclerConfig) + lifecyclerConfig.Addr = "0.0.0.0" + lifecyclerConfig.Port = 1 + lifecyclerConfig.RingConfig = ringConfig + lifecyclerConfig.NumTokens = 1 + lifecyclerConfig.ClaimOnRollout = true + lifecyclerConfig.ID = id + lifecyclerConfig.FinalSleep = 0 + return lifecyclerConfig +} + +func checkDenormalised(d interface{}, id string) bool { + desc, ok := d.(*Desc) + return ok && + len(desc.Ingesters) == 1 && + desc.Ingesters[id].State == ACTIVE && + len(desc.Ingesters[id].Tokens) == 0 && + len(desc.Tokens) == 1 +} + +func checkNormalised(d interface{}, id string) bool { + desc, ok := d.(*Desc) + return ok && + len(desc.Ingesters) == 1 && + desc.Ingesters[id].State == ACTIVE && + len(desc.Ingesters[id].Tokens) == 1 && + len(desc.Tokens) == 0 +} + func TestRingNormaliseMigration(t *testing.T) { var ringConfig Config flagext.DefaultValues(&ringConfig) @@ -35,15 +66,7 @@ func TestRingNormaliseMigration(t *testing.T) { defer r.Stop() // Add an 'ingester' with denormalised tokens. - var lifecyclerConfig1 LifecyclerConfig - flagext.DefaultValues(&lifecyclerConfig1) - lifecyclerConfig1.Addr = "0.0.0.0" - lifecyclerConfig1.Port = 1 - lifecyclerConfig1.RingConfig = ringConfig - lifecyclerConfig1.NumTokens = 1 - lifecyclerConfig1.ClaimOnRollout = true - lifecyclerConfig1.ID = "ing1" - lifecyclerConfig1.FinalSleep = 0 + lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1") ft := &flushTransferer{} l1, err := NewLifecycler(lifecyclerConfig1, ft, "ingester") @@ -53,23 +76,15 @@ func TestRingNormaliseMigration(t *testing.T) { test.Poll(t, 1000*time.Millisecond, true, func() interface{} { d, err := r.KVClient.Get(context.Background(), ConsulKey) require.NoError(t, err) - - desc, ok := d.(*Desc) - return ok && - len(desc.Ingesters) == 1 && - desc.Ingesters["ing1"].State == ACTIVE && - len(desc.Ingesters["ing1"].Tokens) == 0 && - len(desc.Tokens) == 1 + return checkDenormalised(d, "ing1") }) token := l1.tokens[0] // Add a second ingester with normalised tokens. - var lifecyclerConfig2 = lifecyclerConfig1 + var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2") lifecyclerConfig2.JoinAfter = 100 * time.Second lifecyclerConfig2.NormaliseTokens = true - lifecyclerConfig2.ID = "ing2" - lifecyclerConfig1.FinalSleep = 0 l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester") require.NoError(t, err) @@ -82,13 +97,55 @@ func TestRingNormaliseMigration(t *testing.T) { test.Poll(t, 1000*time.Millisecond, true, func() interface{} { d, err := r.KVClient.Get(context.Background(), ConsulKey) require.NoError(t, err) + return checkNormalised(d, "ing2") && + d.(*Desc).Ingesters["ing2"].Tokens[0] == token + }) +} + +type nopFlushTransferer struct{} + +func (f *nopFlushTransferer) StopIncomingRequests() {} +func (f *nopFlushTransferer) Flush() {} +func (f *nopFlushTransferer) TransferOut(ctx context.Context) error { + panic("should not be called") +} + +func TestRingRestart(t *testing.T) { + var ringConfig Config + flagext.DefaultValues(&ringConfig) + codec := ProtoCodec{Factory: ProtoDescFactory} + ringConfig.KVStore.Mock = NewInMemoryKVClient(codec) + + r, err := New(ringConfig, "ingester") + require.NoError(t, err) + defer r.Stop() + + // Add an 'ingester' with normalised tokens. + lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1") + lifecyclerConfig1.NormaliseTokens = true + l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester") + require.NoError(t, err) + + // Check this ingester joined, is active, and has one token. + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ConsulKey) + require.NoError(t, err) + return checkNormalised(d, "ing1") + }) - desc, ok := d.(*Desc) - return ok && - len(desc.Ingesters) == 1 && - desc.Ingesters["ing2"].State == ACTIVE && - len(desc.Ingesters["ing2"].Tokens) == 1 && - desc.Ingesters["ing2"].Tokens[0] == token && - len(desc.Tokens) == 0 + token := l1.tokens[0] + + // Add a second ingester with the same settings, so it will think it has restarted + l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester") + require.NoError(t, err) + + // Check the new ingester picked up the same token + test.Poll(t, 1000*time.Millisecond, true, func() interface{} { + d, err := r.KVClient.Get(context.Background(), ConsulKey) + require.NoError(t, err) + l2Tokens := l2.getTokens() + return checkNormalised(d, "ing1") && + len(l2Tokens) == 1 && + l2Tokens[0] == token }) } diff --git a/pkg/ring/model.go b/pkg/ring/model.go index 5644acaf51d..82d0de2f319 100644 --- a/pkg/ring/model.go +++ b/pkg/ring/model.go @@ -143,7 +143,7 @@ func (d *Desc) Ready(heartbeatTimeout time.Duration) error { // TokensFor partitions the tokens into those for the given ID, and those for others. func (d *Desc) TokensFor(id string) (tokens, other []uint32) { var takenTokens, myTokens []uint32 - for _, token := range d.Tokens { + for _, token := range migrateRing(d) { takenTokens = append(takenTokens, token.Token) if token.Ingester == id { myTokens = append(myTokens, token.Token) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 61c1aa31287..46bdfc01d82 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -150,7 +150,7 @@ func (r *Ring) loop(ctx context.Context) { } ringDesc := value.(*Desc) - ringDesc = r.migrateRing(ringDesc) + ringDesc.Tokens = migrateRing(ringDesc) r.mtx.Lock() defer r.mtx.Unlock() r.ringDesc = ringDesc @@ -159,7 +159,7 @@ func (r *Ring) loop(ctx context.Context) { } // migrateRing will denormalise the ring's tokens if stored in normal form. -func (r *Ring) migrateRing(desc *Desc) *Desc { +func migrateRing(desc *Desc) []TokenDesc { numTokens := len(desc.Tokens) for _, ing := range desc.Ingesters { numTokens += len(ing.Tokens) @@ -175,8 +175,7 @@ func (r *Ring) migrateRing(desc *Desc) *Desc { } } sort.Sort(ByToken(tokens)) - desc.Tokens = tokens - return desc + return tokens } // Get returns n (or more) ingesters which form the replicas for the given key.