@@ -3,6 +3,12 @@ package handler
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "math"
7
+ "net/http"
8
+ "strconv"
9
+ "strings"
10
+ "time"
11
+
6
12
"github.com/cortexproject/cortex/pkg/querier/tripperware/instantquery"
7
13
"github.com/go-kit/log"
8
14
"github.com/go-kit/log/level"
@@ -11,19 +17,15 @@ import (
11
17
"github.com/pkg/errors"
12
18
"github.com/prometheus/common/model"
13
19
v1 "github.com/prometheus/prometheus/web/api/v1"
14
- "math"
15
- "net/http"
16
- "strconv"
17
- "strings"
18
- "time"
19
20
20
21
"github.com/cortexproject/cortex/pkg/cortexpb"
21
- github_com_cortexproject_cortex_pkg_cortexpb "github.com/cortexproject/cortex/pkg/cortexpb"
22
+ cortex_pb "github.com/cortexproject/cortex/pkg/cortexpb"
22
23
"github.com/cortexproject/cortex/pkg/querier/tripperware"
23
24
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
24
25
"github.com/prometheus/prometheus/promql"
25
26
"github.com/prometheus/prometheus/promql/parser"
26
27
"github.com/prometheus/prometheus/storage"
28
+ "github.com/prometheus/prometheus/util/annotations"
27
29
"github.com/prometheus/prometheus/util/httputil"
28
30
"github.com/prometheus/prometheus/util/stats"
29
31
thanos_api "github.com/thanos-io/thanos/pkg/api"
@@ -67,15 +69,15 @@ type response struct {
67
69
68
70
type API struct {
69
71
Queryable storage.SampleAndChunkQueryable
70
- QueryEngine v1 .QueryEngine
72
+ QueryEngine promql .QueryEngine
71
73
Now func () time.Time
72
74
Logger log.Logger
73
75
StatsRenderer v1.StatsRenderer
74
76
}
75
77
76
78
// NewAPI returns an initialized API type.
77
79
func NewAPI (
78
- qe v1 .QueryEngine ,
80
+ qe promql .QueryEngine ,
79
81
q storage.SampleAndChunkQueryable ,
80
82
logger log.Logger ,
81
83
statsRenderer v1.StatsRenderer ,
@@ -101,13 +103,13 @@ type queryData struct {
101
103
Stats stats.QueryStats `json:"stats,omitempty"`
102
104
}
103
105
104
- func invalidParamError (err error , parameter string ) (data interface {}, warnings [] error , error * thanos_api.ApiError , finalizer func ()) {
105
- return nil , nil , & thanos_api.ApiError {
106
+ func invalidParamError (err error , parameter string ) (data interface {}, error * thanos_api.ApiError , warnings annotations. Annotations , finalizer func ()) {
107
+ return nil , & thanos_api.ApiError {
106
108
thanos_api .ErrorBadData , errors .Wrapf (err , "invalid parameter %q" , parameter ),
107
- }, nil
109
+ }, nil , nil
108
110
}
109
111
110
- func (api * API ) Query (r * http.Request ) (data interface {}, warnings [] error , error * thanos_api.ApiError , finalizer func ()) {
112
+ func (api * API ) Query (r * http.Request ) (data interface {}, error * thanos_api.ApiError , warnings annotations. Annotations , finalizer func ()) {
111
113
ts , err := parseTimeParam (r , "time" , api .Now ())
112
114
if err != nil {
113
115
return invalidParamError (err , "time" )
@@ -120,13 +122,13 @@ func (api *API) Query(r *http.Request) (data interface{}, warnings []error, erro
120
122
return invalidParamError (err , "timeout" )
121
123
}
122
124
123
- ctx , cancel = context .WithTimeout (ctx , timeout )
125
+ ctx , cancel = context .WithDeadline (ctx , api . Now (). Add ( timeout ) )
124
126
defer cancel ()
125
127
}
126
128
127
129
opts , err := extractQueryOpts (r )
128
130
if err != nil {
129
- return nil , nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, nil
131
+ return nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, nil , nil
130
132
}
131
133
qry , err := api .QueryEngine .NewInstantQuery (ctx , api .Queryable , opts , r .FormValue ("query" ), ts )
132
134
if err != nil {
@@ -146,7 +148,7 @@ func (api *API) Query(r *http.Request) (data interface{}, warnings []error, erro
146
148
147
149
res := qry .Exec (ctx )
148
150
if res .Err != nil {
149
- return nil , res . Warnings , returnAPIError (res .Err ), qry .Close
151
+ return nil , returnAPIError (res .Err ), res . Warnings , qry .Close
150
152
}
151
153
152
154
// Optional stats field in response if parameter "stats" is not empty.
@@ -178,26 +180,26 @@ func (api *API) Query(r *http.Request) (data interface{}, warnings []error, erro
178
180
}
179
181
}
180
182
if err != nil {
181
- return nil , res . Warnings , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, qry .Close
183
+ return nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, res . Warnings , qry .Close
182
184
}
183
- return data , res .Warnings , nil , qry .Close
185
+ return data , nil , res .Warnings , qry .Close
184
186
}
185
187
186
- func extractQueryOpts (r * http.Request ) (* promql.QueryOpts , error ) {
187
- opts := & promql.QueryOpts {
188
- EnablePerStepStats : r .FormValue ("stats" ) == "all" ,
189
- }
188
+ func extractQueryOpts (r * http.Request ) (promql.QueryOpts , error ) {
189
+ var duration time.Duration
190
+
190
191
if strDuration := r .FormValue ("lookback_delta" ); strDuration != "" {
191
- duration , err := parseDuration (strDuration )
192
+ parsedDuration , err := parseDuration (strDuration )
192
193
if err != nil {
193
194
return nil , fmt .Errorf ("error parsing lookback delta duration: %w" , err )
194
195
}
195
- opts . LookbackDelta = duration
196
+ duration = parsedDuration
196
197
}
197
- return opts , nil
198
+
199
+ return promql .NewPrometheusQueryOpts (r .FormValue ("stats" ) == "all" , duration ), nil
198
200
}
199
201
200
- func (api * API ) QueryRange (r * http.Request ) (data interface {}, warnings [] error , error * thanos_api.ApiError , finalizer func ()) {
202
+ func (api * API ) QueryRange (r * http.Request ) (data interface {}, error * thanos_api.ApiError , warnings annotations. Annotations , finalizer func ()) {
201
203
start , err := parseTime (r .FormValue ("start" ))
202
204
if err != nil {
203
205
return invalidParamError (err , "start" )
@@ -223,7 +225,7 @@ func (api *API) QueryRange(r *http.Request) (data interface{}, warnings []error,
223
225
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
224
226
if end .Sub (start )/ step > 11000 {
225
227
err := errors .New ("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)" )
226
- return nil , nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, nil
228
+ return nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, nil , nil
227
229
}
228
230
229
231
ctx := r .Context ()
@@ -240,7 +242,7 @@ func (api *API) QueryRange(r *http.Request) (data interface{}, warnings []error,
240
242
241
243
opts , err := extractQueryOpts (r )
242
244
if err != nil {
243
- return nil , nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, nil
245
+ return nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, nil , nil
244
246
}
245
247
qry , err := api .QueryEngine .NewRangeQuery (ctx , api .Queryable , opts , r .FormValue ("query" ), start , end , step )
246
248
if err != nil {
@@ -259,7 +261,7 @@ func (api *API) QueryRange(r *http.Request) (data interface{}, warnings []error,
259
261
260
262
res := qry .Exec (ctx )
261
263
if res .Err != nil {
262
- return nil , res . Warnings , returnAPIError (res .Err ), qry .Close
264
+ return nil , returnAPIError (res .Err ), res . Warnings , qry .Close
263
265
}
264
266
265
267
// Optional stats field in response if parameter "stats" is not empty.
@@ -292,9 +294,9 @@ func (api *API) QueryRange(r *http.Request) (data interface{}, warnings []error,
292
294
}
293
295
294
296
if err != nil {
295
- return nil , res . Warnings , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, qry .Close
297
+ return nil , & thanos_api.ApiError {thanos_api .ErrorBadData , err }, res . Warnings , qry .Close
296
298
}
297
- return data , res .Warnings , nil , qry .Close
299
+ return data , nil , res .Warnings , qry .Close
298
300
}
299
301
300
302
func parseTimeParam (r * http.Request , paramName string , defaultValue time.Time ) (time.Time , error ) {
@@ -325,9 +327,9 @@ func parseTime(s string) (time.Time, error) {
325
327
// Upstream issue: https://github.com/golang/go/issues/20555
326
328
switch s {
327
329
case minTimeFormatted :
328
- return minTime , nil
330
+ return v1 . MinTime , nil
329
331
case maxTimeFormatted :
330
- return maxTime , nil
332
+ return v1 . MaxTime , nil
331
333
}
332
334
return time.Time {}, errors .Errorf ("cannot parse %q to a valid timestamp" , s )
333
335
}
@@ -373,18 +375,13 @@ func returnAPIError(err error) *thanos_api.ApiError {
373
375
}
374
376
375
377
var (
376
- minTime = time .Unix (math .MinInt64 / 1000 + 62135596801 , 0 ).UTC ()
377
- maxTime = time .Unix (math .MaxInt64 / 1000 - 62135596801 , 999999999 ).UTC ()
378
-
379
- minTimeFormatted = minTime .Format (time .RFC3339Nano )
380
- maxTimeFormatted = maxTime .Format (time .RFC3339Nano )
378
+ minTimeFormatted = v1 .MinTime .Format (time .RFC3339Nano )
379
+ maxTimeFormatted = v1 .MaxTime .Format (time .RFC3339Nano )
381
380
)
382
381
383
- func (api * API ) Respond (w http.ResponseWriter , data interface {}, warnings storage.Warnings ) {
384
- var warningStrings []string
385
- for _ , warning := range warnings {
386
- warningStrings = append (warningStrings , warning .Error ())
387
- }
382
+ func (api * API ) Respond (w http.ResponseWriter , req * http.Request , data interface {}, warnings annotations.Annotations , query string ) {
383
+ statusMessage := statusSuccess
384
+
388
385
var b []byte
389
386
var err error
390
387
switch resp := data .(type ) {
@@ -393,26 +390,29 @@ func (api *API) Respond(w http.ResponseWriter, data interface{}, warnings storag
393
390
for h , hv := range w .Header () {
394
391
resp .Headers = append (resp .Headers , & tripperware.PrometheusResponseHeader {Name : h , Values : hv })
395
392
}
393
+ resp .Warnings = warnings .AsStrings (query , 10 )
396
394
b , err = proto .Marshal (resp )
397
395
case * instantquery.PrometheusInstantQueryResponse :
398
396
w .Header ().Set (contentTypeHeader , applicationProtobuf )
399
397
for h , hv := range w .Header () {
400
398
resp .Headers = append (resp .Headers , & tripperware.PrometheusResponseHeader {Name : h , Values : hv })
401
399
}
400
+ resp .Warnings = warnings .AsStrings (query , 10 )
402
401
b , err = proto .Marshal (resp )
403
402
case * queryData :
404
403
w .Header ().Set (contentTypeHeader , applicationJson )
405
404
json := jsoniter .ConfigCompatibleWithStandardLibrary
406
405
b , err = json .Marshal (& response {
407
- Status : statusSuccess ,
406
+ Status : statusMessage ,
408
407
Data : data ,
409
- Warnings : warningStrings ,
408
+ Warnings : warnings . AsStrings ( query , 10 ) ,
410
409
})
411
410
default :
412
411
level .Error (api .Logger ).Log ("msg" , "error asserting response type" )
413
412
http .Error (w , "error asserting response type" , http .StatusInternalServerError )
414
413
return
415
414
}
415
+
416
416
if err != nil {
417
417
level .Error (api .Logger ).Log ("msg" , "error marshaling response" , "err" , err )
418
418
http .Error (w , err .Error (), http .StatusInternalServerError )
@@ -558,11 +558,11 @@ func getSampleStreams(data *queryData) *[]tripperware.SampleStream {
558
558
559
559
for i := 0 ; i < sampleStreamsLen ; i ++ {
560
560
labelsLen := len (data .Result .(promql.Matrix )[i ].Metric )
561
- var labels []github_com_cortexproject_cortex_pkg_cortexpb .LabelAdapter
561
+ var labels []cortex_pb .LabelAdapter
562
562
if labelsLen > 0 {
563
- labels = make ([]github_com_cortexproject_cortex_pkg_cortexpb .LabelAdapter , labelsLen )
563
+ labels = make ([]cortex_pb .LabelAdapter , labelsLen )
564
564
for j := 0 ; j < labelsLen ; j ++ {
565
- labels [j ] = github_com_cortexproject_cortex_pkg_cortexpb .LabelAdapter {
565
+ labels [j ] = cortex_pb .LabelAdapter {
566
566
Name : data .Result .(promql.Matrix )[i ].Metric [j ].Name ,
567
567
Value : data .Result .(promql.Matrix )[i ].Metric [j ].Value ,
568
568
}
@@ -591,19 +591,19 @@ func getSamples(data *queryData) *[]*instantquery.Sample {
591
591
592
592
for i := 0 ; i < vectorSamplesLen ; i ++ {
593
593
labelsLen := len (data .Result .(promql.Vector )[i ].Metric )
594
- var labels []github_com_cortexproject_cortex_pkg_cortexpb .LabelAdapter
594
+ var labels []cortex_pb .LabelAdapter
595
595
if labelsLen > 0 {
596
- labels = make ([]github_com_cortexproject_cortex_pkg_cortexpb .LabelAdapter , labelsLen )
596
+ labels = make ([]cortex_pb .LabelAdapter , labelsLen )
597
597
for j := 0 ; j < labelsLen ; j ++ {
598
- labels [j ] = github_com_cortexproject_cortex_pkg_cortexpb .LabelAdapter {
598
+ labels [j ] = cortex_pb .LabelAdapter {
599
599
Name : data .Result .(promql.Vector )[i ].Metric [j ].Name ,
600
600
Value : data .Result .(promql.Vector )[i ].Metric [j ].Value ,
601
601
}
602
602
}
603
603
}
604
604
605
605
vectorSamples [i ] = & instantquery.Sample {Labels : labels ,
606
- Sample : cortexpb.Sample {
606
+ Sample : & cortexpb.Sample {
607
607
TimestampMs : data .Result .(promql.Vector )[i ].T ,
608
608
Value : data .Result .(promql.Vector )[i ].F ,
609
609
},
0 commit comments