Skip to content

Optimise ring.DoBatch() #1624

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,9 +464,10 @@ type mockRing struct {
replicationFactor uint32
}

func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
result := ring.ReplicationSet{
MaxErrors: 1,
Ingesters: buf[:0],
}
for i := uint32(0); i < r.replicationFactor; i++ {
n := (key + i) % uint32(len(r.ingesters))
Expand All @@ -475,18 +476,6 @@ func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error
return result, nil
}

// BatchGet resolves the replication set for every key, one Get per key,
// preserving the order of the input. It aborts on the first lookup error.
func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
	result := make([]ring.ReplicationSet, 0, len(keys))
	for _, key := range keys {
		rs, err := r.Get(key, op)
		if err != nil {
			return nil, err
		}
		result = append(result, rs)
	}
	return result, nil
}

func (r mockRing) GetAll() (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Ingesters: r.ingesters,
Expand All @@ -498,6 +487,10 @@ func (r mockRing) ReplicationFactor() int {
return int(r.replicationFactor)
}

// IngesterCount returns how many ingesters the mock ring holds.
func (r mockRing) IngesterCount() int {
	total := len(r.ingesters)
	return total
}

type mockIngester struct {
sync.Mutex
client.IngesterClient
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (d *Distributor) queryPrep(ctx context.Context, from, to model.Time, matche
// Get ingesters by metricName if one exists, otherwise get all ingesters
metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers)
if !d.cfg.ShardByAllLabels && ok && metricNameMatcher.Type == labels.MatchEqual {
replicationSet, err = d.ring.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read)
replicationSet, err = d.ring.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read, nil)
} else {
replicationSet, err = d.ring.GetAll()
}
Expand Down
23 changes: 15 additions & 8 deletions pkg/ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,26 @@ type itemTracker struct {
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
replicationSets, err := r.BatchGet(keys, Write)
if err != nil {
return err
}

expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()
itemTrackers := make([]itemTracker, len(keys))
ingesters := map[string]ingester{}
for i, replicationSet := range replicationSets {
ingesters := make(map[string]ingester, r.IngesterCount())

const maxExpectedReplicationSet = 5 // Typical replication factor 3, plus one for inactive plus one for luck.
var descs [maxExpectedReplicationSet]IngesterDesc
for i, key := range keys {
replicationSet, err := r.Get(key, Write, descs[:0])
if err != nil {
return err
}
itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
itemTrackers[i].maxFailures = replicationSet.MaxErrors

for _, desc := range replicationSet.Ingesters {
curr := ingesters[desc.Addr]
curr, found := ingesters[desc.Addr]
if !found {
curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers)
curr.indexes = make([]int, 0, expectedTrackers)
}
ingesters[desc.Addr] = ingester{
desc: desc,
itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),
Expand Down
33 changes: 20 additions & 13 deletions pkg/ring/replication_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ import (
// tolerate.
// - Filters out dead ingesters so that we don't even try to write to them.
// - Checks there is enough ingesters for an operation to succeed.
func (r *Ring) replicationStrategy(ingesters []IngesterDesc, op Operation) (
liveIngesters []IngesterDesc, maxFailure int, err error,
) {
// The ingesters argument may be overwritten.
func (r *Ring) replicationStrategy(ingesters []IngesterDesc, op Operation) ([]IngesterDesc, int, error) {
// We need a response from a quorum of ingesters, which is n/2 + 1. In the
// case of a node joining/leaving, the actual replica set might be bigger
// than the replication factor, so use the bigger of the two.
Expand All @@ -21,29 +20,29 @@ func (r *Ring) replicationStrategy(ingesters []IngesterDesc, op Operation) (
replicationFactor = len(ingesters)
}
minSuccess := (replicationFactor / 2) + 1
maxFailure = replicationFactor - minSuccess
maxFailure := replicationFactor - minSuccess

// Skip those that have not heartbeated in a while. NB these are still
// included in the calculation of minSuccess, so too many failed ingesters
// will cause the whole write to fail.
liveIngesters = make([]IngesterDesc, 0, len(ingesters))
for _, ingester := range ingesters {
if r.IsHealthy(&ingester, op) {
liveIngesters = append(liveIngesters, ingester)
for i := 0; i < len(ingesters); {
if r.IsHealthy(&ingesters[i], op) {
i++
} else {
ingesters = append(ingesters[:i], ingesters[i+1:]...)
maxFailure--
}
}

// This is just a shortcut - if there are not minSuccess available ingesters,
// after filtering out dead ones, don't even bother trying.
if maxFailure < 0 || len(liveIngesters) < minSuccess {
err = fmt.Errorf("at least %d live ingesters required, could only find %d",
minSuccess, len(liveIngesters))
return
if maxFailure < 0 || len(ingesters) < minSuccess {
err := fmt.Errorf("at least %d live ingesters required, could only find %d",
minSuccess, len(ingesters))
return nil, 0, err
}

return
return ingesters, maxFailure, nil
}

// IsHealthy checks whether an ingester appears to be alive and heartbeating
Expand All @@ -60,3 +59,11 @@ func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
// ReplicationFactor returns the configured replication factor of the ring.
func (r *Ring) ReplicationFactor() int {
return r.cfg.ReplicationFactor
}

// IngesterCount is the number of ingesters in the ring.
func (r *Ring) IngesterCount() int {
	// Read-only access: take the read lock (consistent with Get, which uses
	// r.mtx.RLock()) so concurrent readers are not needlessly serialized;
	// the original took the exclusive write lock for a pure read.
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	// Guard against an uninitialised ring, mirroring the nil check on the
	// key-lookup path; without it this dereferences a nil ringDesc.
	if r.ringDesc == nil {
		return 0
	}
	return len(r.ringDesc.Ingesters)
}
32 changes: 7 additions & 25 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ const (
type ReadRing interface {
prometheus.Collector

Get(key uint32, op Operation) (ReplicationSet, error)
BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error)
// Get returns n (or more) ingesters which form the replicas for the given key.
// buf is a slice to be overwritten for the return value
// to avoid memory allocation; can be nil.
Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
GetAll() (ReplicationSet, error)
ReplicationFactor() int
IngesterCount() int
}

// Operation can be Read or Write
Expand Down Expand Up @@ -181,37 +184,16 @@ func migrateRing(desc *Desc) []TokenDesc {
}

// Get returns n (or more) ingesters which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation) (ReplicationSet, error) {
func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
return r.getInternal(key, op)
}

// BatchGet returns ReplicationFactor (or more) ingesters which form the
// replicas for each of the given keys; the result preserves input order.
// The first lookup failure aborts the whole batch.
func (r *Ring) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error) {
	r.mtx.RLock()
	defer r.mtx.RUnlock()

	sets := make([]ReplicationSet, 0, len(keys))
	for _, key := range keys {
		set, err := r.getInternal(key, op)
		if err != nil {
			return nil, err
		}
		sets = append(sets, set)
	}
	return sets, nil
}

func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) {
if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

var (
n = r.cfg.ReplicationFactor
ingesters = make([]IngesterDesc, 0, n)
ingesters = buf[:0]
distinctHosts = map[string]struct{}{}
start = r.search(key)
iterations = 0
Expand Down
61 changes: 40 additions & 21 deletions pkg/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,32 @@ package ring
import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

const (
numIngester = 100
numTokens = 512
numTokens = 512
)

func BenchmarkRing(b *testing.B) {
// BenchmarkBatch10x100 runs the shared benchmarkBatch helper with
// 10 ingesters and 100 keys per batch.
func BenchmarkBatch10x100(b *testing.B) {
benchmarkBatch(b, 10, 100)
}

// BenchmarkBatch100x100 runs the shared benchmarkBatch helper with
// 100 ingesters and 100 keys per batch.
func BenchmarkBatch100x100(b *testing.B) {
benchmarkBatch(b, 100, 100)
}

// BenchmarkBatch100x1000 runs the shared benchmarkBatch helper with
// 100 ingesters and 1000 keys per batch.
func BenchmarkBatch100x1000(b *testing.B) {
benchmarkBatch(b, 100, 1000)
}

func benchmarkBatch(b *testing.B, numIngester, numKeys int) {
// Make a random ring with N ingesters, and M tokens per ingester
desc := NewDesc()
takenTokens := []uint32{}
Expand All @@ -26,26 +38,33 @@ func BenchmarkRing(b *testing.B) {
desc.AddIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), tokens, ACTIVE, false)
}

consul := consul.NewInMemoryClient(GetCodec())
err := consul.CAS(context.Background(), ConsulKey, func(interface{}) (interface{}, bool, error) {
return desc, false, nil
})
require.NoError(b, err)

r, err := New(Config{
KVStore: kv.Config{
Mock: consul,
},
ReplicationFactor: 3,
}, "ingester")
if err != nil {
b.Fatal(err)
cfg := Config{}
flagext.DefaultValues(&cfg)
r := Ring{
name: "ingester",
cfg: cfg,
ringDesc: desc,
}

ctx := context.Background()
callback := func(IngesterDesc, []int) error {
return nil
}
cleanup := func() {
}
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
keys := make([]uint32, numKeys)
// Generate a batch of N random keys, and look them up
b.ResetTimer()
for i := 0; i < b.N; i++ {
keys := GenerateTokens(100, nil)
r.BatchGet(keys, Write)
generateKeys(rnd, numKeys, keys)
err := DoBatch(ctx, &r, keys, callback, cleanup)
require.NoError(b, err)
}
}

func generateKeys(r *rand.Rand, numTokens int, dest []uint32) {
for i := 0; i < numTokens; i++ {
dest[i] = r.Uint32()
}
}
2 changes: 1 addition & 1 deletion pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (r *Ruler) Evaluate(userID string, item *workItem) {
}

func (r *Ruler) ownsRule(hash uint32) bool {
rlrs, err := r.ring.Get(hash, ring.Read)
rlrs, err := r.ring.Get(hash, ring.Read, nil)
// If an error occurs evaluate a rule as if it is owned
// better to have extra datapoints for a rule than none at all
// TODO: add a temporary cache of owned rule values or something to fall back on
Expand Down