
Removed support for writing denormalised tokens in the ring. #1809


Merged 13 commits on Dec 12, 2019
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@
* `ruler.poll-interval` has been added to specify the interval in which to poll new rule groups.
* [CHANGE] Use relative links from /ring page to make it work when used behind reverse proxy. #1896
* [CHANGE] Deprecated `-distributor.limiter-reload-period` flag. #1766
* [CHANGE] Ingesters now write only normalised tokens to the ring, although they can still read denormalised tokens written by other ingesters. The `-ingester.normalise-tokens` flag is now deprecated and ignored. If you want to switch back to denormalised tokens, you need to downgrade to Cortex 0.4.0; versions before 0.4.0 don't handle claiming tokens from normalised ingesters correctly. #1809
* [FEATURE] The distributor can now drop labels from samples (similar to the removal of the replica label for HA ingestion) per user via the `distributor.drop-label` flag. #1726
* [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
6 changes: 4 additions & 2 deletions docs/configuration/arguments.md
@@ -242,9 +242,11 @@ It also talks to a KVStore and has its own copies of the same flags used by the

- `-ingester.normalise-tokens`

Write out "normalised" tokens to the ring. Normalised tokens consume less memory to encode and decode; as the ring is unmarshalled regularly, this significantly reduces memory usage of anything that watches the ring.
Deprecated. New ingesters always write "normalised" tokens to the ring. Normalised tokens consume less memory to encode and decode; as the ring is unmarshalled regularly, this significantly reduces memory usage of anything that watches the ring.

Before enabling, rollout a version of Cortex that supports normalised token for all jobs that interact with the ring, then rollout with this flag set to `true` on the ingesters. The new ring code can still read and write the old ring format, so is backwards compatible.
Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and later always *write* normalised tokens, although they can still *read* denormalised tokens written by older ingesters.

It's perfectly OK to have a mix of ingesters writing denormalised tokens (Cortex <= 0.4.0) and normalised tokens (Cortex <= 0.4.0 with `-ingester.normalise-tokens`, or Cortex 0.5.0+) during upgrades.
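To make the two layouts concrete, here is a simplified sketch of the ring's `Desc` structure from `pkg/ring` (fields trimmed; the real types are protobuf-generated and carry more fields such as `Timestamp` and `State`). Denormalised tokens live in one ring-wide list with an entry per token, so the owner string is repeated for every token; normalised tokens are stored once, inline in the owning ingester's entry:

```go
// Simplified sketch of the ring description; not the full generated types.
type TokenDesc struct {
	Token    uint32 // position on the hash ring
	Ingester string // owner, repeated per token; this repetition bloats the encoding
}

type IngesterDesc struct {
	Addr   string
	Tokens []uint32 // normalised: sorted tokens stored once, inline
}

type Desc struct {
	Ingesters map[string]IngesterDesc
	Tokens    []TokenDesc // denormalised: one (token, owner) entry per token
}
```

Since the ring is unmarshalled on every watch update, dropping the per-token owner strings is where the memory saving comes from.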

- `-ingester.chunk-encoding`

21 changes: 12 additions & 9 deletions pkg/ring/lifecycler.go
@@ -49,8 +49,6 @@ type LifecyclerConfig struct {
ObservePeriod time.Duration `yaml:"observe_period,omitempty"`
JoinAfter time.Duration `yaml:"join_after,omitempty"`
MinReadyDuration time.Duration `yaml:"min_ready_duration,omitempty"`
UnusedFlag bool `yaml:"claim_on_rollout,omitempty"` // DEPRECATED - left for backwards-compatibility
NormaliseTokens bool `yaml:"normalise_tokens,omitempty"`
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`
TokensFilePath string `yaml:"tokens_file_path,omitempty"`
@@ -60,6 +58,10 @@
Port int
ID string
SkipUnregister bool

// graveyard for unused flags.
UnusedFlag bool `yaml:"claim_on_rollout,omitempty"` // DEPRECATED - left for backwards-compatibility
UnusedFlag2 bool `yaml:"normalise_tokens,omitempty"` // DEPRECATED - left for backwards-compatibility
}

// RegisterFlags adds the flags required to configure this to the given FlagSet
@@ -83,7 +85,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
f.DurationVar(&cfg.ObservePeriod, prefix+"observe-period", 0*time.Second, "Observe tokens after generating to resolve collisions. Useful when using gossiping ring.")
f.DurationVar(&cfg.MinReadyDuration, prefix+"min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.")
f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
flagext.DeprecatedFlag(f, prefix+"normalise-tokens", "DEPRECATED. This feature is no longer optional.")
f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")
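
The "graveyard" fields above keep the old YAML names so existing config files still unmarshal cleanly, while `flagext.DeprecatedFlag` keeps the old command-line flags parseable. As a rough sketch (an assumption about the helper's shape, not the actual Cortex implementation), a deprecated flag can be a no-op `flag.Value` that only warns when set:

```go
package flagext

import (
	"flag"
	"log"
)

// deprecatedFlag accepts any value and only logs a warning.
type deprecatedFlag struct{ name string }

func (deprecatedFlag) String() string { return "deprecated" }

func (d deprecatedFlag) Set(string) error {
	log.Printf("flag %q is deprecated and has no effect", d.name)
	return nil
}

// DeprecatedFlag registers name so old command lines keep parsing,
// without wiring the flag to any config field.
func DeprecatedFlag(f *flag.FlagSet, name, usage string) {
	f.Var(deprecatedFlag{name: name}, name, usage)
}
```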

@@ -287,7 +289,7 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro
return nil, false, fmt.Errorf("Cannot claim tokens in an empty ring")
}

tokens = ringDesc.ClaimTokens(ingesterID, i.ID, i.cfg.NormaliseTokens)
tokens = ringDesc.ClaimTokens(ingesterID, i.ID)
// update timestamp to give gossiping client a chance to register the ring change.
ing := ringDesc.Ingesters[i.ID]
ing.Timestamp = time.Now().Unix()
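
The body above runs inside a compare-and-swap callback against the ring's KV store, the same callback shape the migration test uses further down. A minimal sketch of that pattern, assuming the `kv.Client` interface matches the CAS callback signature shown in the test (simplified here):

```go
import (
	"context"

	"github.com/cortexproject/cortex/pkg/ring/kv"
)

// updateRing sketches the CAS loop the lifecycler methods run inside.
func updateRing(ctx context.Context, client kv.Client, key string, mutate func(*Desc)) error {
	return client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
		ringDesc, ok := in.(*Desc)
		if !ok || ringDesc == nil {
			// Ring doesn't exist yet, or the KV store was wiped (e.g. consul restart).
			ringDesc = NewDesc()
		}
		mutate(ringDesc)
		// retry=true: on a write conflict with another ring member, the
		// client re-invokes this callback with the fresh ring state.
		return ringDesc, true, nil
	})
}
```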
@@ -484,14 +486,14 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
if len(tokensFromFile) >= i.cfg.NumTokens {
i.setState(ACTIVE)
}
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens)
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState())
i.setTokens(tokensFromFile)
return ringDesc, true, nil
}

// Either we are a new ingester, or consul must have restarted
level.Info(util.Logger).Log("msg", "instance not found in ring, adding with no tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens)
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState())
return ringDesc, true, nil
}

@@ -540,7 +542,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
ringTokens = append(ringTokens, newTokens...)
sort.Sort(ringTokens)

ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState(), i.cfg.NormaliseTokens)
ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState())

i.setTokens(ringTokens)

@@ -597,12 +599,13 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er

newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
i.setState(targetState)
ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)

myTokens = append(myTokens, newTokens...)
sort.Sort(myTokens)
i.setTokens(myTokens)

ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())

return ringDesc, true, nil
})

@@ -630,7 +633,7 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
if !ok {
// consul must have restarted
level.Info(util.Logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName)
ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState(), i.cfg.NormaliseTokens)
ringDesc.AddIngester(i.ID, i.Addr, i.getTokens(), i.GetState())
} else {
ingesterDesc.Timestamp = time.Now().Unix()
ingesterDesc.State = i.GetState()
48 changes: 34 additions & 14 deletions pkg/ring/lifecycler_test.go
@@ -43,11 +43,11 @@ func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
return lifecyclerConfig
}

func checkDenormalised(d interface{}, id string) bool {
func checkDenormalisedLeaving(d interface{}, id string) bool {
desc, ok := d.(*Desc)
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters[id].State == ACTIVE &&
desc.Ingesters[id].State == LEAVING &&
len(desc.Ingesters[id].Tokens) == 0 &&
len(desc.Tokens) == 1
}
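
The counterpart helper `checkNormalised`, used further down, is unchanged by this PR and therefore absent from the diff; a sketch inferred from its usage (the real helper may differ in detail):

```go
// Inferred sketch: one ingester, ACTIVE, holding its single token
// inline, with nothing in the ring-wide denormalised token list.
func checkNormalised(d interface{}, id string) bool {
	desc, ok := d.(*Desc)
	return ok &&
		len(desc.Ingesters) == 1 &&
		desc.Ingesters[id].State == ACTIVE &&
		len(desc.Ingesters[id].Tokens) == 1 &&
		len(desc.Tokens) == 0
}
```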
@@ -73,37 +73,59 @@ func TestRingNormaliseMigration(t *testing.T) {
// Add an 'ingester' with denormalised tokens.
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")

ft := &flushTransferer{}
l1, err := NewLifecycler(lifecyclerConfig1, ft, "ingester", IngesterRingKey)
// Since the code that inserts an ingester with denormalised tokens into the ring was removed,
// we do it manually here instead of running a lifecycler.
token := uint32(0)
err = r.KVClient.CAS(context.Background(), IngesterRingKey, func(in interface{}) (out interface{}, retry bool, err error) {
require.Nil(t, in)
r := NewDesc()
tks := GenerateTokens(lifecyclerConfig1.NumTokens, nil)
r.Ingesters[lifecyclerConfig1.ID] = IngesterDesc{
Addr: lifecyclerConfig1.Addr,
Timestamp: time.Now().Unix(),
State: LEAVING, // expected by second ingester
}
for _, t := range tks {
r.Tokens = append(r.Tokens, TokenDesc{
Token: t,
Ingester: lifecyclerConfig1.ID,
})
}
token = tks[0]
return r, true, nil
})
require.NoError(t, err)
l1.Start()

// Check this ingester was inserted, is LEAVING, and has one denormalised token.
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
require.NoError(t, err)
return checkDenormalised(d, "ing1")
return checkDenormalisedLeaving(d, "ing1")
})

token := l1.tokens[0]

// Add a second ingester with normalised tokens.
var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2")
lifecyclerConfig2.JoinAfter = 100 * time.Second
lifecyclerConfig2.NormaliseTokens = true

l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey)
require.NoError(t, err)
l2.Start()

// This will block until l1 has successfully left the ring.
ft.lifecycler = l2 // When l1 shuts down, call l2.ClaimTokensFor("ing1")
l1.Shutdown()
// Since there is nothing that would make l2 claim tokens from l1 (normally done during transfer),
// we do it manually.
require.NoError(t, l2.ClaimTokensFor(context.Background(), "ing1"))
require.NoError(t, l2.ChangeState(context.Background(), ACTIVE))

// Check the new ingester joined, has the same token, and is active.
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), IngesterRingKey)
require.NoError(t, err)

if desc, ok := d.(*Desc); ok {
// lifecycler for ingester 1 isn't running, so we need to delete it manually
// (to make checkNormalised happy)
delete(desc.Ingesters, lifecyclerConfig1.ID)
}
return checkNormalised(d, "ing2") &&
d.(*Desc).Ingesters["ing2"].Tokens[0] == token
})
@@ -227,7 +249,6 @@ func TestRingRestart(t *testing.T) {

// Add an 'ingester' with normalised tokens.
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig1.NormaliseTokens = true
l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey)
require.NoError(t, err)
l1.Start()
@@ -344,7 +365,6 @@ func TestTokensOnDisk(t *testing.T) {
lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.NumTokens = 512
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
lifecyclerConfig.NormaliseTokens = true

// Start first ingester.
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey)
92 changes: 34 additions & 58 deletions pkg/ring/model.go
@@ -35,8 +35,9 @@ func NewDesc() *Desc {
}
}

// AddIngester adds the given ingester to the ring.
func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState, normaliseTokens bool) {
// AddIngester adds the given ingester to the ring. The ingester will only use the supplied
// tokens; any other tokens are removed.
func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState) {
if d.Ingesters == nil {
d.Ingesters = map[string]IngesterDesc{}
}
@@ -45,18 +46,18 @@ func (d *Desc) AddIngester(id, addr string, tokens []uint32, state IngesterState
Addr: addr,
Timestamp: time.Now().Unix(),
State: state,
Tokens: tokens,
}

if normaliseTokens {
ingester.Tokens = tokens
} else {
for _, token := range tokens {
d.Tokens = append(d.Tokens, TokenDesc{
Token: token,
Ingester: id,
})
// Since this ingester is only using normalised tokens, delete any denormalised tokens
// for this ingester. There may be such tokens, e.g. if a previous instance of the same
// ingester was running with denormalised tokens.
for ix := 0; ix < len(d.Tokens); {
if d.Tokens[ix].Ingester == id {
d.Tokens = append(d.Tokens[:ix], d.Tokens[ix+1:]...)
} else {
ix++
}
sort.Sort(ByToken(d.Tokens))
}

d.Ingesters[id] = ingester
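
The deletion loop above removes entries from `d.Tokens` in place while iterating, which is quadratic in the worst case because each `append` shifts the tail. An equivalent formulation, shown only as a sketch of the same behaviour, is the linear filter-in-place idiom:

```go
// Equivalent filter-in-place: keep every token not owned by id.
// Reuses d.Tokens' backing array, so no extra allocation.
kept := d.Tokens[:0]
for _, t := range d.Tokens {
	if t.Ingester != id {
		kept = append(kept, t)
	}
}
d.Tokens = kept
```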
@@ -79,58 +80,33 @@ func (d *Desc) RemoveIngester(id string) {
// This method assumes that Ring is in the correct state, and that the 'to' ingester has no
// tokens anywhere yet. The tokens list must be sorted properly. If all of this is true,
// everything will be fine.
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) Tokens {
func (d *Desc) ClaimTokens(from, to string) Tokens {
var result Tokens

if normaliseTokens {

// If the ingester we are claiming from is normalising, get its tokens then erase them from the ring.
if fromDesc, found := d.Ingesters[from]; found {
result = fromDesc.Tokens
fromDesc.Tokens = nil
d.Ingesters[from] = fromDesc
}

// If we are storing the tokens in normalised form, we need to deal with
// the migration from denormalised by removing the tokens from the tokens
// list.
// When all ingesters are in normalised mode, d.Tokens is empty here
for i := 0; i < len(d.Tokens); {
if d.Tokens[i].Ingester == from {
result = append(result, d.Tokens[i].Token)
d.Tokens = append(d.Tokens[:i], d.Tokens[i+1:]...)
continue
}
i++
}

ing := d.Ingesters[to]
ing.Tokens = result
d.Ingesters[to] = ing

} else {
// If source ingester is normalising, copy its tokens to d.Tokens, and set new owner
if fromDesc, found := d.Ingesters[from]; found {
result = fromDesc.Tokens
fromDesc.Tokens = nil
d.Ingesters[from] = fromDesc

for _, t := range result {
d.Tokens = append(d.Tokens, TokenDesc{Ingester: to, Token: t})
}

sort.Sort(ByToken(d.Tokens))
}

// if source was normalising, this should not find new tokens
for i := 0; i < len(d.Tokens); i++ {
if d.Tokens[i].Ingester == from {
d.Tokens[i].Ingester = to
result = append(result, d.Tokens[i].Token)
}
// If the ingester we are claiming from is normalising, get its tokens then erase them from the ring.
if fromDesc, found := d.Ingesters[from]; found {
result = fromDesc.Tokens
fromDesc.Tokens = nil
d.Ingesters[from] = fromDesc
}

// If we are storing the tokens in normalised form, we need to deal with
// the migration from denormalised by removing the tokens from the tokens
// list.
// When all ingesters are in normalised mode, d.Tokens is empty here
for i := 0; i < len(d.Tokens); {
if d.Tokens[i].Ingester == from {
result = append(result, d.Tokens[i].Token)
d.Tokens = append(d.Tokens[:i], d.Tokens[i+1:]...)
continue
}
i++
}

ing := d.Ingesters[to]
ing.Tokens = result
d.Ingesters[to] = ing

// not necessary, but makes testing simpler
if len(d.Tokens) == 0 {
d.Tokens = nil
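
A worked example of the merged code path (hypothetical values; `Tokens` is the package's `[]uint32`-based type), claiming from an ingester that still has a leftover denormalised entry:

```go
d := &Desc{
	Ingesters: map[string]IngesterDesc{
		"from": {Tokens: []uint32{100}}, // normalised token
		"to":   {},
	},
	// Leftover denormalised entry, e.g. written by an older instance.
	Tokens: []TokenDesc{{Token: 200, Ingester: "from"}},
}

claimed := d.ClaimTokens("from", "to")
// claimed == Tokens{100, 200}: both the inline token and the ring-wide
// entry move to "to" in normalised form, and d.Tokens ends up nil.
```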
34 changes: 2 additions & 32 deletions pkg/ring/model_test.go
@@ -94,48 +94,18 @@ func normalizedOutput() *Desc {
}
}

func unnormalizedOutput() *Desc {
return &Desc{
Ingesters: map[string]IngesterDesc{
"first": {},
"second": {},
},
Tokens: []TokenDesc{
{Token: 100, Ingester: "second"},
{Token: 200, Ingester: "second"},
{Token: 300, Ingester: "second"},
},
}
}

func TestClaimTokensFromNormalizedToNormalized(t *testing.T) {
r := normalizedSource()
result := r.ClaimTokens("first", "second", true)
result := r.ClaimTokens("first", "second")

assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, normalizedOutput(), r)
}

func TestClaimTokensFromNormalizedToUnnormalized(t *testing.T) {
r := normalizedSource()
result := r.ClaimTokens("first", "second", false)

assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, unnormalizedOutput(), r)
}

func TestClaimTokensFromUnnormalizedToUnnormalized(t *testing.T) {
r := unnormalizedSource()
result := r.ClaimTokens("first", "second", false)

assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, unnormalizedOutput(), r)
}

func TestClaimTokensFromUnnormalizedToNormalized(t *testing.T) {
r := unnormalizedSource()

result := r.ClaimTokens("first", "second", true)
result := r.ClaimTokens("first", "second")

assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, normalizedOutput(), r)