Skip to content

Commit a7cedef

Browse files
authored
Merge pull request #1464 from cortexproject/normalised-ingester-startup
Fix ingester startup with a normalised ring
2 parents 0224dc1 + de92875 commit a7cedef

File tree

3 files changed

+87
-31
lines changed

3 files changed

+87
-31
lines changed

pkg/ring/lifecycler_test.go

Lines changed: 83 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,37 @@ func (f *flushTransferer) TransferOut(ctx context.Context) error {
2424
return f.lifecycler.ChangeState(ctx, ACTIVE)
2525
}
2626

27+
func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
28+
var lifecyclerConfig LifecyclerConfig
29+
flagext.DefaultValues(&lifecyclerConfig)
30+
lifecyclerConfig.Addr = "0.0.0.0"
31+
lifecyclerConfig.Port = 1
32+
lifecyclerConfig.RingConfig = ringConfig
33+
lifecyclerConfig.NumTokens = 1
34+
lifecyclerConfig.ClaimOnRollout = true
35+
lifecyclerConfig.ID = id
36+
lifecyclerConfig.FinalSleep = 0
37+
return lifecyclerConfig
38+
}
39+
40+
func checkDenormalised(d interface{}, id string) bool {
41+
desc, ok := d.(*Desc)
42+
return ok &&
43+
len(desc.Ingesters) == 1 &&
44+
desc.Ingesters[id].State == ACTIVE &&
45+
len(desc.Ingesters[id].Tokens) == 0 &&
46+
len(desc.Tokens) == 1
47+
}
48+
49+
func checkNormalised(d interface{}, id string) bool {
50+
desc, ok := d.(*Desc)
51+
return ok &&
52+
len(desc.Ingesters) == 1 &&
53+
desc.Ingesters[id].State == ACTIVE &&
54+
len(desc.Ingesters[id].Tokens) == 1 &&
55+
len(desc.Tokens) == 0
56+
}
57+
2758
func TestRingNormaliseMigration(t *testing.T) {
2859
var ringConfig Config
2960
flagext.DefaultValues(&ringConfig)
@@ -35,15 +66,7 @@ func TestRingNormaliseMigration(t *testing.T) {
3566
defer r.Stop()
3667

3768
// Add an 'ingester' with denormalised tokens.
38-
var lifecyclerConfig1 LifecyclerConfig
39-
flagext.DefaultValues(&lifecyclerConfig1)
40-
lifecyclerConfig1.Addr = "0.0.0.0"
41-
lifecyclerConfig1.Port = 1
42-
lifecyclerConfig1.RingConfig = ringConfig
43-
lifecyclerConfig1.NumTokens = 1
44-
lifecyclerConfig1.ClaimOnRollout = true
45-
lifecyclerConfig1.ID = "ing1"
46-
lifecyclerConfig1.FinalSleep = 0
69+
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
4770

4871
ft := &flushTransferer{}
4972
l1, err := NewLifecycler(lifecyclerConfig1, ft, "ingester")
@@ -53,23 +76,15 @@ func TestRingNormaliseMigration(t *testing.T) {
5376
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
5477
d, err := r.KVClient.Get(context.Background(), ConsulKey)
5578
require.NoError(t, err)
56-
57-
desc, ok := d.(*Desc)
58-
return ok &&
59-
len(desc.Ingesters) == 1 &&
60-
desc.Ingesters["ing1"].State == ACTIVE &&
61-
len(desc.Ingesters["ing1"].Tokens) == 0 &&
62-
len(desc.Tokens) == 1
79+
return checkDenormalised(d, "ing1")
6380
})
6481

6582
token := l1.tokens[0]
6683

6784
// Add a second ingester with normalised tokens.
68-
var lifecyclerConfig2 = lifecyclerConfig1
85+
var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2")
6986
lifecyclerConfig2.JoinAfter = 100 * time.Second
7087
lifecyclerConfig2.NormaliseTokens = true
71-
lifecyclerConfig2.ID = "ing2"
72-
lifecyclerConfig1.FinalSleep = 0
7388

7489
l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester")
7590
require.NoError(t, err)
@@ -82,13 +97,55 @@ func TestRingNormaliseMigration(t *testing.T) {
8297
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
8398
d, err := r.KVClient.Get(context.Background(), ConsulKey)
8499
require.NoError(t, err)
100+
return checkNormalised(d, "ing2") &&
101+
d.(*Desc).Ingesters["ing2"].Tokens[0] == token
102+
})
103+
}
104+
105+
type nopFlushTransferer struct{}
106+
107+
func (f *nopFlushTransferer) StopIncomingRequests() {}
108+
func (f *nopFlushTransferer) Flush() {}
109+
func (f *nopFlushTransferer) TransferOut(ctx context.Context) error {
110+
panic("should not be called")
111+
}
112+
113+
func TestRingRestart(t *testing.T) {
114+
var ringConfig Config
115+
flagext.DefaultValues(&ringConfig)
116+
codec := ProtoCodec{Factory: ProtoDescFactory}
117+
ringConfig.KVStore.Mock = NewInMemoryKVClient(codec)
118+
119+
r, err := New(ringConfig, "ingester")
120+
require.NoError(t, err)
121+
defer r.Stop()
122+
123+
// Add an 'ingester' with normalised tokens.
124+
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
125+
lifecyclerConfig1.NormaliseTokens = true
126+
l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester")
127+
require.NoError(t, err)
128+
129+
// Check this ingester joined, is active, and has one token.
130+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
131+
d, err := r.KVClient.Get(context.Background(), ConsulKey)
132+
require.NoError(t, err)
133+
return checkNormalised(d, "ing1")
134+
})
85135

86-
desc, ok := d.(*Desc)
87-
return ok &&
88-
len(desc.Ingesters) == 1 &&
89-
desc.Ingesters["ing2"].State == ACTIVE &&
90-
len(desc.Ingesters["ing2"].Tokens) == 1 &&
91-
desc.Ingesters["ing2"].Tokens[0] == token &&
92-
len(desc.Tokens) == 0
136+
token := l1.tokens[0]
137+
138+
// Add a second ingester with the same settings, so it will think it has restarted
139+
l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester")
140+
require.NoError(t, err)
141+
142+
// Check the new ingester picked up the same token
143+
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
144+
d, err := r.KVClient.Get(context.Background(), ConsulKey)
145+
require.NoError(t, err)
146+
l2Tokens := l2.getTokens()
147+
return checkNormalised(d, "ing1") &&
148+
len(l2Tokens) == 1 &&
149+
l2Tokens[0] == token
93150
})
94151
}

pkg/ring/model.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (d *Desc) Ready(heartbeatTimeout time.Duration) error {
143143
// TokensFor partitions the tokens into those for the given ID, and those for others.
144144
func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
145145
var takenTokens, myTokens []uint32
146-
for _, token := range d.Tokens {
146+
for _, token := range migrateRing(d) {
147147
takenTokens = append(takenTokens, token.Token)
148148
if token.Ingester == id {
149149
myTokens = append(myTokens, token.Token)

pkg/ring/ring.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (r *Ring) loop(ctx context.Context) {
150150
}
151151

152152
ringDesc := value.(*Desc)
153-
ringDesc = r.migrateRing(ringDesc)
153+
ringDesc.Tokens = migrateRing(ringDesc)
154154
r.mtx.Lock()
155155
defer r.mtx.Unlock()
156156
r.ringDesc = ringDesc
@@ -159,7 +159,7 @@ func (r *Ring) loop(ctx context.Context) {
159159
}
160160

161161
// migrateRing will denormalise the ring's tokens if stored in normal form.
162-
func (r *Ring) migrateRing(desc *Desc) *Desc {
162+
func migrateRing(desc *Desc) []TokenDesc {
163163
numTokens := len(desc.Tokens)
164164
for _, ing := range desc.Ingesters {
165165
numTokens += len(ing.Tokens)
@@ -175,8 +175,7 @@ func (r *Ring) migrateRing(desc *Desc) *Desc {
175175
}
176176
}
177177
sort.Sort(ByToken(tokens))
178-
desc.Tokens = tokens
179-
return desc
178+
return tokens
180179
}
181180

182181
// Get returns n (or more) ingesters which form the replicas for the given key.

0 commit comments

Comments
 (0)