
Commit 1c5ff0c

pstibrany authored and gouthamve committed
Removed support for writing denormalised tokens in the ring. (#1809)
* Removed support for using denormalised tokens in the ring. Still supported:
  - claiming tokens from another ingester that is using denormalised tokens
  - finding denormalised tokens in the ring when joining
  Other ingesters can still use denormalised tokens; migrateRing takes care of that.
* Updated arguments.md.
* Added dot, updated arguments.md.
* Updated comment, added test.
* If there are tokens for this ingester already, keep them.
* Added changelog entry.
* Added PR number.
* Document that AddIngester replaces existing tokens with new ones.
* Test that AddIngester replaces existing tokens with new ones.
* Switching back to denormalised tokens requires Cortex 0.4.0.
* Move changelog entry to the top.
* Move unused flags to the bottom of struct.

Signed-off-by: Peter Štibraný <[email protected]>
1 parent: b63bf96

File tree: 8 files changed, +141 −118 lines

CHANGELOG.md (1 addition, 0 deletions)

@@ -11,6 +11,7 @@
 * `ruler.poll-interval` has been added to specify the interval in which to poll new rule groups.
 * [CHANGE] Use relative links from /ring page to make it work when used behind reverse proxy. #1896
 * [CHANGE] Deprecated `-distributor.limiter-reload-period` flag. #1766
+* [CHANGE] Ingesters now write only normalised tokens to the ring, although they can still read denormalised tokens used by other ingesters. `-ingester.normalise-tokens` is now deprecated, and ignored. If you want to switch back to using denormalised tokens, you need to downgrade to Cortex 0.4.0. Previous versions don't handle claiming tokens from normalised ingesters correctly. #1809
 * [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726
 * [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766
 * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861

docs/configuration/arguments.md (4 additions, 2 deletions)

@@ -242,9 +242,11 @@ It also talks to a KVStore and has its own copies of the same flags used by the

 - `-ingester.normalise-tokens`

-   Write out "normalised" tokens to the ring. Normalised tokens consume less memory to encode and decode; as the ring is unmarshalled regularly, this significantly reduces memory usage of anything that watches the ring.
+   Deprecated. New ingesters always write "normalised" tokens to the ring. Normalised tokens consume less memory to encode and decode; as the ring is unmarshalled regularly, this significantly reduces memory usage of anything that watches the ring.

-   Before enabling, roll out a version of Cortex that supports normalised tokens to all jobs that interact with the ring, then roll out with this flag set to `true` on the ingesters. The new ring code can still read and write the old ring format, so is backwards compatible.
+   Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and later always *write* normalised tokens, although they can still *read* denormalised tokens written by older ingesters.
+
+   It's perfectly OK to have a mix of ingesters running denormalised tokens (Cortex <= 0.4.0) and normalised tokens (either by using `-ingester.normalise-tokens` in Cortex <= 0.4.0, or Cortex 0.5.0+) during upgrades.

 - `-ingester.chunk-encoding`
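For context on what the flag controlled: a denormalised ring stores one `TokenDesc` per token in a shared list (each entry repeating the owning ingester's ID), while a normalised ring stores an ingester's tokens inline on its `IngesterDesc`. A minimal sketch with simplified local stand-ins for the real protobuf-generated types in `pkg/ring`; the `tokensOf` helper is hypothetical, added only to show that readers must handle both forms during upgrades:

```go
package main

import "fmt"

// Simplified stand-ins for the ring's protobuf-generated types.
type IngesterDesc struct {
	Addr   string
	Tokens []uint32 // normalised: tokens stored inline on the ingester
}

type TokenDesc struct {
	Token    uint32
	Ingester string // denormalised: one entry per token in a shared list
}

type Desc struct {
	Ingesters map[string]IngesterDesc
	Tokens    []TokenDesc // only populated by denormalised (pre-0.5.0) ingesters
}

// tokensOf reads an ingester's tokens from either representation,
// mirroring how ring readers keep handling both forms during upgrades.
func tokensOf(d Desc, id string) []uint32 {
	tokens := append([]uint32(nil), d.Ingesters[id].Tokens...)
	for _, t := range d.Tokens {
		if t.Ingester == id {
			tokens = append(tokens, t.Token)
		}
	}
	return tokens
}

func main() {
	// Normalised form: one slice of 3 tokens on the ingester itself.
	normalised := Desc{Ingesters: map[string]IngesterDesc{
		"ing1": {Addr: "1.2.3.4:80", Tokens: []uint32{100, 200, 300}},
	}}

	// Denormalised form: the same 3 tokens, each repeating the owner's ID,
	// which is what made the old encoding larger to marshal and unmarshal.
	denormalised := Desc{
		Ingesters: map[string]IngesterDesc{"ing1": {Addr: "1.2.3.4:80"}},
		Tokens: []TokenDesc{
			{Token: 100, Ingester: "ing1"},
			{Token: 200, Ingester: "ing1"},
			{Token: 300, Ingester: "ing1"},
		},
	}

	fmt.Println(len(tokensOf(normalised, "ing1")), len(tokensOf(denormalised, "ing1")))
}
```

Both calls yield the same three tokens, which is why a mixed ring keeps working while only the write path changes.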

pkg/ring/lifecycler.go (12 additions, 9 deletions)

@@ -49,8 +49,6 @@ type LifecyclerConfig struct {
 	ObservePeriod    time.Duration `yaml:"observe_period,omitempty"`
 	JoinAfter        time.Duration `yaml:"join_after,omitempty"`
 	MinReadyDuration time.Duration `yaml:"min_ready_duration,omitempty"`
-	UnusedFlag       bool          `yaml:"claim_on_rollout,omitempty"` // DEPRECATED - left for backwards-compatibility
-	NormaliseTokens  bool          `yaml:"normalise_tokens,omitempty"`
 	InfNames         []string      `yaml:"interface_names"`
 	FinalSleep       time.Duration `yaml:"final_sleep"`
 	TokensFilePath   string        `yaml:"tokens_file_path,omitempty"`
@@ -60,6 +58,10 @@ type LifecyclerConfig struct {
 	Port           int
 	ID             string
 	SkipUnregister bool
+
+	// Graveyard for unused flags.
+	UnusedFlag  bool `yaml:"claim_on_rollout,omitempty"`  // DEPRECATED - left for backwards-compatibility
+	UnusedFlag2 bool `yaml:"normalise_tokens,omitempty"` // DEPRECATED - left for backwards-compatibility
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -83,7 +85,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
 	f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.")
 	f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
 	flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.")
-	f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
+	flagext.DeprecatedFlag(f, prefix+"normalise-tokens", "DEPRECATED. This feature is no longer optional.")
 	f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
 	f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")

@@ -287,7 +289,7 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro
 		return nil, false, fmt.Errorf("Cannot claim tokens in an empty ring")
 	}

-	tokens = ringDesc.ClaimTokens(ingesterID, i.ID, i.cfg.NormaliseTokens)
+	tokens = ringDesc.ClaimTokens(ingesterID, i.ID)
 	// Update timestamp to give gossiping clients a chance to register the ring change.
 	ing := ringDesc.Ingesters[i.ID]
 	ing.Timestamp = time.Now().Unix()
@@ -484,14 +486,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
 		if len(tokensFromFile) >= i.cfg.NumTokens {
 			i.setState(ACTIVE)
 		}
-		ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens)
+		ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState())
 		i.setTokens(tokensFromFile)
 		return ringDesc, true, nil
 	}

 	// Either we are a new ingester, or consul must have restarted
 	level.Info(util.Logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName)
-	ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens)
+	ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState())
 	return ringDesc, true, nil
 }

@@ -540,7 +542,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
 		ringTokens = append(ringTokens, newTokens...)
 		sort.Sort(ringTokens)

-		ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState(), i.cfg.NormaliseTokens)
+		ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState())

 		i.setTokens(ringTokens)

@@ -597,12 +599,13 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er

 		newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
 		i.setState(targetState)
-		ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)

 		myTokens = append(myTokens, newTokens...)
 		sort.Sort(myTokens)
 		i.setTokens(myTokens)

+		ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())
+
 		return ringDesc, true, nil
 	})

@@ -630,7 +633,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
 	if !ok {
 		// consul must have restarted
 		level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName)
-		ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState(), i.cfg.NormaliseTokens)
+		ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())
 	} else {
 		ingesterDesc.Timestamp = time.Now().Unix()
 		ingesterDesc.State = i.GetState()

pkg/ring/lifecycler_test.go (34 additions, 14 deletions)

@@ -43,11 +43,11 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
 	return lifecyclerConfig
 }

-func checkDenormalised(d interface{}, id string) bool {
+func checkDenormalisedLeaving(d interface{}, id string) bool {
 	desc, ok := d.(*Desc)
 	return ok &&
 		len(desc.Ingesters) == 1 &&
-		desc.Ingesters[id].State == ACTIVE &&
+		desc.Ingesters[id].State == LEAVING &&
 		len(desc.Ingesters[id].Tokens) == 0 &&
 		len(desc.Tokens) == 1
 }
@@ -73,37 +73,59 @@ func TestRingNormaliseMigration(t *testing.T) {
 	// Add an 'ingester' with denormalised tokens.
 	lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")

-	ft := &flushTransferer{}
-	l1, err := NewLifecycler(lifecyclerConfig1, ft, "ingester", IngesterRingKey)
+	// Since the code to insert an ingester with denormalised tokens into the ring
+	// was removed, instead of running a lifecycler, we do it manually here.
+	token := uint32(0)
+	err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
+		require.Nil(t, in)
+		r := NewDesc()
+		tks := GenerateTokens(lifecyclerConfig1.NumTokens, nil)
+		r.Ingesters[lifecyclerConfig1.ID] = IngesterDesc{
+			Addr:      lifecyclerConfig1.Addr,
+			Timestamp: time.Now().Unix(),
+			State:     LEAVING, // expected by the second ingester
+		}
+		for _, t := range tks {
+			r.Tokens = append(r.Tokens, TokenDesc{
+				Token:    t,
+				Ingester: lifecyclerConfig1.ID,
+			})
+		}
+		token = tks[0]
+		return r, true, nil
+	})
 	require.NoError(t, err)
-	l1.Start()

 	// Check this ingester joined, is active, and has one token.
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
 		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
 		require.NoError(t, err)
-		return checkDenormalised(d, "ing1")
+		return checkDenormalisedLeaving(d, "ing1")
 	})

-	token := l1.tokens[0]
-
 	// Add a second ingester with normalised tokens.
 	var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2")
 	lifecyclerConfig2.JoinAfter = 100 * time.Second
-	lifecyclerConfig2.NormaliseTokens = true

 	l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey)
 	require.NoError(t, err)
 	l2.Start()

-	// This will block until l1 has successfully left the ring.
-	ft.lifecycler = l2 // When l1 shuts down, call l2.ClaimTokensFor("ing1")
-	l1.Shutdown()
+	// Since there is nothing that would make l2 claim tokens from l1 (normally done on transfer),
+	// we do it manually.
+	require.NoError(t, l2.ClaimTokensFor(context.Background(), "ing1"))
+	require.NoError(t, l2.ChangeState(context.Background(), ACTIVE))

 	// Check the new ingester joined, has the same token, and is active.
 	test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
 		d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
 		require.NoError(t, err)
+
+		if desc, ok := d.(*Desc); ok {
+			// The lifecycler for ingester 1 isn't running, so we need to delete it manually
+			// (to make checkNormalised happy).
+			delete(desc.Ingesters, lifecyclerConfig1.ID)
+		}
 		return checkNormalised(d, "ing2") &&
 			d.(*Desc).Ingesters["ing2"].Tokens[0] == token
 	})
@@ -227,7 +249,6 @@ func TestRingRestart(t *testing.T) {

 	// Add an 'ingester' with normalised tokens.
 	lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
-	lifecyclerConfig1.NormaliseTokens = true
 	l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey)
 	require.NoError(t, err)
 	l1.Start()
@@ -344,7 +365,6 @@ func TestTokensOnDisk(t *testing.T) {
 	lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
 	lifecyclerConfig.NumTokens = 512
 	lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
-	lifecyclerConfig.NormaliseTokens = true

 	// Start first ingester.
 	l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey)

pkg/ring/model.go (34 additions, 58 deletions)

@@ -35,8 +35,9 @@ func NewDesc() *Desc {
 	}
 }

-// AddIngester adds the given ingester to the ring.
-func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState, normaliseTokens bool) {
+// AddIngester adds the given ingester to the ring. The ingester will only use the supplied
+// tokens; any other tokens are removed.
+func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState) {
 	if d.Ingesters == nil {
 		d.Ingesters = map[string]IngesterDesc{}
 	}
@@ -45,18 +46,18 @@ func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState
 		Addr:      addr,
 		Timestamp: time.Now().Unix(),
 		State:     state,
+		Tokens:    tokens,
 	}

-	if normaliseTokens {
-		ingester.Tokens = tokens
-	} else {
-		for _, token := range tokens {
-			d.Tokens = append(d.Tokens, TokenDesc{
-				Token:    token,
-				Ingester: id,
-			})
+	// Since this ingester is only using normalised tokens, let's delete any denormalised
+	// tokens for this ingester. There may be such tokens, e.g. if a previous instance
+	// of the same ingester was running with denormalised tokens.
+	for ix := 0; ix < len(d.Tokens); {
+		if d.Tokens[ix].Ingester == id {
+			d.Tokens = append(d.Tokens[:ix], d.Tokens[ix+1:]...)
+		} else {
+			ix++
 		}
-		sort.Sort(ByToken(d.Tokens))
 	}

 	d.Ingesters[id] = ingester
@@ -79,58 +80,33 @@ func (d *Desc) RemoveIngester(id string) {
 // This method assumes that the Ring is in the correct state, the 'from' ingester has no tokens anywhere,
 // and the 'to' ingester uses either normalised or non-normalised tokens, but not both. The tokens list
 // must be sorted properly. If all of this is true, everything will be fine.
-func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) Tokens {
+func (d *Desc) ClaimTokens(from, to string) Tokens {
 	var result Tokens

-	if normaliseTokens {
-
-		// If the ingester we are claiming from is normalising, get its tokens then erase them from the ring.
-		if fromDesc, found := d.Ingesters[from]; found {
-			result = fromDesc.Tokens
-			fromDesc.Tokens = nil
-			d.Ingesters[from] = fromDesc
-		}
-
-		// If we are storing the tokens in normalised form, we need to deal with
-		// the migration from denormalised by removing the tokens from the tokens
-		// list.
-		// When all ingesters are in normalised mode, d.Tokens is empty here.
-		for i := 0; i < len(d.Tokens); {
-			if d.Tokens[i].Ingester == from {
-				result = append(result, d.Tokens[i].Token)
-				d.Tokens = append(d.Tokens[:i], d.Tokens[i+1:]...)
-				continue
-			}
-			i++
-		}
-
-		ing := d.Ingesters[to]
-		ing.Tokens = result
-		d.Ingesters[to] = ing
-
-	} else {
-		// If the source ingester is normalising, copy its tokens to d.Tokens, and set the new owner.
-		if fromDesc, found := d.Ingesters[from]; found {
-			result = fromDesc.Tokens
-			fromDesc.Tokens = nil
-			d.Ingesters[from] = fromDesc
-
-			for _, t := range result {
-				d.Tokens = append(d.Tokens, TokenDesc{Ingester: to, Token: t})
-			}
-
-			sort.Sort(ByToken(d.Tokens))
-		}
-
-		// If the source was normalising, this should not find new tokens.
-		for i := 0; i < len(d.Tokens); i++ {
-			if d.Tokens[i].Ingester == from {
-				d.Tokens[i].Ingester = to
-				result = append(result, d.Tokens[i].Token)
-			}
-		}
+	// If the ingester we are claiming from is normalising, get its tokens then erase them from the ring.
+	if fromDesc, found := d.Ingesters[from]; found {
+		result = fromDesc.Tokens
+		fromDesc.Tokens = nil
+		d.Ingesters[from] = fromDesc
+	}
+
+	// Since we are storing the tokens in normalised form, we need to deal with
+	// the migration from denormalised by removing the tokens from the tokens list.
+	// When all ingesters are in normalised mode, d.Tokens is empty here.
+	for i := 0; i < len(d.Tokens); {
+		if d.Tokens[i].Ingester == from {
+			result = append(result, d.Tokens[i].Token)
+			d.Tokens = append(d.Tokens[:i], d.Tokens[i+1:]...)
+			continue
+		}
+		i++
 	}

+	ing := d.Ingesters[to]
+	ing.Tokens = result
+	d.Ingesters[to] = ing
+
 	// not necessary, but makes testing simpler
 	if len(d.Tokens) == 0 {
 		d.Tokens = nil
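Both `AddIngester` and `ClaimTokens` above scrub leftover denormalised entries with the same delete-while-iterating loop: the index only advances when nothing was removed at the current position. A standalone sketch of that idiom, using a local `TokenDesc` stand-in for the real type in `pkg/ring`:

```go
package main

import "fmt"

// TokenDesc mirrors the ring's denormalised token entry: one record per
// token, each carrying the owning ingester's ID. (Local copy for illustration.)
type TokenDesc struct {
	Token    uint32
	Ingester string
}

// removeTokensFor deletes, in place, every denormalised token owned by id.
// After append(tokens[:ix], tokens[ix+1:]...) the element at ix is new,
// so ix must not advance on a removal; otherwise entries get skipped.
func removeTokensFor(tokens []TokenDesc, id string) []TokenDesc {
	for ix := 0; ix < len(tokens); {
		if tokens[ix].Ingester == id {
			tokens = append(tokens[:ix], tokens[ix+1:]...)
		} else {
			ix++
		}
	}
	return tokens
}

func main() {
	tokens := []TokenDesc{
		{Token: 100, Ingester: "ing1"},
		{Token: 200, Ingester: "ing2"},
		{Token: 300, Ingester: "ing1"}, // adjacent removals are the tricky case
	}
	tokens = removeTokensFor(tokens, "ing1")
	fmt.Println(tokens) // only ing2's token remains
}
```

The adjacent-ownership case is exactly why a plain `for i := range` loop with deletion would be buggy here, and why the commit keeps this shape in both methods.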

pkg/ring/model_test.go (2 additions, 32 deletions)

@@ -94,48 +94,18 @@ func normalizedOutput() *Desc {
 	}
 }

-func unnormalizedOutput() *Desc {
-	return &Desc{
-		Ingesters: map[string]IngesterDesc{
-			"first":  {},
-			"second": {},
-		},
-		Tokens: []TokenDesc{
-			{Token: 100, Ingester: "second"},
-			{Token: 200, Ingester: "second"},
-			{Token: 300, Ingester: "second"},
-		},
-	}
-}
-
 func TestClaimTokensFromNormalizedToNormalized(t *testing.T) {
 	r := normalizedSource()
-	result := r.ClaimTokens("first", "second", true)
+	result := r.ClaimTokens("first", "second")

 	assert.Equal(t, Tokens{100, 200, 300}, result)
 	assert.Equal(t, normalizedOutput(), r)
 }

-func TestClaimTokensFromNormalizedToUnnormalized(t *testing.T) {
-	r := normalizedSource()
-	result := r.ClaimTokens("first", "second", false)
-
-	assert.Equal(t, Tokens{100, 200, 300}, result)
-	assert.Equal(t, unnormalizedOutput(), r)
-}
-
-func TestClaimTokensFromUnnormalizedToUnnormalized(t *testing.T) {
-	r := unnormalizedSource()
-	result := r.ClaimTokens("first", "second", false)
-
-	assert.Equal(t, Tokens{100, 200, 300}, result)
-	assert.Equal(t, unnormalizedOutput(), r)
-}
-
 func TestClaimTokensFromUnnormalizedToNormalized(t *testing.T) {
 	r := unnormalizedSource()

-	result := r.ClaimTokens("first", "second", true)
+	result := r.ClaimTokens("first", "second")

 	assert.Equal(t, Tokens{100, 200, 300}, result)
 	assert.Equal(t, normalizedOutput(), r)
