diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index bd45780516b..8dc2954d46b 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -25,6 +25,7 @@ import ( // Client is a client used to interact with Cortex in integration tests type Client struct { alertmanagerClient promapi.Client + querierAddress string rulerAddress string distributorAddress string timeout time.Duration @@ -52,6 +53,7 @@ func NewClient( c := &Client{ distributorAddress: distributorAddress, + querierAddress: querierAddress, rulerAddress: rulerAddress, timeout: 5 * time.Second, httpClient: &http.Client{}, @@ -112,6 +114,32 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) { return value, err } +func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) { + addr := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=%s", c.querierAddress, url.QueryEscape(query)) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", addr, nil) + if err != nil { + return nil, nil, err + } + + req.Header.Set("X-Scope-OrgID", c.orgID) + + res, err := c.httpClient.Do(req) + if err != nil { + return nil, nil, err + } + defer res.Body.Close() + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return nil, nil, err + } + return res, body, nil +} + // LabelValues gets label values func (c *Client) LabelValues(label string) (model.LabelValues, error) { value, _, err := c.querierClient.LabelValues(context.Background(), label) diff --git a/integration/querier_test.go b/integration/querier_test.go index 2e208185a8a..a16242bb62a 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -709,6 +709,13 @@ func TestQuerierWithChunksStorage(t *testing.T) { c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", fmt.Sprintf("user-%d", userID)) require.NoError(t, err) + if userID == 0 { // No need to repeat this test for each user. + res, body, err := c.QueryRaw("{instance=~\"hello.*\"}") + require.NoError(t, err) + require.Equal(t, 422, res.StatusCode) + require.Contains(t, string(body), "query must contain metric name") + } + for q := 0; q < numQueriesPerUser; q++ { go func() { defer wg.Done() diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index cccf8470f76..9118f85817b 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -27,7 +27,7 @@ const ( type queryFrontendSetup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) { - runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + runQueryFrontendTest(t, false, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"]) require.NoError(t, s.StartAndWaitReady(minio)) @@ -36,7 +36,7 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) { } func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) { - runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + runQueryFrontendTest(t, false, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig))) minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"]) @@ -47,7 +47,7 @@ func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) { } func TestQueryFrontendWithChunksStorageViaFlags(t *testing.T) { - runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + runQueryFrontendTest(t, true, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) dynamo := e2edb.NewDynamoDB() @@ -65,7 +65,7 @@ func TestQueryFrontendWithChunksStorageViaFlags(t *testing.T) { } func TestQueryFrontendWithChunksStorageViaConfigFile(t *testing.T) { - runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + runQueryFrontendTest(t, true, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(ChunksStorageConfig))) require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml))) @@ -84,7 +84,7 @@ func TestQueryFrontendWithChunksStorageViaConfigFile(t *testing.T) { } func TestQueryFrontendTLSWithBlocksStorageViaFlags(t *testing.T) { - runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { + runQueryFrontendTest(t, false, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) { minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"]) require.NoError(t, s.StartAndWaitReady(minio)) @@ -107,7 +107,7 @@ func TestQueryFrontendTLSWithBlocksStorageViaFlags(t *testing.T) { }) } -func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) { +func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryFrontendSetup) { const numUsers = 10 const numQueriesPerUser = 10 @@ -174,6 +174,14 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) { c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", fmt.Sprintf("user-%d", userID)) require.NoError(t, err) + // No need to repeat this test for each user. + if userID == 0 && testMissingMetricName { + res, body, err := c.QueryRaw("{instance=~\"hello.*\"}") + require.NoError(t, err) + require.Equal(t, 422, res.StatusCode) + require.Contains(t, string(body), "query must contain metric name") + } + for q := 0; q < numQueriesPerUser; q++ { go func() { defer wg.Done() @@ -188,7 +196,11 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) { wg.Wait() - require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser), "cortex_query_frontend_queries_total")) + extra := float64(0) + if testMissingMetricName { + extra = 1 + } + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total")) // Ensure no service-specific metrics prefix is used by the wrong service. assertServiceMetricsPrefixes(t, Distributor, distributor) diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index f705ccd2388..41e5d8380ab 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "net/http" "sort" "sync" "time" @@ -15,8 +14,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/promql" - "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/util" @@ -26,11 +23,10 @@ import ( ) var ( + ErrQueryMustContainMetricName = QueryError("query must contain metric name") ErrMetricNameLabelMissing = errors.New("metric name label missing") ErrParialDeleteChunkNoOverlap = errors.New("interval for partial deletion has not overlap with chunk interval") -) -var ( indexEntriesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{ Namespace: "cortex", Name: "chunk_store_index_entries_per_chunk", @@ -44,6 +40,13 @@ var ( }) ) +// Query errors are to be treated as user errors, rather than storage errors. +type QueryError string + +func (e QueryError) Error() string { + return string(e) +} + // StoreConfig specifies config for a ChunkStore type StoreConfig struct { ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"` @@ -286,12 +289,12 @@ func (c *baseStore) validateQueryTimeRange(ctx context.Context, userID string, f defer log.Span.Finish() if *through < *from { - return false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from) + return false, QueryError(fmt.Sprintf("invalid query, through < from (%s < %s)", through, from)) } maxQueryLength := c.limits.MaxQueryLength(userID) if maxQueryLength > 0 && (*through).Sub(*from) > maxQueryLength { - return false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(*from), maxQueryLength) + return false, QueryError(fmt.Sprintf(validation.ErrQueryTooLong, (*through).Sub(*from), maxQueryLength)) } now := model.Now() @@ -333,7 +336,7 @@ func (c *baseStore) validateQuery(ctx context.Context, userID string, from *mode // Check there is a metric name matcher of type equal, metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(matchers) if !ok || metricNameMatcher.Type != labels.MatchEqual { - return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "query must contain metric name") + return "", nil, false, ErrQueryMustContainMetricName } return metricNameMatcher.Value, matchers, false, nil @@ -357,7 +360,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) if maxChunksPerQuery > 0 && len(filtered) > maxChunksPerQuery { - err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery) + err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery)) level.Error(log).Log("err", err) return nil, err } @@ -366,7 +369,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th keys := keysFromChunks(filtered) allChunks, err := c.FetchChunks(ctx, filtered, keys) if err != nil { - return nil, promql.ErrStorage{Err: err} + return nil, err } // Filter out chunks based on the empty matchers in the query. diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 0ed3ae37267..7020937515a 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -179,7 +179,7 @@ func TestChunkStore_Get(t *testing.T) { }, { query: `{__name__=~"foo"}`, - err: "rpc error: code = Code(400) desc = query must contain metric name", + err: "query must contain metric name", }, } for _, schema := range schemas { @@ -908,25 +908,25 @@ func TestChunkStoreError(t *testing.T) { query: "foo", from: model.Time(0).Add(31 * 24 * time.Hour), through: model.Time(0), - err: "rpc error: code = Code(400) desc = invalid query, through < from (0 < 2678400)", + err: "invalid query, through < from (0 < 2678400)", }, { query: "foo", from: model.Time(0), through: model.Time(0).Add(31 * 24 * time.Hour), - err: "rpc error: code = Code(400) desc = invalid query, length > limit (744h0m0s > 720h0m0s)", + err: "invalid query, length > limit (744h0m0s > 720h0m0s)", }, { query: "{foo=\"bar\"}", from: model.Time(0), through: model.Time(0).Add(1 * time.Hour), - err: "rpc error: code = Code(400) desc = query must contain metric name", + err: "query must contain metric name", }, { query: "{__name__=~\"bar\"}", from: model.Time(0), through: model.Time(0).Add(1 * time.Hour), - err: "rpc error: code = Code(400) desc = query must contain metric name", + err: "query must contain metric name", }, } { for _, schema := range schemas { diff --git a/pkg/chunk/series_store.go b/pkg/chunk/series_store.go index 1495d6579fb..39cecbbe4a1 100644 --- a/pkg/chunk/series_store.go +++ b/pkg/chunk/series_store.go @@ -3,7 +3,6 @@ package chunk import ( "context" "fmt" - "net/http" "time" "github.com/go-kit/kit/log/level" @@ -13,7 +12,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/querier/astmapper" @@ -116,7 +114,7 @@ func (c *seriesStore) Get(ctx context.Context, userID string, from, through mode // Protect ourselves against OOMing. maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID) if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery { - err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery) + err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery)) level.Error(log).Log("err", err) return nil, err } diff --git a/pkg/querier/chunk_store_queryable.go b/pkg/querier/chunk_store_queryable.go index 8b0ad869d61..162ab736b29 100644 --- a/pkg/querier/chunk_store_queryable.go +++ b/pkg/querier/chunk_store_queryable.go @@ -46,7 +46,19 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ... } chunks, err := q.store.Get(q.ctx, userID, model.Time(sp.Start), model.Time(sp.End), matchers...) if err != nil { - return nil, nil, promql.ErrStorage{Err: err} + switch err.(type) { + case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout: + // Recognized by Prometheus API, vendor/github.com/prometheus/prometheus/promql/engine.go:91. + // Don't translate those, just in case we use them internally. + return nil, nil, err + case chunk.QueryError: + // This will be returned with status code 422 by Prometheus API. + // vendor/github.com/prometheus/prometheus/web/api/v1/api.go:1393 + return nil, nil, err + default: + // All other errors will be returned as 500. + return nil, nil, promql.ErrStorage{Err: err} + } } return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc), nil, nil diff --git a/pkg/querier/chunk_store_queryable_test.go b/pkg/querier/chunk_store_queryable_test.go index 76cc8074243..43d2a4c56b2 100644 --- a/pkg/querier/chunk_store_queryable_test.go +++ b/pkg/querier/chunk_store_queryable_test.go @@ -2,17 +2,29 @@ package querier import ( "context" + "errors" "fmt" + "net/http" + "net/http/httptest" + "regexp" "sort" "testing" "time" "github.com/prometheus/common/model" + "github.com/prometheus/common/route" + "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/promql" + v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/chunk" promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" + "github.com/cortexproject/cortex/pkg/querier/chunkstore" + "github.com/cortexproject/cortex/pkg/util" ) // Make sure that chunkSeries implements SeriesWithChunks @@ -100,3 +112,105 @@ type sortedByLabels []labels.Labels func (b sortedByLabels) Len() int { return len(b) } func (b sortedByLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } func (b sortedByLabels) Less(i, j int) bool { return labels.Compare(b[i], b[j]) < 0 } + +func TestApiStatusCodes(t *testing.T) { + for ix, tc := range []struct { + err error + expectedString string + expectedCode int + }{ + { + err: errors.New("some random error"), + expectedString: "some random error", + expectedCode: 500, + }, + + { + err: chunk.QueryError("special handling"), // handled specially by chunk_store_queryable + expectedString: "special handling", + expectedCode: 422, + }, + + { + err: promql.ErrTooManySamples("query execution"), + expectedString: "too many samples", + expectedCode: 422, + }, + + { + err: promql.ErrQueryCanceled("query execution"), + expectedString: "query was canceled", + expectedCode: 503, + }, + + { + err: promql.ErrQueryTimeout("query execution"), + expectedString: "query timed out", + expectedCode: 503, + }, + + // Unfortunately, queryable cannot return anything else than 500 or 422. + { + err: httpgrpc.Errorf(http.StatusBadRequest, "test string"), + expectedString: "test string", + expectedCode: 500, + }, + } { + t.Run(fmt.Sprintf("%d", ix), func(t *testing.T) { + r := createPrometheusAPI(testStore{err: tc.err}) + rec := httptest.NewRecorder() + + req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil) + req = req.WithContext(user.InjectOrgID(context.Background(), "test org")) + + r.ServeHTTP(rec, req) + + require.Equal(t, tc.expectedCode, rec.Code) + require.Contains(t, rec.Body.String(), tc.expectedString) + }) + } +} + +func createPrometheusAPI(store chunkstore.ChunkStore) *route.Router { + engine := promql.NewEngine(promql.EngineOpts{ + Logger: util.Logger, + Reg: nil, + ActiveQueryTracker: nil, + MaxSamples: 100, + Timeout: 5 * time.Second, + }) + + queryable := newChunkStoreQueryable(store, mergeChunks) + + api := v1.NewAPI( + engine, + queryable, + DummyTargetRetriever{}, + DummyAlertmanagerRetriever{}, + func() config.Config { return config.Config{} }, + map[string]string{}, // TODO: include configuration flags + v1.GlobalURLOptions{}, + func(f http.HandlerFunc) http.HandlerFunc { return f }, + func() v1.TSDBAdmin { return nil }, // Only needed for admin APIs. + false, // Disable admin APIs. + util.Logger, + DummyRulesRetriever{}, + 0, 0, 0, // Remote read samples and concurrency limit. + regexp.MustCompile(".*"), + func() (v1.RuntimeInfo, error) { return v1.RuntimeInfo{}, errors.New("not implemented") }, + &v1.PrometheusVersion{}, + ) + + promRouter := route.New().WithPrefix("/api/v1") + api.Register(promRouter) + + return promRouter +} + +type testStore struct { + err error +} + +func (t testStore) Get(context.Context, string, model.Time, model.Time, ...*labels.Matcher) ([]chunk.Chunk, error) { + return nil, t.err +}