Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 83 additions & 26 deletions pkg/ring/lifecycler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,37 @@ func (f *flushTransferer) TransferOut(ctx context.Context) error {
return f.lifecycler.ChangeState(ctx, ACTIVE)
}

func testLifecyclerConfig(ringConfig Config, id string) LifecyclerConfig {
var lifecyclerConfig LifecyclerConfig
flagext.DefaultValues(&lifecyclerConfig)
lifecyclerConfig.Addr = "0.0.0.0"
lifecyclerConfig.Port = 1
lifecyclerConfig.RingConfig = ringConfig
lifecyclerConfig.NumTokens = 1
lifecyclerConfig.ClaimOnRollout = true
lifecyclerConfig.ID = id
lifecyclerConfig.FinalSleep = 0
return lifecyclerConfig
}

func checkDenormalised(d interface{}, id string) bool {
desc, ok := d.(*Desc)
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters[id].State == ACTIVE &&
len(desc.Ingesters[id].Tokens) == 0 &&
len(desc.Tokens) == 1
}

func checkNormalised(d interface{}, id string) bool {
desc, ok := d.(*Desc)
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters[id].State == ACTIVE &&
len(desc.Ingesters[id].Tokens) == 1 &&
len(desc.Tokens) == 0
}

func TestRingNormaliseMigration(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
Expand All @@ -35,15 +66,7 @@ func TestRingNormaliseMigration(t *testing.T) {
defer r.Stop()

// Add an 'ingester' with denormalised tokens.
var lifecyclerConfig1 LifecyclerConfig
flagext.DefaultValues(&lifecyclerConfig1)
lifecyclerConfig1.Addr = "0.0.0.0"
lifecyclerConfig1.Port = 1
lifecyclerConfig1.RingConfig = ringConfig
lifecyclerConfig1.NumTokens = 1
lifecyclerConfig1.ClaimOnRollout = true
lifecyclerConfig1.ID = "ing1"
lifecyclerConfig1.FinalSleep = 0
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")

ft := &flushTransferer{}
l1, err := NewLifecycler(lifecyclerConfig1, ft, "ingester")
Expand All @@ -53,23 +76,15 @@ func TestRingNormaliseMigration(t *testing.T) {
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)

desc, ok := d.(*Desc)
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing1"].State == ACTIVE &&
len(desc.Ingesters["ing1"].Tokens) == 0 &&
len(desc.Tokens) == 1
return checkDenormalised(d, "ing1")
})

token := l1.tokens[0]

// Add a second ingester with normalised tokens.
var lifecyclerConfig2 = lifecyclerConfig1
var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2")
lifecyclerConfig2.JoinAfter = 100 * time.Second
lifecyclerConfig2.NormaliseTokens = true
lifecyclerConfig2.ID = "ing2"
lifecyclerConfig1.FinalSleep = 0

l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester")
require.NoError(t, err)
Expand All @@ -82,13 +97,55 @@ func TestRingNormaliseMigration(t *testing.T) {
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)
return checkNormalised(d, "ing2") &&
d.(*Desc).Ingesters["ing2"].Tokens[0] == token
})
}

type nopFlushTransferer struct{}

func (f *nopFlushTransferer) StopIncomingRequests() {}
func (f *nopFlushTransferer) Flush() {}
func (f *nopFlushTransferer) TransferOut(ctx context.Context) error {
panic("should not be called")
}

func TestRingRestart(t *testing.T) {
var ringConfig Config
flagext.DefaultValues(&ringConfig)
codec := ProtoCodec{Factory: ProtoDescFactory}
ringConfig.KVStore.Mock = NewInMemoryKVClient(codec)

r, err := New(ringConfig, "ingester")
require.NoError(t, err)
defer r.Stop()

// Add an 'ingester' with normalised tokens.
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1")
lifecyclerConfig1.NormaliseTokens = true
l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester")
require.NoError(t, err)

// Check this ingester joined, is active, and has one token.
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)
return checkNormalised(d, "ing1")
})

desc, ok := d.(*Desc)
return ok &&
len(desc.Ingesters) == 1 &&
desc.Ingesters["ing2"].State == ACTIVE &&
len(desc.Ingesters["ing2"].Tokens) == 1 &&
desc.Ingesters["ing2"].Tokens[0] == token &&
len(desc.Tokens) == 0
token := l1.tokens[0]

// Add a second ingester with the same settings, so it will think it has restarted
l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester")
require.NoError(t, err)

// Check the new ingester picked up the same token
test.Poll(t, 1000*time.Millisecond, true, func() interface{} {
d, err := r.KVClient.Get(context.Background(), ConsulKey)
require.NoError(t, err)
l2Tokens := l2.getTokens()
return checkNormalised(d, "ing1") &&
len(l2Tokens) == 1 &&
l2Tokens[0] == token
})
}
2 changes: 1 addition & 1 deletion pkg/ring/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (d *Desc) Ready(heartbeatTimeout time.Duration) error {
// TokensFor partitions the tokens into those for the given ID, and those for others.
func (d *Desc) TokensFor(id string) (tokens, other []uint32) {
var takenTokens, myTokens []uint32
for _, token := range d.Tokens {
for _, token := range migrateRing(d) {
takenTokens = append(takenTokens, token.Token)
if token.Ingester == id {
myTokens = append(myTokens, token.Token)
Expand Down
7 changes: 3 additions & 4 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (r *Ring) loop(ctx context.Context) {
}

ringDesc := value.(*Desc)
ringDesc = r.migrateRing(ringDesc)
ringDesc.Tokens = migrateRing(ringDesc)
r.mtx.Lock()
defer r.mtx.Unlock()
r.ringDesc = ringDesc
Expand All @@ -159,7 +159,7 @@ func (r *Ring) loop(ctx context.Context) {
}

// migrateRing will denormalise the ring's tokens if stored in normal form.
func (r *Ring) migrateRing(desc *Desc) *Desc {
func migrateRing(desc *Desc) []TokenDesc {
numTokens := len(desc.Tokens)
for _, ing := range desc.Ingesters {
numTokens += len(ing.Tokens)
Expand All @@ -175,8 +175,7 @@ func (r *Ring) migrateRing(desc *Desc) *Desc {
}
}
sort.Sort(ByToken(tokens))
desc.Tokens = tokens
return desc
return tokens
}

// Get returns n (or more) ingesters which form the replicas for the given key.
Expand Down