Skip to content

Commit 8633cd8

Browse files
fix race in processing of headers in sharded queries (#2762)
* fix race in processing of headers in sharded queries Signed-off-by: Sandeep Sukhani <[email protected]> * put a lock for extra protection from panicing Signed-off-by: Sandeep Sukhani <[email protected]> * update changelog Signed-off-by: Sandeep Sukhani <[email protected]>
1 parent 3df7a37 commit 8633cd8

File tree

3 files changed

+16
-12
lines changed

3 files changed

+16
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@
140140
* [BUGFIX] Ingester: increment series per metric when recovering from WAL or transfer. #2674
141141
* [BUGFIX] Fixed `wrong number of arguments for 'mget' command` Redis error when a query has no chunks to lookup from storage. #2700
142142
* [BUGFIX] Ingester: Automatically remove old tmp checkpoints, fixing a potential disk space leak after an ingester crashes.
143+
* [BUGFIX] Fix race in processing of headers in sharded queries. #2762
143144

144145
## 1.1.0 / 2020-05-21
145146

pkg/querier/queryrange/queryable.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ func (q *ShardedQueryable) Querier(ctx context.Context, mint, maxt int64) (stora
3030
return q.sharededQuerier, nil
3131
}
3232

33-
func (q *ShardedQueryable) getResponseHeaders() map[string][]string {
34-
return q.sharededQuerier.ResponseHeaders
33+
func (q *ShardedQueryable) getResponseHeaders() []*PrometheusResponseHeader {
34+
q.sharededQuerier.ResponseHeadersMtx.Lock()
35+
defer q.sharededQuerier.ResponseHeadersMtx.Unlock()
36+
37+
return headersMapToPrometheusResponseHeaders(q.sharededQuerier.ResponseHeaders)
3538
}
3639

3740
// ShardedQuerier is a an implementor of the Querier interface.
@@ -95,8 +98,8 @@ func (q *ShardedQuerier) handleEmbeddedQuery(encoded string) storage.SeriesSet {
9598
errCh <- err
9699
return
97100
}
98-
samplesCh <- streams
99101
q.setResponseHeaders(resp.(*PrometheusResponse).Headers)
102+
samplesCh <- streams
100103
}(query)
101104
}
102105

@@ -141,3 +144,11 @@ func (q *ShardedQuerier) LabelNames() ([]string, storage.Warnings, error) {
141144
func (q *ShardedQuerier) Close() error {
142145
return nil
143146
}
147+
148+
func headersMapToPrometheusResponseHeaders(headersMap map[string][]string) (prs []*PrometheusResponseHeader) {
149+
for h, v := range headersMap {
150+
prs = append(prs, &PrometheusResponseHeader{Name: h, Values: v})
151+
}
152+
153+
return
154+
}

pkg/querier/queryrange/querysharding.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func (qs *queryShard) Do(ctx context.Context, r Request) (Response, error) {
236236
ResultType: string(res.Value.Type()),
237237
Result: extracted,
238238
},
239-
Headers: headersMapToPrometheusResponseHeaders(shardedQueryable.getResponseHeaders()),
239+
Headers: shardedQueryable.getResponseHeaders(),
240240
}, nil
241241
}
242242

@@ -322,11 +322,3 @@ func partitionRequest(r Request, t time.Time) (before Request, after Request) {
322322

323323
return r.WithStartEnd(r.GetStart(), boundary), r.WithStartEnd(boundary, r.GetEnd())
324324
}
325-
326-
func headersMapToPrometheusResponseHeaders(headersMap map[string][]string) (prs []*PrometheusResponseHeader) {
327-
for h, v := range headersMap {
328-
prs = append(prs, &PrometheusResponseHeader{Name: h, Values: v})
329-
}
330-
331-
return
332-
}

0 commit comments

Comments
 (0)