Skip to content

Commit c0a52a0

Browse files
csmarchbanksbboreham
authored andcommitted
Perform ingester pushes in a background context rather than the request context (#736)
* Perform ingester pushes in a background context rather than the request context * Remove timeout on overall context, and add configured timeout to each sample push
1 parent e2444f8 commit c0a52a0

File tree

1 file changed

+9
-3
lines changed

1 file changed

+9
-3
lines changed

pkg/distributor/distributor.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"golang.org/x/time/rate"
1515

1616
"github.com/go-kit/kit/log/level"
17+
opentracing "github.com/opentracing/opentracing-go"
1718
"github.com/prometheus/client_golang/prometheus"
1819
"github.com/prometheus/common/model"
1920
"github.com/prometheus/prometheus/pkg/labels"
@@ -346,11 +347,16 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
346347
done: make(chan struct{}),
347348
err: make(chan error),
348349
}
349-
ctx, cancel := context.WithTimeout(ctx, d.cfg.RemoteTimeout)
350-
defer cancel() // cancel the timeout to release resources
351350
for ingester, samples := range samplesByIngester {
352351
go func(ingester *ring.IngesterDesc, samples []*sampleTracker) {
353-
d.sendSamples(ctx, ingester, samples, &pushTracker)
352+
// Use a background context to make sure all ingesters get samples even if we return early
353+
localCtx, cancel := context.WithTimeout(context.Background(), d.cfg.RemoteTimeout)
354+
defer cancel()
355+
localCtx = user.InjectOrgID(localCtx, userID)
356+
if sp := opentracing.SpanFromContext(ctx); sp != nil {
357+
localCtx = opentracing.ContextWithSpan(localCtx, sp)
358+
}
359+
d.sendSamples(localCtx, ingester, samples, &pushTracker)
354360
}(ingester, samples)
355361
}
356362
select {

0 commit comments

Comments
 (0)