Skip to content

Commit 31065a7

Browse files
authored
disallow instance with older timestamp to update instance with newer … (#5480)
* disallow instance with older timestamp to update instance with newer timestamp Signed-off-by: Wen Xu <[email protected]> * add statement in changelog Signed-off-by: Wen Xu <[email protected]> * just update after resolve conflict in the token, no need to do checks again Signed-off-by: Wen Xu <[email protected]> * fix lint Signed-off-by: Wen Xu <[email protected]> * only try to resolve conflict tokens when there is update Signed-off-by: Wen Xu <[email protected]> --------- Signed-off-by: Wen Xu <[email protected]>
1 parent 18e6e4d commit 31065a7

File tree

4 files changed

+45
-32
lines changed

4 files changed

+45
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858
* [BUGFIX] Query Frontend: Fix bug of failing to cancel downstream request context in query frontend v2 mode (query scheduler enabled). #5447
5959
* [BUGFIX] Alertmanager: Remove the user id from state replication key metric label value. #5453
6060
* [BUGFIX] Compactor: Avoid cleaner concurrency issues checking global markers before all blocks. #5457
61-
61+
* [BUGFIX] DDBKV: Disallow instance with older timestamp to update instance with newer timestamp. #5480
6262
## 1.15.1 2023-04-26
6363

6464
* [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290

pkg/ring/basic_lifecycler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,9 @@ func (l *BasicLifecycler) registerInstance(ctx context.Context) error {
265265
var exists bool
266266
instanceDesc, exists = ringDesc.Ingesters[l.cfg.ID]
267267
if exists {
268-
level.Info(l.logger).Log("msg", "instance found in the ring", "instance", l.cfg.ID, "ring", l.ringName, "state", instanceDesc.GetState(), "tokens", len(instanceDesc.GetTokens()), "registered_at", instanceDesc.GetRegisteredAt().String())
268+
level.Info(l.logger).Log("msg", "instance found in the ring", "instance", l.cfg.ID, "ip", l.cfg.Addr, "ring", l.ringName, "state", instanceDesc.GetState(), "tokens", len(instanceDesc.GetTokens()), "registered_at", instanceDesc.GetRegisteredAt().String())
269269
} else {
270-
level.Info(l.logger).Log("msg", "instance not found in the ring", "instance", l.cfg.ID, "ring", l.ringName)
270+
level.Info(l.logger).Log("msg", "instance not found in the ring", "instance", l.cfg.ID, "ip", l.cfg.Addr, "ring", l.ringName)
271271
}
272272

273273
// We call the delegate to get the desired state right after the initialization.

pkg/ring/model.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -748,10 +748,18 @@ func (d *Desc) FindDifference(o codec.MultiKey) (interface{}, []string, error) {
748748
if !ok {
749749
toDelete = append(toDelete, name)
750750
} else if !ing.Equal(oing) {
751-
if !tokensEqual(ing.Tokens, oing.Tokens) {
752-
tokensChanged = true
751+
if oing.Timestamp > ing.Timestamp {
752+
toUpdated.Ingesters[name] = oing
753+
if !tokensEqual(ing.Tokens, oing.Tokens) {
754+
tokensChanged = true
755+
}
756+
} else if oing.Timestamp == ing.Timestamp && ing.State != LEFT && oing.State == LEFT {
757+
// we accept LEFT even if timestamp hasn't changed
758+
toUpdated.Ingesters[name] = oing
759+
if !tokensEqual(ing.Tokens, oing.Tokens) {
760+
tokensChanged = true
761+
}
753762
}
754-
toUpdated.Ingesters[name] = oing
755763
}
756764
}
757765

@@ -760,11 +768,10 @@ func (d *Desc) FindDifference(o codec.MultiKey) (interface{}, []string, error) {
760768
resolveConflicts(out.Ingesters)
761769

762770
//Recheck if any instance was updated by the resolveConflict
763-
for name, oing := range out.Ingesters {
764-
ing, ok := d.Ingesters[name]
765-
if !ok || !ing.Equal(oing) {
766-
toUpdated.Ingesters[name] = oing
767-
}
771+
//All ingesters in toUpdated have already passed the timestamp check, so we can skip checking again
772+
for name := range toUpdated.Ingesters {
773+
//name must appear in out Ingesters, so we can skip the contains key check
774+
toUpdated.Ingesters[name] = out.Ingesters[name]
768775
}
769776
}
770777

pkg/ring/model_test.go

Lines changed: 27 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -630,27 +630,27 @@ func TestDesc_FindDifference(t *testing.T) {
630630
toDelete: []string{},
631631
},
632632
"same single instance, different state": {
633-
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE}}},
634-
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: JOINING}}},
635-
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: JOINING}}},
633+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, Timestamp: 1000}}},
634+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: JOINING, Timestamp: 1100}}},
635+
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: JOINING, Timestamp: 1100}}},
636636
toDelete: []string{},
637637
},
638638
"same single instance, different registered timestamp": {
639-
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, RegisteredTimestamp: 1}}},
640-
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, RegisteredTimestamp: 2}}},
641-
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, RegisteredTimestamp: 2}}},
639+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, RegisteredTimestamp: 1, Timestamp: 1000}}},
640+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, RegisteredTimestamp: 2, Timestamp: 1100}}},
641+
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", State: ACTIVE, RegisteredTimestamp: 2, Timestamp: 1100}}},
642642
toDelete: []string{},
643643
},
644644
"instance in different zone": {
645-
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one"}}},
646-
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two"}}},
647-
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two"}}},
645+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "one", Timestamp: 1000}}},
646+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", Timestamp: 1100}}},
647+
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Zone: "two", Timestamp: 1100}}},
648648
toDelete: []string{},
649649
},
650650
"same instance, different address": {
651-
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}},
652-
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr2"}}},
653-
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr2"}}},
651+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 1000}}},
652+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr2", Timestamp: 1100}}},
653+
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr2", Timestamp: 1100}}},
654654
toDelete: []string{},
655655
},
656656
"more instances in one ring": {
@@ -666,15 +666,15 @@ func TestDesc_FindDifference(t *testing.T) {
666666
toDelete: []string{},
667667
},
668668
"different tokens": {
669-
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
670-
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}},
671-
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1"}}},
669+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 3}, Timestamp: 1000}}},
670+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 1100}}},
671+
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Timestamp: 1100}}},
672672
toDelete: []string{},
673673
},
674674
"different tokens 2": {
675-
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
676-
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 4}}}},
677-
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 4}}}},
675+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 3}, Timestamp: 1000}}},
676+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 4}, Timestamp: 1100}}},
677+
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr1", Tokens: []uint32{1, 2, 4}, Timestamp: 1100}}},
678678
toDelete: []string{},
679679
},
680680
"different instances, conflictTokens new lose": {
@@ -684,9 +684,9 @@ func TestDesc_FindDifference(t *testing.T) {
684684
toDelete: []string{},
685685
},
686686
"different instances, conflictTokens new win": {
687-
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing5": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
688-
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing5": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}, "ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 4}}}},
689-
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 4}}, "ing5": {Addr: "addr1", Tokens: []uint32{3}}}},
687+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing5": {Addr: "addr1", Tokens: []uint32{1, 2, 3}, Timestamp: 1000}}},
688+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing5": {Addr: "addr1", Tokens: []uint32{1, 2, 3}, Timestamp: 1100}, "ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 4}, Timestamp: 1100}}},
689+
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 4}, Timestamp: 1100}, "ing5": {Addr: "addr1", Tokens: []uint32{3}, Timestamp: 1100}}},
690690
toDelete: []string{},
691691
},
692692
"same number of instances, using different IDs": {
@@ -695,6 +695,12 @@ func TestDesc_FindDifference(t *testing.T) {
695695
toUpdate: &Desc{Ingesters: map[string]InstanceDesc{"ing2": {Addr: "addr1", Tokens: []uint32{1, 2, 3}}}},
696696
toDelete: []string{"ing1"},
697697
},
698+
"old instance desc should not update newer instance desc": {
699+
r1: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr_new", Tokens: []uint32{1, 2, 3}, Timestamp: int64(1000)}}},
700+
r2: &Desc{Ingesters: map[string]InstanceDesc{"ing1": {Addr: "addr_old", Tokens: []uint32{1, 2, 3}, Timestamp: int64(900)}}},
701+
toUpdate: NewDesc(),
702+
toDelete: []string{},
703+
},
698704
}
699705

700706
for testName, testData := range tests {

0 commit comments

Comments
 (0)