Skip to content

Commit 339744e

Browse files
committed
Added shuffle sharding support to generate a subring
Signed-off-by: Marco Pracucci <[email protected]>
1 parent f1b95c6 commit 339744e

File tree

5 files changed

+593
-36
lines changed

5 files changed

+593
-36
lines changed

pkg/ring/model.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,11 +397,7 @@ type TokenDesc struct {
397397

398398
// getTokens returns sorted list of tokens with ingester IDs, owned by each ingester in the ring.
399399
func (d *Desc) getTokens() []TokenDesc {
400-
numTokens := 0
401-
for _, ing := range d.Ingesters {
402-
numTokens += len(ing.Tokens)
403-
}
404-
tokens := make([]TokenDesc, 0, numTokens)
400+
tokens := make([]TokenDesc, 0, d.getTokensCountEstimation())
405401
for key, ing := range d.Ingesters {
406402
for _, token := range ing.Tokens {
407403
tokens = append(tokens, TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
@@ -412,6 +408,37 @@ func (d *Desc) getTokens() []TokenDesc {
412408
return tokens
413409
}
414410

411+
// TODO comment
412+
// TODO test me
413+
func (d *Desc) getTokensByZone() map[string][]TokenDesc {
414+
zones := map[string][]TokenDesc{}
415+
416+
for key, ing := range d.Ingesters {
417+
// TODO initialize zones[ing.Zone] with a slice allocated for getTokensCountEstimation() ?
418+
for _, token := range ing.Tokens {
419+
zones[ing.Zone] = append(zones[ing.Zone], TokenDesc{Token: token, Ingester: key, Zone: ing.GetZone()})
420+
}
421+
}
422+
423+
// Ensure tokens are sorted within each zone.
424+
for zone := range zones {
425+
sort.Sort(ByToken(zones[zone]))
426+
}
427+
428+
return zones
429+
}
430+
431+
// getTokensCountEstimation returns a quick estimation of the number of tokens
432+
// across all instances in the ring, assuming each instance has the same number
433+
// of tokens.
434+
func (d *Desc) getTokensCountEstimation() int {
435+
for _, ing := range d.Ingesters {
436+
return len(ing.Tokens) * len(d.Ingesters)
437+
}
438+
439+
return 0
440+
}
441+
415442
func GetOrCreateRingDesc(d interface{}) *Desc {
416443
if d == nil {
417444
return NewDesc()

pkg/ring/replication_strategy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ func (r *Ring) ReplicationFactor() int {
8585

8686
// IngesterCount is the number of ingesters in the ring
8787
func (r *Ring) IngesterCount() int {
88-
r.mtx.Lock()
88+
r.mtx.RLock()
8989
c := len(r.ringDesc.Ingesters)
90-
r.mtx.Unlock()
90+
r.mtx.RUnlock()
9191
return c
9292
}

pkg/ring/ring.go

Lines changed: 112 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ package ring
44

55
import (
66
"context"
7+
"crypto/md5"
8+
"encoding/binary"
79
"errors"
810
"flag"
911
"fmt"
1012
"math"
11-
"sort"
13+
"math/rand"
1214
"sync"
1315
"time"
1416

@@ -107,9 +109,14 @@ type Ring struct {
107109
KVClient kv.Client
108110
strategy ReplicationStrategy
109111

110-
mtx sync.RWMutex
111-
ringDesc *Desc
112-
ringTokens []TokenDesc
112+
mtx sync.RWMutex
113+
ringDesc *Desc
114+
ringTokens []TokenDesc
115+
ringTokensByZone map[string][]TokenDesc
116+
117+
// List of zones for which there's at least 1 instance in the ring. This list is guaranteed
118+
// to be sorted alphabetically.
119+
ringZones []string
113120

114121
memberOwnershipDesc *prometheus.Desc
115122
numMembersDesc *prometheus.Desc
@@ -190,11 +197,15 @@ func (r *Ring) loop(ctx context.Context) error {
190197

191198
ringDesc := value.(*Desc)
192199
ringTokens := ringDesc.getTokens()
200+
ringTokensByZone := ringDesc.getTokensByZone()
201+
ringZones := getZones(ringTokensByZone)
193202

194203
r.mtx.Lock()
195204
defer r.mtx.Unlock()
196205
r.ringDesc = ringDesc
197206
r.ringTokens = ringTokens
207+
r.ringTokensByZone = ringTokensByZone
208+
r.ringZones = ringZones
198209
return true
199210
})
200211
return nil
@@ -213,7 +224,7 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet
213224
ingesters = buf[:0]
214225
distinctHosts = map[string]struct{}{}
215226
distinctZones = map[string]struct{}{}
216-
start = r.search(key)
227+
start = searchToken(r.ringTokens, key)
217228
iterations = 0
218229
)
219230
for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
@@ -290,16 +301,6 @@ func (r *Ring) GetAll(op Operation) (ReplicationSet, error) {
290301
}, nil
291302
}
292303

293-
func (r *Ring) search(key uint32) int {
294-
i := sort.Search(len(r.ringTokens), func(x int) bool {
295-
return r.ringTokens[x].Token > key
296-
})
297-
if i >= len(r.ringTokens) {
298-
i = 0
299-
}
300-
return i
301-
}
302-
303304
// Describe implements prometheus.Collector.
304305
func (r *Ring) Describe(ch chan<- *prometheus.Desc) {
305306
ch <- r.memberOwnershipDesc
@@ -411,7 +412,7 @@ func (r *Ring) Subring(key uint32, n int) ReadRing {
411412
var (
412413
ingesters = make(map[string]IngesterDesc, n)
413414
distinctHosts = map[string]struct{}{}
414-
start = r.search(key)
415+
start = searchToken(r.ringTokens, key)
415416
iterations = 0
416417
)
417418

@@ -464,6 +465,101 @@ func (r *Ring) Subring(key uint32, n int) ReadRing {
464465
return sub
465466
}
466467

468+
// ShuffleShard returns a subring for the provided identifier (eg. a tenant ID)
469+
// and size (number of instances). The size is expected to be a multiple of the
470+
// number of zones and the returned subring will contain the same number of
471+
// instances per zone as far as there are enough registered instances in the ring.
472+
//
473+
// The algorithm used to build the subring is a shuffle sharder based on probabilistic
474+
// hashing. We treat each zone as a separate ring and pick N unique replicas from each
475+
// zone, walking the ring starting from random but predictable numbers. The random
476+
// generator is initialised with a seed based on the provided identifier.
477+
//
478+
// This implementation guarantees:
479+
// - Stability: given the same ring, two invocations returns the same result.
480+
// - Consistency: adding/removing 1 instance from the ring generates a resulting
481+
// subring with no more then 1 difference.
482+
// - Shuffling: probabilistically, for a large enough cluster each identifier gets
483+
// a different set of instances, with a reduced number of overlapping instances
484+
// between two identifiers.
485+
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
486+
// Nothing to do if the shard size is not smaller then the actual ring.
487+
if size <= 0 || r.IngesterCount() <= size {
488+
return r
489+
}
490+
491+
// Use the identifier to compute an hash we'll use to seed the random.
492+
hasher := md5.New()
493+
hasher.Write([]byte(identifier)) // nolint:errcheck
494+
checksum := hasher.Sum(nil)
495+
496+
// Generate the seed based on the first 64 bits of the checksum.
497+
seed := int64(binary.BigEndian.Uint64(checksum))
498+
499+
// Initialise the random generator used to select instances in the ring.
500+
random := rand.New(rand.NewSource(seed))
501+
502+
r.mtx.RLock()
503+
504+
// We expect the shard size to be divisible by the number of zones, in order to
505+
// have nodes balanced across zones. If it's not, we do round up.
506+
numInstancesPerZone := int(math.Ceil(float64(size) / float64(len(r.ringZones))))
507+
508+
shard := make(map[string]IngesterDesc, size)
509+
510+
// We need to iterate zones always in the same order to guarantee stability.
511+
for _, zone := range r.ringZones {
512+
tokens := r.ringTokensByZone[zone]
513+
514+
// To select one more instance while guaranteeing the "consistency" property,
515+
// we do pick a random value from the generator and resolve uniqueness collisions
516+
// (if any) continuing walking the ring.
517+
for i := 0; i < numInstancesPerZone; i++ {
518+
start := searchToken(tokens, random.Uint32())
519+
iterations := 0
520+
found := false
521+
522+
for p := start; iterations < len(tokens); p++ {
523+
iterations++
524+
525+
// Wrap p around in the ring.
526+
p %= len(tokens)
527+
528+
// Ensure we select an unique instance.
529+
if _, ok := shard[tokens[p].Ingester]; ok {
530+
continue
531+
}
532+
533+
shard[tokens[p].Ingester] = r.ringDesc.Ingesters[tokens[p].Ingester]
534+
found = true
535+
break
536+
}
537+
538+
// If one more instance has not been found, we can stop looking for
539+
// more instances in this zone, because it means the zone has no more
540+
// instances which haven't been already selected.
541+
if !found {
542+
break
543+
}
544+
}
545+
}
546+
547+
r.mtx.RUnlock()
548+
549+
// Build a read-only ring for the shard.
550+
shardDesc := &Desc{Ingesters: shard}
551+
shardTokensByZone := shardDesc.getTokensByZone()
552+
553+
return &Ring{
554+
cfg: r.cfg,
555+
strategy: r.strategy,
556+
ringDesc: shardDesc,
557+
ringTokens: shardDesc.getTokens(),
558+
ringTokensByZone: shardTokensByZone,
559+
ringZones: getZones(shardTokensByZone),
560+
}
561+
}
562+
467563
// GetInstanceState returns the current state of an instance or an error if the
468564
// instance does not exist in the ring.
469565
func (r *Ring) GetInstanceState(instanceID string) (IngesterState, error) {

0 commit comments

Comments
 (0)