Skip to content

Commit c7bc4fb

Browse files
authored
Allow custom replication strategy in the Ring (#2443)
Signed-off-by: Marco Pracucci <[email protected]>
1 parent 6569fe7 commit c7bc4fb

File tree

4 files changed

+61
-37
lines changed

4 files changed

+61
-37
lines changed

pkg/ring/replication_strategy.go

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,45 @@ package ring
22

33
import (
44
"fmt"
5+
"time"
56
)
67

7-
// replicationStrategy decides, given the set of ingesters eligible for a key,
8+
type ReplicationStrategy interface {
9+
// Filter out unhealthy instances and checks if there're enough instances
10+
// for an operation to succeed. Returns an error if there are not enough
11+
// instances.
12+
Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration) (healthy []IngesterDesc, maxFailures int, err error)
13+
14+
// ShouldExtendReplicaSet returns true if given an instance that's going to be
15+
// added to the replica set, the replica set size should be extended by 1
16+
// more instance for the given operation.
17+
ShouldExtendReplicaSet(instance IngesterDesc, op Operation) bool
18+
}
19+
20+
type DefaultReplicationStrategy struct{}
21+
22+
// Filter decides, given the set of ingesters eligible for a key,
823
// which ingesters you will try and write to and how many failures you will
924
// tolerate.
1025
// - Filters out dead ingesters so the one doesn't even try to write to them.
1126
// - Checks there is enough ingesters for an operation to succeed.
1227
// The ingesters argument may be overwritten.
13-
func (r *Ring) replicationStrategy(ingesters []IngesterDesc, op Operation) ([]IngesterDesc, int, error) {
28+
func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration) ([]IngesterDesc, int, error) {
1429
// We need a response from a quorum of ingesters, which is n/2 + 1. In the
1530
// case of a node joining/leaving, the actual replica set might be bigger
1631
// than the replication factor, so use the bigger or the two.
17-
replicationFactor := r.cfg.ReplicationFactor
1832
if len(ingesters) > replicationFactor {
1933
replicationFactor = len(ingesters)
2034
}
35+
2136
minSuccess := (replicationFactor / 2) + 1
2237
maxFailure := replicationFactor - minSuccess
2338

2439
// Skip those that have not heartbeated in a while. NB these are still
2540
// included in the calculation of minSuccess, so if too many failed ingesters
2641
// will cause the whole write to fail.
2742
for i := 0; i < len(ingesters); {
28-
if r.IsHealthy(&ingesters[i], op) {
43+
if ingesters[i].IsHealthy(op, heartbeatTimeout) {
2944
i++
3045
} else {
3146
ingesters = append(ingesters[:i], ingesters[i+1:]...)
@@ -44,6 +59,22 @@ func (r *Ring) replicationStrategy(ingesters []IngesterDesc, op Operation) ([]In
4459
return ingesters, maxFailure, nil
4560
}
4661

62+
func (s *DefaultReplicationStrategy) ShouldExtendReplicaSet(ingester IngesterDesc, op Operation) bool {
63+
// We do not want to Write to Ingesters that are not ACTIVE, but we do want
64+
// to write the extra replica somewhere. So we increase the size of the set
65+
// of replicas for the key. This means we have to also increase the
66+
// size of the replica set for read, but we can read from Leaving ingesters,
67+
// so don't skip it in this case.
68+
// NB dead ingester will be filtered later by DefaultReplicationStrategy.Filter().
69+
if op == Write && ingester.State != ACTIVE {
70+
return true
71+
} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
72+
return true
73+
}
74+
75+
return false
76+
}
77+
4778
// IsHealthy checks whether an ingester appears to be alive and heartbeating
4879
func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
4980
return ingester.IsHealthy(op, r.cfg.HeartbeatTimeout)

pkg/ring/replication_strategy_test.go

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,14 @@
11
package ring
22

33
import (
4-
"context"
54
"fmt"
65
"testing"
76
"time"
87

98
"github.com/stretchr/testify/assert"
10-
"github.com/stretchr/testify/require"
11-
12-
"github.com/cortexproject/cortex/pkg/ring/kv"
13-
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
14-
"github.com/cortexproject/cortex/pkg/util/services"
159
)
1610

17-
func TestReplicationStrategy(t *testing.T) {
11+
func TestRingReplicationStrategy(t *testing.T) {
1812
for i, tc := range []struct {
1913
RF, LiveIngesters, DeadIngesters int
2014
op Operation // Will default to READ
@@ -87,19 +81,10 @@ func TestReplicationStrategy(t *testing.T) {
8781
for i := 0; i < tc.DeadIngesters; i++ {
8882
ingesters = append(ingesters, IngesterDesc{})
8983
}
90-
r, err := New(Config{
91-
KVStore: kv.Config{
92-
Mock: consul.NewInMemoryClient(GetCodec()),
93-
},
94-
HeartbeatTimeout: 100 * time.Second,
95-
ReplicationFactor: tc.RF,
96-
}, "ingester", IngesterRingKey)
97-
require.NoError(t, err)
98-
require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
99-
defer services.StopAndAwaitTerminated(context.Background(), r) //nolint:errcheck
10084

10185
t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
102-
liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, tc.op)
86+
strategy := &DefaultReplicationStrategy{}
87+
liveIngesters, maxFailure, err := strategy.Filter(ingesters, tc.op, tc.RF, 100*time.Second)
10388
if tc.ExpectedError == "" {
10489
assert.NoError(t, err)
10590
assert.Equal(t, tc.LiveIngesters, len(liveIngesters))

pkg/ring/ring.go

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type Ring struct {
9494
key string
9595
cfg Config
9696
KVClient kv.Client
97+
strategy ReplicationStrategy
9798

9899
mtx sync.RWMutex
99100
ringDesc *Desc
@@ -108,20 +109,26 @@ type Ring struct {
108109

109110
// New creates a new Ring. Being a service, Ring needs to be started to do anything.
110111
func New(cfg Config, name, key string) (*Ring, error) {
111-
if cfg.ReplicationFactor <= 0 {
112-
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
113-
}
114112
codec := GetCodec()
115113
store, err := kv.NewClient(cfg.KVStore, codec)
116114
if err != nil {
117115
return nil, err
118116
}
119117

118+
return NewWithStoreClientAndStrategy(cfg, name, key, store, &DefaultReplicationStrategy{})
119+
}
120+
121+
func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy) (*Ring, error) {
122+
if cfg.ReplicationFactor <= 0 {
123+
return nil, fmt.Errorf("ReplicationFactor must be greater than zero: %d", cfg.ReplicationFactor)
124+
}
125+
120126
r := &Ring{
121127
name: name,
122128
key: key,
123129
cfg: cfg,
124130
KVClient: store,
131+
strategy: strategy,
125132
ringDesc: &Desc{},
126133
memberOwnershipDesc: prometheus.NewDesc(
127134
"cortex_ring_member_ownership_percent",
@@ -208,22 +215,16 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
208215
distinctHosts[token.Ingester] = struct{}{}
209216
ingester := r.ringDesc.Ingesters[token.Ingester]
210217

211-
// We do not want to Write to Ingesters that are not ACTIVE, but we do want
212-
// to write the extra replica somewhere. So we increase the size of the set
213-
// of replicas for the key. This means we have to also increase the
214-
// size of the replica set for read, but we can read from Leaving ingesters,
215-
// so don't skip it in this case.
216-
// NB dead ingester will be filtered later (by replication_strategy.go).
217-
if op == Write && ingester.State != ACTIVE {
218-
n++
219-
} else if op == Read && (ingester.State != ACTIVE && ingester.State != LEAVING) {
218+
// Check whether the replica set should be extended given we're including
219+
// this instance.
220+
if r.strategy.ShouldExtendReplicaSet(ingester, op) {
220221
n++
221222
}
222223

223224
ingesters = append(ingesters, ingester)
224225
}
225226

226-
liveIngesters, maxFailure, err := r.replicationStrategy(ingesters, op)
227+
liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout)
227228
if err != nil {
228229
return ReplicationSet{}, err
229230
}
@@ -423,8 +424,9 @@ func (r *Ring) Subring(key uint32, n int) (ReadRing, error) {
423424
}
424425

425426
sub := &Ring{
426-
name: "subring",
427-
cfg: r.cfg,
427+
name: "subring",
428+
cfg: r.cfg,
429+
strategy: r.strategy,
428430
ringDesc: &Desc{
429431
Ingesters: ingesters,
430432
},

pkg/ring/ring_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ func benchmarkBatch(b *testing.B, numIngester, numKeys int) {
4646
name: "ingester",
4747
cfg: cfg,
4848
ringDesc: desc,
49+
strategy: &DefaultReplicationStrategy{},
4950
}
5051

5152
ctx := context.Background()
@@ -87,6 +88,7 @@ func TestDoBatchZeroIngesters(t *testing.T) {
8788
name: "ingester",
8889
cfg: Config{},
8990
ringDesc: desc,
91+
strategy: &DefaultReplicationStrategy{},
9092
}
9193
require.Error(t, DoBatch(ctx, &r, keys, callback, cleanup))
9294
}
@@ -143,6 +145,7 @@ func TestSubring(t *testing.T) {
143145
},
144146
ringDesc: r,
145147
ringTokens: r.getTokens(),
148+
strategy: &DefaultReplicationStrategy{},
146149
}
147150

148151
// Subring of 0 invalid
@@ -197,6 +200,7 @@ func TestStableSubring(t *testing.T) {
197200
},
198201
ringDesc: r,
199202
ringTokens: r.getTokens(),
203+
strategy: &DefaultReplicationStrategy{},
200204
}
201205

202206
// Generate the same subring multiple times
@@ -256,6 +260,7 @@ func TestZoneAwareIngesterAssignmentSucccess(t *testing.T) {
256260
},
257261
ringDesc: r,
258262
ringTokens: r.getTokens(),
263+
strategy: &DefaultReplicationStrategy{},
259264
}
260265
// use the GenerateTokens to get an array of random uint32 values
261266
testValues := make([]uint32, testCount)
@@ -320,6 +325,7 @@ func TestZoneAwareIngesterAssignmentFailure(t *testing.T) {
320325
},
321326
ringDesc: r,
322327
ringTokens: r.getTokens(),
328+
strategy: &DefaultReplicationStrategy{},
323329
}
324330
// use the GenerateTokens to get an array of random uint32 values
325331
testValues := make([]uint32, testCount)

0 commit comments

Comments
 (0)