Skip to content

Commit c9d4394

Browse files
authored
Cancel abandoned operations in ReplicationSet.Do() (#3178)
Generally it will start a set of operations in parallel, and return once enough of them have succeeded. Pass down a context to each one, and cancel that context when ReplicationSet.Do() returns. Signed-off-by: Bryan Boreham <[email protected]>
1 parent 9f3ce2c commit c9d4394

File tree

4 files changed

+16
-17
lines changed

4 files changed

+16
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
* [BUGFIX] Configs: prevent validation of templates to fail when using template functions. #3157
8383
* [BUGFIX] Configuring the S3 URL with an `@` but without username and password doesn't enable the AWS static credentials anymore. #3170
8484
* [BUGFIX] Limit errors on ranged queries (`api/v1/query_range`) no longer return a status code `500` but `422` instead. #3167
85+
* [BUGFIX] No-longer-needed ingester operations from queries are now canceled. #3178
8586

8687
## 1.3.0 / 2020-08-21
8788

pkg/distributor/distributor.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ func (d *Distributor) send(ctx context.Context, ingester ring.IngesterDesc, time
604604
}
605605

606606
// ForAllIngesters runs f, in parallel, for all ingesters
607-
func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f func(client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
607+
func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f func(context.Context, client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
608608
replicationSet, err := d.ingestersRing.GetAll(ring.Read)
609609
if err != nil {
610610
return nil, err
@@ -613,13 +613,13 @@ func (d *Distributor) ForAllIngesters(ctx context.Context, reallyAll bool, f fun
613613
replicationSet.MaxErrors = 0
614614
}
615615

616-
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) {
616+
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
617617
client, err := d.ingesterPool.GetClientFor(ing.Addr)
618618
if err != nil {
619619
return nil, err
620620
}
621621

622-
return f(client.(ingester_client.IngesterClient))
622+
return f(ctx, client.(ingester_client.IngesterClient))
623623
})
624624
}
625625

@@ -628,7 +628,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod
628628
req := &client.LabelValuesRequest{
629629
LabelName: string(labelName),
630630
}
631-
resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
631+
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
632632
return client.LabelValues(ctx, req)
633633
})
634634
if err != nil {
@@ -652,7 +652,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, labelName mod
652652
// LabelNames returns all of the label names.
653653
func (d *Distributor) LabelNames(ctx context.Context) ([]string, error) {
654654
req := &client.LabelNamesRequest{}
655-
resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
655+
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
656656
return client.LabelNames(ctx, req)
657657
})
658658
if err != nil {
@@ -684,7 +684,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
684684
return nil, err
685685
}
686686

687-
resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
687+
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
688688
return client.MetricsForLabelMatchers(ctx, req)
689689
})
690690
if err != nil {
@@ -712,7 +712,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
712712
func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) {
713713
req := &ingester_client.MetricsMetadataRequest{}
714714
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
715-
resps, err := d.ForAllIngesters(ctx, false, func(client client.IngesterClient) (interface{}, error) {
715+
resps, err := d.ForAllIngesters(ctx, false, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
716716
return client.MetricsMetadata(ctx, req)
717717
})
718718
if err != nil {
@@ -746,7 +746,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
746746
// UserStats returns statistics about the current user.
747747
func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) {
748748
req := &client.UserStatsRequest{}
749-
resps, err := d.ForAllIngesters(ctx, true, func(client client.IngesterClient) (interface{}, error) {
749+
resps, err := d.ForAllIngesters(ctx, true, func(ctx context.Context, client client.IngesterClient) (interface{}, error) {
750750
return client.UserStats(ctx, req)
751751
})
752752
if err != nil {

pkg/distributor/query.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (d *Distributor) queryPrep(ctx context.Context, from, to model.Time, matche
8888
func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (model.Matrix, error) {
8989
// Fetch samples from multiple ingesters in parallel, using the replicationSet
9090
// to deal with consistency.
91-
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) {
91+
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
9292
client, err := d.ingesterPool.GetClientFor(ing.Addr)
9393
if err != nil {
9494
return nil, err
@@ -133,7 +133,7 @@ func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.Re
133133
// queryIngesterStream queries the ingesters using the new streaming API.
134134
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
135135
// Fetch samples from multiple ingesters
136-
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ing *ring.IngesterDesc) (interface{}, error) {
136+
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.IngesterDesc) (interface{}, error) {
137137
client, err := d.ingesterPool.GetClientFor(ing.Addr)
138138
if err != nil {
139139
return nil, err

pkg/ring/replication_set.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,15 @@ type ReplicationSet struct {
1414

1515
// Do function f in parallel for all replicas in the set, erroring is we exceed
1616
// MaxErrors and returning early otherwise.
17-
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(*IngesterDesc) (interface{}, error)) ([]interface{}, error) {
17+
func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *IngesterDesc) (interface{}, error)) ([]interface{}, error) {
1818
var (
1919
errs = make(chan error, len(r.Ingesters))
2020
resultsChan = make(chan interface{}, len(r.Ingesters))
2121
minSuccess = len(r.Ingesters) - r.MaxErrors
22-
done = make(chan struct{})
2322
forceStart = make(chan struct{}, r.MaxErrors)
2423
)
25-
defer func() {
26-
close(done)
27-
}()
24+
ctx, cancel := context.WithCancel(ctx)
25+
defer cancel()
2826

2927
for i := range r.Ingesters {
3028
go func(i int, ing *IngesterDesc) {
@@ -33,13 +31,13 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(*Ing
3331
after := time.NewTimer(delay)
3432
defer after.Stop()
3533
select {
36-
case <-done:
34+
case <-ctx.Done():
3735
return
3836
case <-forceStart:
3937
case <-after.C:
4038
}
4139
}
42-
result, err := f(ing)
40+
result, err := f(ctx, ing)
4341
if err != nil {
4442
errs <- err
4543
} else {

0 commit comments

Comments
 (0)