diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 63314ed4d4c..c79e6dcc8ce 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -464,9 +464,10 @@ type mockRing struct {
 	replicationFactor uint32
 }
 
-func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error) {
+func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.IngesterDesc) (ring.ReplicationSet, error) {
 	result := ring.ReplicationSet{
 		MaxErrors: 1,
+		Ingesters: buf[:0],
 	}
 	for i := uint32(0); i < r.replicationFactor; i++ {
 		n := (key + i) % uint32(len(r.ingesters))
@@ -475,18 +476,6 @@ func (r mockRing) Get(key uint32, op ring.Operation) (ring.ReplicationSet, error
 	return result, nil
 }
 
-func (r mockRing) BatchGet(keys []uint32, op ring.Operation) ([]ring.ReplicationSet, error) {
-	result := []ring.ReplicationSet{}
-	for i := 0; i < len(keys); i++ {
-		rs, err := r.Get(keys[i], op)
-		if err != nil {
-			return nil, err
-		}
-		result = append(result, rs)
-	}
-	return result, nil
-}
-
 func (r mockRing) GetAll() (ring.ReplicationSet, error) {
 	return ring.ReplicationSet{
 		Ingesters: r.ingesters,
@@ -498,6 +487,10 @@ func (r mockRing) ReplicationFactor() int {
 	return int(r.replicationFactor)
 }
 
+func (r mockRing) IngesterCount() int {
+	return len(r.ingesters)
+}
+
 type mockIngester struct {
 	sync.Mutex
 	client.IngesterClient
diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index 9c1e2b5422e..546aa4779de 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -68,7 +68,7 @@
 	// Get ingesters by metricName if one exists, otherwise get all ingesters
 	metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers)
 	if !d.cfg.ShardByAllLabels && ok && metricNameMatcher.Type == labels.MatchEqual {
-		replicationSet, err = d.ring.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read)
+		replicationSet, err = d.ring.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read, nil)
 	} else {
 		replicationSet, err = d.ring.GetAll()
 	}
diff --git a/pkg/ring/batch.go b/pkg/ring/batch.go
index 2decdea48ce..9ef1b9705d9 100644
--- a/pkg/ring/batch.go
+++ b/pkg/ring/batch.go
@@ -37,19 +37,26 @@ type itemTracker struct {
 //
 // Not implemented as a method on Ring so we can test separately.
 func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
-	replicationSets, err := r.BatchGet(keys, Write)
-	if err != nil {
-		return err
-	}
-
+	expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()
 	itemTrackers := make([]itemTracker, len(keys))
-	ingesters := map[string]ingester{}
-	for i, replicationSet := range replicationSets {
+	ingesters := make(map[string]ingester, r.IngesterCount())
+
+	const maxExpectedReplicationSet = 5 // Typical replication factor 3, plus one for inactive plus one for luck.
+	var descs [maxExpectedReplicationSet]IngesterDesc
+	for i, key := range keys {
+		replicationSet, err := r.Get(key, Write, descs[:0])
+		if err != nil {
+			return err
+		}
 		itemTrackers[i].minSuccess = len(replicationSet.Ingesters) - replicationSet.MaxErrors
 		itemTrackers[i].maxFailures = replicationSet.MaxErrors
 
 		for _, desc := range replicationSet.Ingesters {
-			curr := ingesters[desc.Addr]
+			curr, found := ingesters[desc.Addr]
+			if !found {
+				curr.itemTrackers = make([]*itemTracker, 0, expectedTrackers)
+				curr.indexes = make([]int, 0, expectedTrackers)
+			}
 			ingesters[desc.Addr] = ingester{
 				desc:         desc,
 				itemTrackers: append(curr.itemTrackers, &itemTrackers[i]),
diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go
index a62b1683545..5d0728659fe 100644
--- a/pkg/ring/replication_strategy.go
+++ b/pkg/ring/replication_strategy.go
@@ -10,9 +10,8 @@ import (
 // tolerate.
 // - Filters out dead ingesters so the one doesn't even try to write to them.
 // - Checks there is enough ingesters for an operation to succeed.
-func (r *Ring) replicationStrategy(ingesters []IngesterDesc, op Operation) (
-	liveIngesters []IngesterDesc, maxFailure int, err error,
-) {
+// The ingesters argument may be overwritten.
+func (r *Ring) replicationStrategy(ingesters []IngesterDesc, op Operation) ([]IngesterDesc, int, error) {
 	// We need a response from a quorum of ingesters, which is n/2 + 1. In the
 	// case of a node joining/leaving, the actual replica set might be bigger
 	// than the replication factor, so use the bigger or the two.
@@ -21,29 +20,29 @@
 		replicationFactor = len(ingesters)
 	}
 	minSuccess := (replicationFactor / 2) + 1
-	maxFailure = replicationFactor - minSuccess
+	maxFailure := replicationFactor - minSuccess
 
 	// Skip those that have not heartbeated in a while. NB these are still
 	// included in the calculation of minSuccess, so if too many failed ingesters
 	// will cause the whole write to fail.
-	liveIngesters = make([]IngesterDesc, 0, len(ingesters))
-	for _, ingester := range ingesters {
-		if r.IsHealthy(&ingester, op) {
-			liveIngesters = append(liveIngesters, ingester)
+	for i := 0; i < len(ingesters); {
+		if r.IsHealthy(&ingesters[i], op) {
+			i++
 		} else {
+			ingesters = append(ingesters[:i], ingesters[i+1:]...)
 			maxFailure--
 		}
 	}
 
 	// This is just a shortcut - if there are not minSuccess available ingesters,
 	// after filtering out dead ones, don't even bother trying.
-	if maxFailure < 0 || len(liveIngesters) < minSuccess {
-		err = fmt.Errorf("at least %d live ingesters required, could only find %d",
-			minSuccess, len(liveIngesters))
-		return
+	if maxFailure < 0 || len(ingesters) < minSuccess {
+		err := fmt.Errorf("at least %d live ingesters required, could only find %d",
+			minSuccess, len(ingesters))
+		return nil, 0, err
 	}
 
-	return
+	return ingesters, maxFailure, nil
 }
 
 // IsHealthy checks whether an ingester appears to be alive and heartbeating
@@ -60,3 +59,11 @@ func (r *Ring) IsHealthy(ingester *IngesterDesc, op Operation) bool {
 func (r *Ring) ReplicationFactor() int {
 	return r.cfg.ReplicationFactor
 }
+
+// IngesterCount is number of ingesters in the ring
+func (r *Ring) IngesterCount() int {
+	r.mtx.Lock()
+	c := len(r.ringDesc.Ingesters)
+	r.mtx.Unlock()
+	return c
+}
diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go
index a74b22ecfdc..681f66203fd 100644
--- a/pkg/ring/ring.go
+++ b/pkg/ring/ring.go
@@ -31,10 +31,13 @@ const (
 type ReadRing interface {
 	prometheus.Collector
 
-	Get(key uint32, op Operation) (ReplicationSet, error)
-	BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error)
+	// Get returns n (or more) ingesters which form the replicas for the given key.
+	// buf is a slice to be overwritten for the return value
+	// to avoid memory allocation; can be nil.
+	Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error)
 	GetAll() (ReplicationSet, error)
 	ReplicationFactor() int
+	IngesterCount() int
 }
 
 // Operation can be Read or Write
@@ -181,37 +184,16 @@ func migrateRing(desc *Desc) []TokenDesc {
 }
 
 // Get returns n (or more) ingesters which form the replicas for the given key.
-func (r *Ring) Get(key uint32, op Operation) (ReplicationSet, error) {
+func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) {
 	r.mtx.RLock()
 	defer r.mtx.RUnlock()
-	return r.getInternal(key, op)
-}
-
-// BatchGet returns ReplicationFactor (or more) ingesters which form the replicas
-// for the given keys. The order of the result matches the order of the input.
-func (r *Ring) BatchGet(keys []uint32, op Operation) ([]ReplicationSet, error) {
-	r.mtx.RLock()
-	defer r.mtx.RUnlock()
-
-	result := make([]ReplicationSet, len(keys), len(keys))
-	for i, key := range keys {
-		rs, err := r.getInternal(key, op)
-		if err != nil {
-			return nil, err
-		}
-		result[i] = rs
-	}
-	return result, nil
-}
-
-func (r *Ring) getInternal(key uint32, op Operation) (ReplicationSet, error) {
 	if r.ringDesc == nil || len(r.ringDesc.Tokens) == 0 {
 		return ReplicationSet{}, ErrEmptyRing
 	}
 
 	var (
 		n             = r.cfg.ReplicationFactor
-		ingesters     = make([]IngesterDesc, 0, n)
+		ingesters     = buf[:0]
 		distinctHosts = map[string]struct{}{}
 		start         = r.search(key)
 		iterations    = 0
diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go
index 26377d70560..2178c41023e 100644
--- a/pkg/ring/ring_test.go
+++ b/pkg/ring/ring_test.go
@@ -3,20 +3,32 @@ package ring
 import (
 	"context"
 	"fmt"
+	"math/rand"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
-	"github.com/cortexproject/cortex/pkg/ring/kv"
-	"github.com/cortexproject/cortex/pkg/ring/kv/consul"
+	"github.com/cortexproject/cortex/pkg/util/flagext"
 )
 
 const (
-	numIngester = 100
-	numTokens   = 512
+	numTokens = 512
 )
 
-func BenchmarkRing(b *testing.B) {
+func BenchmarkBatch10x100(b *testing.B) {
+	benchmarkBatch(b, 10, 100)
+}
+
+func BenchmarkBatch100x100(b *testing.B) {
+	benchmarkBatch(b, 100, 100)
+}
+
+func BenchmarkBatch100x1000(b *testing.B) {
+	benchmarkBatch(b, 100, 1000)
+}
+
+func benchmarkBatch(b *testing.B, numIngester, numKeys int) {
 	// Make a random ring with N ingesters, and M tokens per ingests
 	desc := NewDesc()
 	takenTokens := []uint32{}
@@ -26,26 +38,33 @@
 		desc.AddIngester(fmt.Sprintf("%d", i), fmt.Sprintf("ingester%d", i), tokens, ACTIVE, false)
 	}
 
-	consul := consul.NewInMemoryClient(GetCodec())
-	err := consul.CAS(context.Background(), ConsulKey, func(interface{}) (interface{}, bool, error) {
-		return desc, false, nil
-	})
-	require.NoError(b, err)
-
-	r, err := New(Config{
-		KVStore: kv.Config{
-			Mock: consul,
-		},
-		ReplicationFactor: 3,
-	}, "ingester")
-	if err != nil {
-		b.Fatal(err)
+	cfg := Config{}
+	flagext.DefaultValues(&cfg)
+	r := Ring{
+		name:     "ingester",
+		cfg:      cfg,
+		ringDesc: desc,
 	}
 
+	ctx := context.Background()
+	callback := func(IngesterDesc, []int) error {
+		return nil
+	}
+	cleanup := func() {
+	}
+	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
+	keys := make([]uint32, numKeys)
 	// Generate a batch of N random keys, and look them up
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
-		keys := GenerateTokens(100, nil)
-		r.BatchGet(keys, Write)
+		generateKeys(rnd, numKeys, keys)
+		err := DoBatch(ctx, &r, keys, callback, cleanup)
+		require.NoError(b, err)
+	}
+}
+
+func generateKeys(r *rand.Rand, numTokens int, dest []uint32) {
+	for i := 0; i < numTokens; i++ {
+		dest[i] = r.Uint32()
 	}
 }
diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go
index 738250e5aa9..526a6545a57 100644
--- a/pkg/ruler/ruler.go
+++ b/pkg/ruler/ruler.go
@@ -330,7 +330,7 @@ func (r *Ruler) Evaluate(userID string, item *workItem) {
 }
 
 func (r *Ruler) ownsRule(hash uint32) bool {
-	rlrs, err := r.ring.Get(hash, ring.Read)
+	rlrs, err := r.ring.Get(hash, ring.Read, nil)
 	// If an error occurs evaluate a rule as if it is owned
 	// better to have extra datapoints for a rule than none at all
 	// TODO: add a temporary cache of owned rule values or something to fall back on