Merged
28 changes: 28 additions & 0 deletions integration/e2ecortex/client.go
@@ -25,6 +25,7 @@ import (
// Client is a client used to interact with Cortex in integration tests
type Client struct {
alertmanagerClient promapi.Client
querierAddress string
rulerAddress string
distributorAddress string
timeout time.Duration
@@ -52,6 +53,7 @@ func NewClient(

c := &Client{
distributorAddress: distributorAddress,
querierAddress: querierAddress,
rulerAddress: rulerAddress,
timeout: 5 * time.Second,
httpClient: &http.Client{},
@@ -112,6 +114,32 @@ func (c *Client) Query(query string, ts time.Time) (model.Value, error) {
return value, err
}

func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) {
addr := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=%s", c.querierAddress, url.QueryEscape(query))

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, "GET", addr, nil)
if err != nil {
return nil, nil, err
}

req.Header.Set("X-Scope-OrgID", c.orgID)

res, err := c.httpClient.Do(req)
if err != nil {
return nil, nil, err
}
defer res.Body.Close()

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, nil, err
}
return res, body, nil
}

// LabelValues gets label values
func (c *Client) LabelValues(label string) (model.LabelValues, error) {
value, _, err := c.querierClient.LabelValues(context.Background(), label)
7 changes: 7 additions & 0 deletions integration/querier_test.go
@@ -709,6 +709,13 @@ func TestQuerierWithChunksStorage(t *testing.T) {
c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", fmt.Sprintf("user-%d", userID))
require.NoError(t, err)

if userID == 0 { // No need to repeat this test for each user.
res, body, err := c.QueryRaw("{instance=~\"hello.*\"}")
require.NoError(t, err)
require.Equal(t, 422, res.StatusCode)
require.Contains(t, string(body), "query must contain metric name")
}

for q := 0; q < numQueriesPerUser; q++ {
go func() {
defer wg.Done()
26 changes: 19 additions & 7 deletions integration/query_frontend_test.go
@@ -27,7 +27,7 @@ const (
type queryFrontendSetup func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string)

func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
runQueryFrontendTest(t, false, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

@@ -36,7 +36,7 @@ func TestQueryFrontendWithBlocksStorageViaFlags(t *testing.T) {
}

func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) {
runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
runQueryFrontendTest(t, false, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(BlocksStorageConfig)))

minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"])
@@ -47,7 +47,7 @@ func TestQueryFrontendWithBlocksStorageViaConfigFile(t *testing.T) {
}

func TestQueryFrontendWithChunksStorageViaFlags(t *testing.T) {
runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
runQueryFrontendTest(t, true, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

dynamo := e2edb.NewDynamoDB()
@@ -65,7 +65,7 @@ func TestQueryFrontendWithChunksStorageViaFlags(t *testing.T) {
}

func TestQueryFrontendWithChunksStorageViaConfigFile(t *testing.T) {
runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
runQueryFrontendTest(t, true, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
require.NoError(t, writeFileToSharedDir(s, cortexConfigFile, []byte(ChunksStorageConfig)))
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))

@@ -84,7 +84,7 @@ func TestQueryFrontendWithChunksStorageViaConfigFile(t *testing.T) {
}

func TestQueryFrontendTLSWithBlocksStorageViaFlags(t *testing.T) {
runQueryFrontendTest(t, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
runQueryFrontendTest(t, false, func(t *testing.T, s *e2e.Scenario) (configFile string, flags map[string]string) {
minio := e2edb.NewMinio(9000, BlocksStorageFlags["-experimental.tsdb.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

@@ -107,7 +107,7 @@ func TestQueryFrontendTLSWithBlocksStorageViaFlags(t *testing.T) {
})
}

func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) {
func runQueryFrontendTest(t *testing.T, testMissingMetricName bool, setup queryFrontendSetup) {
const numUsers = 10
const numQueriesPerUser = 10

@@ -174,6 +174,14 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) {
c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", fmt.Sprintf("user-%d", userID))
require.NoError(t, err)

// No need to repeat this test for each user.
if userID == 0 && testMissingMetricName {
Contributor

[thinking loudly] I'm a bit dubious about adding this further test here. The query-frontend integration test is becoming the place where everyone adds further cases, and we risk making it more and more complicated. This case is already covered by the assertion you added to TestQuerierWithChunksStorage, so do we really need this too?

Contributor Author

Originally I suspected that the status code was being mangled by the query-frontend and wanted to make sure that it is propagated properly. Repeating some of the tests between the querier and the query-frontend makes sense imho; we should perhaps refactor the code to avoid the duplication, but keep repeating those tests.

res, body, err := c.QueryRaw("{instance=~\"hello.*\"}")
require.NoError(t, err)
require.Equal(t, 422, res.StatusCode)
require.Contains(t, string(body), "query must contain metric name")
}

for q := 0; q < numQueriesPerUser; q++ {
go func() {
defer wg.Done()
@@ -188,7 +196,11 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) {

wg.Wait()

require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser), "cortex_query_frontend_queries_total"))
extra := float64(0)
if testMissingMetricName {
extra = 1
}
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser+extra), "cortex_query_frontend_queries_total"))

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
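Related to the review thread above about duplicating this assertion between TestQuerierWithChunksStorage and the query-frontend test: a minimal sketch of the kind of shared helper the author hints at when suggesting to refactor away the duplication. The helper name and its placement in the integration package are assumptions for illustration, not part of this PR.

package integration

import (
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/integration/e2ecortex"
)

// assertMissingMetricNameRejected is a hypothetical shared helper: both the
// querier and the query-frontend integration tests could call it, so the
// 422 assertion is written once instead of being copied into each test.
func assertMissingMetricNameRejected(t *testing.T, c *e2ecortex.Client) {
	res, body, err := c.QueryRaw(`{instance=~"hello.*"}`)
	require.NoError(t, err)
	require.Equal(t, 422, res.StatusCode)
	require.Contains(t, string(body), "query must contain metric name")
}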
23 changes: 13 additions & 10 deletions pkg/chunk/chunk_store.go
@@ -4,7 +4,6 @@ import (
"context"
"flag"
"fmt"
"net/http"
"sort"
"sync"
"time"
@@ -15,8 +14,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/util"
@@ -26,11 +23,10 @@ )
)

var (
ErrQueryMustContainMetricName = QueryError("query must contain metric name")
ErrMetricNameLabelMissing = errors.New("metric name label missing")
ErrParialDeleteChunkNoOverlap = errors.New("interval for partial deletion has not overlap with chunk interval")
)

var (
indexEntriesPerChunk = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "chunk_store_index_entries_per_chunk",
@@ -44,6 +40,13 @@ var (
})
)

// Query errors are to be treated as user errors, rather than storage errors.
type QueryError string

func (e QueryError) Error() string {
return string(e)
}

// StoreConfig specifies config for a ChunkStore
type StoreConfig struct {
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config"`
@@ -286,12 +289,12 @@ func (c *baseStore) validateQueryTimeRange(ctx context.Context, userID string, f
defer log.Span.Finish()

if *through < *from {
return false, httpgrpc.Errorf(http.StatusBadRequest, "invalid query, through < from (%s < %s)", through, from)
return false, QueryError(fmt.Sprintf("invalid query, through < from (%s < %s)", through, from))
Contributor

Not using httpgrpc.Errorf() in validateQueryTimeRange() changes the behaviour for LabelValuesForMetricName() and LabelNamesForMetricName(), where we want to return an httpgrpc error. An option could be to always wrap the error there.

Contributor Author

LabelValuesForMetricName and LabelNamesForMetricName are not even used in Cortex :) (but they are in Loki).

I think Loki should recognize QueryError and convert it to an appropriate error in the layer where it uses it. The store itself should not return gRPC errors.

Contributor Author

I will make sure that it is handled correctly in Loki, if this PR is merged.

}

maxQueryLength := c.limits.MaxQueryLength(userID)
if maxQueryLength > 0 && (*through).Sub(*from) > maxQueryLength {
return false, httpgrpc.Errorf(http.StatusBadRequest, validation.ErrQueryTooLong, (*through).Sub(*from), maxQueryLength)
return false, QueryError(fmt.Sprintf(validation.ErrQueryTooLong, (*through).Sub(*from), maxQueryLength))
}

now := model.Now()
@@ -333,7 +336,7 @@ func (c *baseStore) validateQuery(ctx context.Context, userID string, from *mode
// Check there is a metric name matcher of type equal,
metricNameMatcher, matchers, ok := extract.MetricNameMatcherFromMatchers(matchers)
if !ok || metricNameMatcher.Type != labels.MatchEqual {
return "", nil, false, httpgrpc.Errorf(http.StatusBadRequest, "query must contain metric name")
return "", nil, false, ErrQueryMustContainMetricName
}

return metricNameMatcher.Value, matchers, false, nil
@@ -357,7 +360,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th

maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
if maxChunksPerQuery > 0 && len(filtered) > maxChunksPerQuery {
err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery)
err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery))
level.Error(log).Log("err", err)
return nil, err
}
@@ -366,7 +369,7 @@ func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, th
keys := keysFromChunks(filtered)
allChunks, err := c.FetchChunks(ctx, filtered, keys)
if err != nil {
return nil, promql.ErrStorage{Err: err}
return nil, err
}

// Filter out chunks based on the empty matchers in the query.
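Following up on the review thread above about LabelValuesForMetricName() and LabelNamesForMetricName(): a minimal sketch of how a downstream caller such as Loki might map the new chunk.QueryError back to an httpgrpc error at its own API layer. The package name, function name, and choice of status code are assumptions for illustration; none of this is part of this PR.

package caller // hypothetical downstream package, e.g. a Loki API layer

import (
	"net/http"

	"github.com/weaveworks/common/httpgrpc"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// wrapStoreError converts chunk.QueryError values into user-facing httpgrpc
// errors (HTTP 400 here, as an assumption), while leaving every other error
// untouched so it still surfaces as an internal/storage failure.
func wrapStoreError(err error) error {
	if err == nil {
		return nil
	}
	if qe, ok := err.(chunk.QueryError); ok {
		return httpgrpc.Errorf(http.StatusBadRequest, "%s", qe.Error())
	}
	return err
}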
10 changes: 5 additions & 5 deletions pkg/chunk/chunk_store_test.go
@@ -179,7 +179,7 @@ func TestChunkStore_Get(t *testing.T) {
},
{
query: `{__name__=~"foo"}`,
err: "rpc error: code = Code(400) desc = query must contain metric name",
err: "query must contain metric name",
},
}
for _, schema := range schemas {
Expand Down Expand Up @@ -908,25 +908,25 @@ func TestChunkStoreError(t *testing.T) {
query: "foo",
from: model.Time(0).Add(31 * 24 * time.Hour),
through: model.Time(0),
err: "rpc error: code = Code(400) desc = invalid query, through < from (0 < 2678400)",
err: "invalid query, through < from (0 < 2678400)",
},
{
query: "foo",
from: model.Time(0),
through: model.Time(0).Add(31 * 24 * time.Hour),
err: "rpc error: code = Code(400) desc = invalid query, length > limit (744h0m0s > 720h0m0s)",
err: "invalid query, length > limit (744h0m0s > 720h0m0s)",
},
{
query: "{foo=\"bar\"}",
from: model.Time(0),
through: model.Time(0).Add(1 * time.Hour),
err: "rpc error: code = Code(400) desc = query must contain metric name",
err: "query must contain metric name",
},
{
query: "{__name__=~\"bar\"}",
from: model.Time(0),
through: model.Time(0).Add(1 * time.Hour),
err: "rpc error: code = Code(400) desc = query must contain metric name",
err: "query must contain metric name",
},
} {
for _, schema := range schemas {
4 changes: 1 addition & 3 deletions pkg/chunk/series_store.go
@@ -3,7 +3,6 @@ package chunk
import (
"context"
"fmt"
"net/http"
"time"

"github.com/go-kit/kit/log/level"
@@ -13,7 +12,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
@@ -116,7 +114,7 @@ func (c *seriesStore) Get(ctx context.Context, userID string, from, through mode
// Protect ourselves against OOMing.
maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery {
err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery)
err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery))
level.Error(log).Log("err", err)
return nil, err
}
14 changes: 13 additions & 1 deletion pkg/querier/chunk_store_queryable.go
@@ -46,7 +46,19 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...
}
chunks, err := q.store.Get(q.ctx, userID, model.Time(sp.Start), model.Time(sp.End), matchers...)
if err != nil {
return nil, nil, promql.ErrStorage{Err: err}
switch err.(type) {
case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout:
// Recognized by Prometheus API, vendor/github.com/prometheus/prometheus/promql/engine.go:91.
// Don't translate those, just in case we use them internally.
return nil, nil, err
case chunk.QueryError:
// This will be returned with status code 422 by Prometheus API.
// vendor/github.com/prometheus/prometheus/web/api/v1/api.go:1393
return nil, nil, err
default:
// All other errors will be returned as 500.
return nil, nil, promql.ErrStorage{Err: err}
}
}

return partitionChunks(chunks, q.mint, q.maxt, q.chunkIteratorFunc), nil, nil
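For reference, the classification that the new switch in Select implements, extracted into a standalone helper purely for illustration (the helper does not exist in this PR; Select inlines the logic above): errors the Prometheus API already recognizes, plus chunk.QueryError (mapped to HTTP 422), pass through unchanged, while anything else is wrapped in promql.ErrStorage and reported as HTTP 500.

package querier

import (
	"github.com/prometheus/prometheus/promql"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// classifySelectError mirrors the error handling added to Select in this PR,
// written as a separate function only to make the mapping easy to read.
func classifySelectError(err error) error {
	switch err.(type) {
	case promql.ErrStorage, promql.ErrTooManySamples, promql.ErrQueryCanceled, promql.ErrQueryTimeout:
		// Already understood by the Prometheus API layer; pass through as-is.
		return err
	case chunk.QueryError:
		// Returned to the client with status code 422 by the Prometheus API.
		return err
	default:
		// Everything else is treated as a storage failure (HTTP 500).
		return promql.ErrStorage{Err: err}
	}
}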