Skip to content

Commit 8007a94

Browse files
committed
Add dedicated instant/range query handlers
Signed-off-by: SungJin1212 <[email protected]>
1 parent 7046357 commit 8007a94

File tree

4 files changed

+708
-11
lines changed

4 files changed

+708
-11
lines changed

pkg/api/handlers.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/cortexproject/cortex/pkg/querier"
2929
"github.com/cortexproject/cortex/pkg/querier/codec"
30+
"github.com/cortexproject/cortex/pkg/querier/customapi"
3031
"github.com/cortexproject/cortex/pkg/querier/stats"
3132
"github.com/cortexproject/cortex/pkg/util"
3233
util_log "github.com/cortexproject/cortex/pkg/util/log"
@@ -195,6 +196,8 @@ func NewQuerierHandler(
195196
Help: "Current number of inflight requests to the querier.",
196197
}, []string{"method", "route"})
197198

199+
statsRenderer := querier.StatsRenderer
200+
corsOrigin := regexp.MustCompile(".*")
198201
api := v1.NewAPI(
199202
engine,
200203
querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API.
@@ -214,7 +217,7 @@ func NewQuerierHandler(
214217
func(context.Context) v1.RulesRetriever { return &querier.DummyRulesRetriever{} },
215218
0, 0, 0, // Remote read samples and concurrency limit.
216219
false,
217-
regexp.MustCompile(".*"),
220+
corsOrigin,
218221
func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") },
219222
&v1.PrometheusVersion{
220223
Version: version.Version,
@@ -229,7 +232,7 @@ func NewQuerierHandler(
229232
// This is used for the stats API which we should not support. Or find other ways to.
230233
prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { return nil, nil }),
231234
reg,
232-
querier.StatsRenderer,
235+
statsRenderer,
233236
false,
234237
nil,
235238
false,
@@ -240,11 +243,18 @@ func NewQuerierHandler(
240243
api.ClearCodecs()
241244
cm := codec.NewInstrumentedCodecMetrics(reg)
242245

243-
api.InstallCodec(codec.NewInstrumentedCodec(v1.JSONCodec{}, cm))
244-
// Install Protobuf codec to give the option for using either.
245-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm))
246-
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
247-
api.InstallCodec(codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm))
246+
codecs := []v1.Codec{
247+
codec.NewInstrumentedCodec(v1.JSONCodec{}, cm),
248+
// Protobuf codec to give the option for using either.
249+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: false}, cm),
250+
// Protobuf codec for Cortex internal requests. This should be used by Cortex Ruler only for remote evaluation.
251+
codec.NewInstrumentedCodec(codec.ProtobufCodec{CortexInternal: true}, cm),
252+
}
253+
254+
// Install codecs
255+
for _, c := range codecs {
256+
api.InstallCodec(c)
257+
}
248258

249259
router := mux.NewRouter()
250260

@@ -269,13 +279,15 @@ func NewQuerierHandler(
269279
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
270280
api.Register(legacyPromRouter)
271281

282+
c := customapi.NewCustomAPI(engine, queryable, statsRenderer, logger, codecs, corsOrigin)
283+
272284
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
273285
// https://github.com/prometheus/prometheus/pull/7125/files
274286
router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
275287
router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
276288
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter)
277-
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter)
278-
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(promRouter)
289+
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(c.Wrap(c.InstantHandler))
290+
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(c.Wrap(c.RangeQueryHandler))
279291
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(promRouter)
280292
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(promRouter)
281293
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(promRouter)
@@ -287,8 +299,8 @@ func NewQuerierHandler(
287299
router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier))
288300
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger))
289301
router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter)
290-
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter)
291-
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(legacyPromRouter)
302+
router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(c.Wrap(c.InstantHandler))
303+
router.Path(path.Join(legacyPrefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(c.Wrap(c.RangeQueryHandler))
292304
router.Path(path.Join(legacyPrefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(legacyPromRouter)
293305
router.Path(path.Join(legacyPrefix, "/api/v1/labels")).Methods("GET", "POST").Handler(legacyPromRouter)
294306
router.Path(path.Join(legacyPrefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(legacyPromRouter)
Lines changed: 289 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,289 @@
1+
package customapi
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/http"
8+
"time"
9+
10+
"github.com/go-kit/log"
11+
"github.com/go-kit/log/level"
12+
"github.com/grafana/regexp"
13+
jsoniter "github.com/json-iterator/go"
14+
"github.com/munnerz/goautoneg"
15+
"github.com/prometheus/prometheus/promql"
16+
"github.com/prometheus/prometheus/storage"
17+
"github.com/prometheus/prometheus/util/annotations"
18+
"github.com/prometheus/prometheus/util/httputil"
19+
v1 "github.com/prometheus/prometheus/web/api/v1"
20+
)
21+
22+
type CustomAPI struct {
23+
queryable storage.SampleAndChunkQueryable
24+
queryEngine promql.QueryEngine
25+
now func() time.Time
26+
statsRenderer v1.StatsRenderer
27+
logger log.Logger
28+
codecs []v1.Codec
29+
CORSOrigin *regexp.Regexp
30+
}
31+
32+
func NewCustomAPI(
33+
qe promql.QueryEngine,
34+
q storage.SampleAndChunkQueryable,
35+
statsRenderer v1.StatsRenderer,
36+
logger log.Logger,
37+
codecs []v1.Codec,
38+
CORSOrigin *regexp.Regexp,
39+
) *CustomAPI {
40+
return &CustomAPI{
41+
queryable: q,
42+
queryEngine: qe,
43+
now: time.Now,
44+
statsRenderer: statsRenderer,
45+
logger: logger,
46+
codecs: codecs,
47+
CORSOrigin: CORSOrigin,
48+
}
49+
}
50+
51+
// Custom handler for Query range API
52+
func (c *CustomAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
53+
start, err := parseTime(r.FormValue("start"))
54+
if err != nil {
55+
return invalidParamError(err, "start")
56+
}
57+
end, err := parseTime(r.FormValue("end"))
58+
if err != nil {
59+
return invalidParamError(err, "end")
60+
}
61+
if end.Before(start) {
62+
return invalidParamError(errors.New("end timestamp must not be before start time"), "end")
63+
}
64+
65+
step, err := parseDuration(r.FormValue("step"))
66+
if err != nil {
67+
return invalidParamError(err, "step")
68+
}
69+
70+
if step <= 0 {
71+
return invalidParamError(errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer"), "step")
72+
}
73+
74+
// For safety, limit the number of returned points per timeseries.
75+
// This is sufficient for 60s resolution for a week or 1h resolution for a year.
76+
if end.Sub(start)/step > 11000 {
77+
err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)")
78+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
79+
}
80+
81+
ctx := r.Context()
82+
if to := r.FormValue("timeout"); to != "" {
83+
var cancel context.CancelFunc
84+
timeout, err := parseDuration(to)
85+
if err != nil {
86+
return invalidParamError(err, "timeout")
87+
}
88+
89+
ctx, cancel = context.WithTimeout(ctx, timeout)
90+
defer cancel()
91+
}
92+
93+
opts, err := extractQueryOpts(r)
94+
if err != nil {
95+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
96+
}
97+
qry, err := c.queryEngine.NewRangeQuery(ctx, c.queryable, opts, r.FormValue("query"), start, end, step)
98+
if err != nil {
99+
return invalidParamError(err, "query")
100+
}
101+
// From now on, we must only return with a finalizer in the result (to
102+
// be called by the caller) or call qry.Close ourselves (which is
103+
// required in the case of a panic).
104+
defer func() {
105+
if result.finalizer == nil {
106+
qry.Close()
107+
}
108+
}()
109+
110+
ctx = httputil.ContextFromRequest(ctx, r)
111+
112+
res := qry.Exec(ctx)
113+
if res.Err != nil {
114+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
115+
}
116+
117+
warnings := res.Warnings
118+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
119+
120+
return apiFuncResult{&v1.QueryData{
121+
ResultType: res.Value.Type(),
122+
Result: res.Value,
123+
Stats: qs,
124+
}, nil, warnings, qry.Close}
125+
}
126+
127+
// Custom handler for Query API
128+
func (c *CustomAPI) InstantHandler(r *http.Request) (result apiFuncResult) {
129+
ts, err := parseTimeParam(r, "time", c.now())
130+
if err != nil {
131+
return invalidParamError(err, "time")
132+
}
133+
134+
ctx := r.Context()
135+
if to := r.FormValue("timeout"); to != "" {
136+
var cancel context.CancelFunc
137+
timeout, err := parseDuration(to)
138+
if err != nil {
139+
return invalidParamError(err, "timeout")
140+
}
141+
142+
ctx, cancel = context.WithDeadline(ctx, c.now().Add(timeout))
143+
defer cancel()
144+
}
145+
146+
opts, err := extractQueryOpts(r)
147+
if err != nil {
148+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
149+
}
150+
qry, err := c.queryEngine.NewInstantQuery(ctx, c.queryable, opts, r.FormValue("query"), ts)
151+
if err != nil {
152+
return invalidParamError(err, "query")
153+
}
154+
155+
// From now on, we must only return with a finalizer in the result (to
156+
// be called by the caller) or call qry.Close ourselves (which is
157+
// required in the case of a panic).
158+
defer func() {
159+
if result.finalizer == nil {
160+
qry.Close()
161+
}
162+
}()
163+
164+
ctx = httputil.ContextFromRequest(ctx, r)
165+
166+
res := qry.Exec(ctx)
167+
if res.Err != nil {
168+
return apiFuncResult{nil, returnAPIError(res.Err), res.Warnings, qry.Close}
169+
}
170+
171+
warnings := res.Warnings
172+
qs := c.statsRenderer(ctx, qry.Stats(), r.FormValue("stats"))
173+
174+
return apiFuncResult{&v1.QueryData{
175+
ResultType: res.Value.Type(),
176+
Result: res.Value,
177+
Stats: qs,
178+
}, nil, warnings, qry.Close}
179+
}
180+
181+
func (c *CustomAPI) Wrap(f apiFunc) http.HandlerFunc {
182+
return func(w http.ResponseWriter, r *http.Request) {
183+
httputil.SetCORS(w, c.CORSOrigin, r)
184+
185+
result := f(r)
186+
if result.finalizer != nil {
187+
defer result.finalizer()
188+
}
189+
190+
if result.err != nil {
191+
c.respondError(w, result.err, result.data)
192+
return
193+
}
194+
195+
if result.data != nil {
196+
c.respond(w, r, result.data, result.warnings, r.FormValue("query"))
197+
return
198+
}
199+
w.WriteHeader(http.StatusNoContent)
200+
}
201+
}
202+
203+
func (c *CustomAPI) respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) {
204+
json := jsoniter.ConfigCompatibleWithStandardLibrary
205+
b, err := json.Marshal(&response{
206+
Status: statusError,
207+
ErrorType: apiErr.typ,
208+
Error: apiErr.err.Error(),
209+
Data: data,
210+
})
211+
if err != nil {
212+
level.Error(c.logger).Log("error marshaling json response", "err", err)
213+
http.Error(w, err.Error(), http.StatusInternalServerError)
214+
return
215+
}
216+
217+
var code int
218+
switch apiErr.typ {
219+
case errorBadData:
220+
code = http.StatusBadRequest
221+
case errorExec:
222+
code = http.StatusUnprocessableEntity
223+
case errorCanceled:
224+
code = statusClientClosedConnection
225+
case errorTimeout:
226+
code = http.StatusServiceUnavailable
227+
case errorInternal:
228+
code = http.StatusInternalServerError
229+
case errorNotFound:
230+
code = http.StatusNotFound
231+
case errorNotAcceptable:
232+
code = http.StatusNotAcceptable
233+
default:
234+
code = http.StatusInternalServerError
235+
}
236+
237+
w.Header().Set("Content-Type", "application/json")
238+
w.WriteHeader(code)
239+
if n, err := w.Write(b); err != nil {
240+
level.Error(c.logger).Log("error writing response", "bytesWritten", n, "err", err)
241+
}
242+
}
243+
244+
func (c *CustomAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) {
245+
warn, info := warnings.AsStrings(query, 10, 10)
246+
247+
resp := &v1.Response{
248+
Status: statusSuccess,
249+
Data: data,
250+
Warnings: warn,
251+
Infos: info,
252+
}
253+
254+
codec, err := c.negotiateCodec(req, resp)
255+
if err != nil {
256+
c.respondError(w, &apiError{errorNotAcceptable, err}, nil)
257+
return
258+
}
259+
260+
b, err := codec.Encode(resp)
261+
if err != nil {
262+
level.Error(c.logger).Log("error marshaling response", "url", req.URL, "err", err)
263+
http.Error(w, err.Error(), http.StatusInternalServerError)
264+
return
265+
}
266+
267+
w.Header().Set("Content-Type", codec.ContentType().String())
268+
w.WriteHeader(http.StatusOK)
269+
if n, err := w.Write(b); err != nil {
270+
level.Error(c.logger).Log("error writing response", "url", req.URL, "bytesWritten", n, "err", err)
271+
}
272+
}
273+
274+
func (c *CustomAPI) negotiateCodec(req *http.Request, resp *v1.Response) (v1.Codec, error) {
275+
for _, clause := range goautoneg.ParseAccept(req.Header.Get("Accept")) {
276+
for _, codec := range c.codecs {
277+
if codec.ContentType().Satisfies(clause) && codec.CanEncode(resp) {
278+
return codec, nil
279+
}
280+
}
281+
}
282+
283+
defaultCodec := c.codecs[0]
284+
if !defaultCodec.CanEncode(resp) {
285+
return nil, fmt.Errorf("cannot encode response as %s", defaultCodec.ContentType())
286+
}
287+
288+
return defaultCodec, nil
289+
}

0 commit comments

Comments
 (0)