Skip to content

Flush tokens to disk #1750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions pkg/ring/lifecycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type LifecyclerConfig struct {
NormaliseTokens bool `yaml:"normalise_tokens,omitempty"`
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`
TokensFilePath string `yaml:"tokens_file_path,omitempty"`

// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address"`
Expand Down Expand Up @@ -84,6 +85,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag
flagext.DeprecatedFlag(f, prefix+"claim-on-rollout", "DEPRECATED. This feature is no longer optional.")
f.BoolVar(&cfg.NormaliseTokens, prefix+"normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
f.DurationVar(&cfg.FinalSleep, prefix+"final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
f.StringVar(&cfg.TokensFilePath, prefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.")

hostname, err := os.Hostname()
if err != nil {
Expand Down Expand Up @@ -125,7 +127,7 @@ type Lifecycler struct {
// back empty. And it changes during lifecycle of ingester.
stateMtx sync.Mutex
state IngesterState
tokens []uint32
tokens Tokens

// Controls the ready-reporting
readyLock sync.Mutex
Expand Down Expand Up @@ -246,18 +248,24 @@ func (i *Lifecycler) ChangeState(ctx context.Context, state IngesterState) error
return <-err
}

func (i *Lifecycler) getTokens() []uint32 {
func (i *Lifecycler) getTokens() Tokens {
i.stateMtx.Lock()
defer i.stateMtx.Unlock()
return i.tokens
}

func (i *Lifecycler) setTokens(tokens []uint32) {
func (i *Lifecycler) setTokens(tokens Tokens) {
tokensOwned.WithLabelValues(i.RingName).Set(float64(len(tokens)))

i.stateMtx.Lock()
defer i.stateMtx.Unlock()

i.tokens = tokens
if i.cfg.TokensFilePath != "" {
if err := i.tokens.StoreToFile(i.cfg.TokensFilePath); err != nil {
level.Error(util.Logger).Log("msg", "error storing tokens to disk", "path", i.cfg.TokensFilePath, "err", err)
}
}
}

// ClaimTokensFor takes all the tokens for the supplied ingester and assigns them to this ingester.
Expand All @@ -270,7 +278,7 @@ func (i *Lifecycler) ClaimTokensFor(ctx context.Context, ingesterID string) erro
err := make(chan error)

i.actorChan <- func() {
var tokens []uint32
var tokens Tokens

claimTokens := func(in interface{}) (out interface{}, retry bool, err error) {
ringDesc, ok := in.(*Desc)
Expand Down Expand Up @@ -445,9 +453,22 @@ heartbeatLoop:
// - add an ingester entry to the ring
// - copies out our state and tokens if they exist
func (i *Lifecycler) initRing(ctx context.Context) error {
var ringDesc *Desc
var (
ringDesc *Desc
tokensFromFile Tokens
err error
)

if i.cfg.TokensFilePath != "" {
tokensFromFile, err = LoadTokensFromFile(i.cfg.TokensFilePath)
if err != nil {
level.Error(util.Logger).Log("msg", "error in getting tokens from file", "err", err)
}
} else {
level.Info(util.Logger).Log("msg", "not loading tokens from file, tokens file path is empty")
}

err := i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
err = i.KVStore.CAS(ctx, ConsulKey, func(in interface{}) (out interface{}, retry bool, err error) {
if in == nil {
ringDesc = NewDesc()
} else {
Expand All @@ -456,6 +477,17 @@ func (i *Lifecycler) initRing(ctx context.Context) error {

ingesterDesc, ok := ringDesc.Ingesters[i.ID]
if !ok {
// We use the tokens from the file only if it does not exist in the ring yet.
if len(tokensFromFile) > 0 {
level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
if len(tokensFromFile) >= i.cfg.NumTokens {
i.setState(ACTIVE)
}
ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens)
i.setTokens(tokensFromFile)
return ringDesc, true, nil
}

// Either we are a new ingester, or consul must have restarted
level.Info(util.Logger).Log("msg", "entry not found in ring, adding with no tokens")
ringDesc.AddIngester(i.ID, i.Addr, []uint32{}, i.GetState(), i.cfg.NormaliseTokens)
Expand Down Expand Up @@ -505,7 +537,7 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
newTokens := GenerateTokens(needTokens, takenTokens)

ringTokens = append(ringTokens, newTokens...)
sort.Sort(sortableUint32(ringTokens))
sort.Sort(ringTokens)

ringDesc.AddIngester(i.ID, i.Addr, ringTokens, i.GetState(), i.cfg.NormaliseTokens)

Expand All @@ -527,11 +559,11 @@ func (i *Lifecycler) verifyTokens(ctx context.Context) bool {
return result
}

func (i *Lifecycler) compareTokens(fromRing []uint32) bool {
sort.Sort(sortableUint32(fromRing))
func (i *Lifecycler) compareTokens(fromRing Tokens) bool {
sort.Sort(fromRing)

tokens := i.getTokens()
sort.Sort(sortableUint32(tokens))
sort.Sort(tokens)

if len(tokens) != len(fromRing) {
return false
Expand Down Expand Up @@ -566,9 +598,9 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er
i.setState(targetState)
ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)

tokens := append(myTokens, newTokens...)
sort.Sort(sortableUint32(tokens))
i.setTokens(tokens)
myTokens = append(myTokens, newTokens...)
sort.Sort(myTokens)
i.setTokens(myTokens)

return ringDesc, true, nil
})
Expand Down
86 changes: 85 additions & 1 deletion pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package ring

import (
"context"
"io/ioutil"
"os"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -252,11 +255,92 @@ func TestCheckReady(t *testing.T) {
cfg := testLifecyclerConfig(ringConfig, "ring1")
cfg.MinReadyDuration = 1 * time.Nanosecond
l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester")
l1.setTokens([]uint32{1})
l1.setTokens(Tokens([]uint32{1}))
l1.Start()
require.NoError(t, err)

// Delete the ring key before checking ready
err = l1.CheckReady(context.Background())
require.Error(t, err)
}

type noopFlushTransferer struct {
lifecycler *Lifecycler
}

func (f *noopFlushTransferer) StopIncomingRequests() {}
func (f *noopFlushTransferer) Flush() {}
func (f *noopFlushTransferer) TransferOut(ctx context.Context) error { return nil }

func TestTokensOnDisk(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
ringConfig.KVStore.Mock = consul.NewInMemoryClient(GetCodec())

r, err := New(ringConfig, "ingester")
require.NoError(t, err)
defer r.Stop()

tokenDir, err := ioutil.TempDir(os.TempDir(), "tokens_on_disk")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(tokenDir))
}()

lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig.NumTokens = 512
lifecyclerConfig.TokensFilePath = tokenDir + "/tokens"
lifecyclerConfig.NormaliseTokens = true

// Start first ingester.
l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester")
require.NoError(t, err)
l1.Start()
// Check this ingester joined, is active, and has 512 token.
var expTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
expTokens = desc.Ingesters["ing1"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == 512 &&
len(desc.Tokens) == 0
})

l1.Shutdown()

// Start new ingester at same token directory.
lifecyclerConfig.ID = "ing2"
l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester")
require.NoError(t, err)
l2.Start()
defer l2.Shutdown()

// Check this ingester joined, is active, and has 512 token.
var actTokens []uint32
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)
desc, ok := d.(*Desc)
if ok {
actTokens = desc.Ingesters["ing2"].Tokens
}
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == ACTIVE &&
len(desc.Ingesters["ing2"].Tokens) == 512 &&
len(desc.Tokens) == 0
})

// Check for same tokens.
sort.Slice(expTokens, func(i, j int) bool { return expTokens[i] < expTokens[j] })
sort.Slice(actTokens, func(i, j int) bool { return actTokens[i] < actTokens[j] })
for i := 0; i < 512; i++ {
require.Equal(t, expTokens, actTokens)
}
}
14 changes: 7 additions & 7 deletions pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func (d *Desc) RemoveIngester(id string) {
// This method assumes that Ring is in the correct state, 'from' ingester has no tokens anywhere,
// and 'to' ingester uses either normalised or non-normalised tokens, but not both. Tokens list must
// be sorted properly. If all of this is true, everything will be fine.
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) []uint32 {
var result []uint32
func (d *Desc) ClaimTokens(from, to string, normaliseTokens bool) Tokens {
var result Tokens

if normaliseTokens {

Expand Down Expand Up @@ -169,8 +169,8 @@ func (d *Desc) Ready(now time.Time, heartbeatTimeout time.Duration) error {
}

// TokensFor partitions the tokens into those for the given ID, and those for others.
func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
var takenTokens, myTokens []uint32
func (d *Desc) TokensFor(id string) (tokens, other Tokens) {
takenTokens, myTokens := Tokens{}, Tokens{}
for _, token := range migrateRing(d) {
takenTokens = append(takenTokens, token.Token)
if token.Ingester == id {
Expand Down Expand Up @@ -319,8 +319,8 @@ func buildNormalizedIngestersMap(inputRing *Desc) map[string]IngesterDesc {
continue
}

if !sort.IsSorted(sortableUint32(ing.Tokens)) {
sort.Sort(sortableUint32(ing.Tokens))
if !sort.IsSorted(Tokens(ing.Tokens)) {
sort.Sort(Tokens(ing.Tokens))
}

seen := make(map[uint32]bool)
Expand Down Expand Up @@ -407,7 +407,7 @@ func resolveConflicts(normalizedIngesters map[string]IngesterDesc) {
}
}

sort.Sort(sortableUint32(tokens))
sort.Sort(Tokens(tokens))

// let's store the resolved result back
newTokenLists := map[string][]uint32{}
Expand Down
8 changes: 4 additions & 4 deletions pkg/ring/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,23 @@ func TestClaimTokensFromNormalizedToNormalized(t *testing.T) {
r := normalizedSource()
result := r.ClaimTokens("first", "second", true)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, normalizedOutput(), r)
}

func TestClaimTokensFromNormalizedToUnnormalized(t *testing.T) {
r := normalizedSource()
result := r.ClaimTokens("first", "second", false)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, unnormalizedOutput(), r)
}

func TestClaimTokensFromUnnormalizedToUnnormalized(t *testing.T) {
r := unnormalizedSource()
result := r.ClaimTokens("first", "second", false)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, unnormalizedOutput(), r)
}

Expand All @@ -129,7 +129,7 @@ func TestClaimTokensFromUnnormalizedToNormalized(t *testing.T) {

result := r.ClaimTokens("first", "second", true)

assert.Equal(t, []uint32{100, 200, 300}, result)
assert.Equal(t, Tokens{100, 200, 300}, result)
assert.Equal(t, normalizedOutput(), r)
}

Expand Down
Loading