diff --git a/CHANGELOG.md b/CHANGELOG.md index a9de139019c..18b0fdfc101 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -58,6 +58,7 @@ * [ENHANCEMENT] Allow configuration of Cassandra's host selection policy. #4069 * [ENHANCEMENT] Store-gateway: retry synching blocks if a per-tenant sync fails. #3975 #4088 * [ENHANCEMENT] Add metric `cortex_tcp_connections` exposing the current number of accepted TCP connections. #4099 +* [ENHANCEMENT] Querier: Allow federated queries to run concurrently. #4065 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules//` endpoint return `400` instead of `404`. #4013 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959 diff --git a/pkg/querier/tenantfederation/merge_queryable.go b/pkg/querier/tenantfederation/merge_queryable.go index 5bc9d89d0e0..01f8b9667c3 100644 --- a/pkg/querier/tenantfederation/merge_queryable.go +++ b/pkg/querier/tenantfederation/merge_queryable.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strings" "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" @@ -13,12 +14,14 @@ import ( "github.com/weaveworks/common/user" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/concurrency" ) const ( defaultTenantLabel = "__tenant_id__" retainExistingPrefix = "original_" originalDefaultTenantLabel = retainExistingPrefix + defaultTenantLabel + maxConcurrency = 16 ) // NewQueryable returns a queryable that iterates through all the tenant IDs @@ -65,6 +68,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s } return &mergeQuerier{ + ctx: ctx, queriers: queriers, tenantIDs: tenantIDs, }, nil @@ -77,6 +81,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s // overwritten by the tenant ID and the previous value is exposed through a new // label prefixed with "original_". 
This behaviour is not implemented recursively type mergeQuerier struct { + ctx context.Context queriers []storage.Querier tenantIDs []string } @@ -97,7 +102,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] name = defaultTenantLabel } - return m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) { + return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelValues(name, matchers...) }) } @@ -106,7 +111,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([] // queriers. It also adds the defaultTenantLabel and if present in the original // results the originalDefaultTenantLabel func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) { - labelNames, warnings, err := m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) { + labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) { return q.LabelNames() }) if err != nil { @@ -137,27 +142,64 @@ func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) { return labelNames, warnings, nil } -type stringSliceFunc func(storage.Querier) ([]string, storage.Warnings, error) +type stringSliceFunc func(context.Context, storage.Querier) ([]string, storage.Warnings, error) + +type stringSliceFuncJob struct { + querier storage.Querier + tenantID string + result []string + warnings storage.Warnings +} // mergeDistinctStringSlice is aggregating results from stringSliceFunc calls -// on a querier. It removes duplicates and sorts the result. It doesn't require -// the output of the stringSliceFunc to be sorted, as results of LabelValues -// are not sorted. -// -// TODO: Consider running stringSliceFunc calls concurrently +// on each querier in parallel. It removes duplicates and sorts the result. 
It +// doesn't require the output of the stringSliceFunc to be sorted, as results +// of LabelValues are not sorted. func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, storage.Warnings, error) { + var jobs = make([]interface{}, len(m.tenantIDs)) + + for pos := range m.tenantIDs { + jobs[pos] = &stringSliceFuncJob{ + querier: m.queriers[pos], + tenantID: m.tenantIDs[pos], + } + } + + run := func(ctx context.Context, jobIntf interface{}) error { + job, ok := jobIntf.(*stringSliceFuncJob) + if !ok { + return fmt.Errorf("unexpected type %T", jobIntf) + } + + var err error + job.result, job.warnings, err = f(ctx, job.querier) + if err != nil { + return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID) + } + + return nil + } + + err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run) + if err != nil { + return nil, nil, err + } + + // aggregate warnings and deduplicate string results var warnings storage.Warnings resultMap := make(map[string]struct{}) - for pos, tenantID := range m.tenantIDs { - result, resultWarnings, err := f(m.queriers[pos]) - if err != nil { - return nil, nil, err + for _, jobIntf := range jobs { + job, ok := jobIntf.(*stringSliceFuncJob) + if !ok { + return nil, nil, fmt.Errorf("unexpected type %T", jobIntf) } - for _, e := range result { + + for _, e := range job.result { resultMap[e] = struct{}{} } - for _, w := range resultWarnings { - warnings = append(warnings, fmt.Errorf("error querying tenant id %s: %w", tenantID, w)) + + for _, w := range job.warnings { + warnings = append(warnings, errors.Wrapf(w, "warning querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID)) } } @@ -173,33 +215,60 @@ func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, st func (m *mergeQuerier) Close() error { errs := tsdb_errors.NewMulti() for pos, tenantID := range m.tenantIDs { - errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for 
tenant id %s", tenantID)) + errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for %s %s", rewriteLabelName(defaultTenantLabel), tenantID)) } return errs.Err() } +type selectJob struct { + pos int + querier storage.Querier + tenantID string +} + // Select returns a set of series that matches the given label matchers. If the // tenantLabelName is matched on it only considers those queriers matching. The // forwarded labelSelector is not containing those that operate on // tenantLabelName. func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { matchedTenants, filteredMatchers := filterValuesByMatchers(defaultTenantLabel, m.tenantIDs, matchers...) - var seriesSets = make([]storage.SeriesSet, 0, len(matchedTenants)) - for pos, tenantID := range m.tenantIDs { - if _, matched := matchedTenants[tenantID]; !matched { + var jobs = make([]interface{}, len(matchedTenants)) + var seriesSets = make([]storage.SeriesSet, len(matchedTenants)) + var jobPos int + for tenantPos := range m.tenantIDs { + if _, matched := matchedTenants[m.tenantIDs[tenantPos]]; !matched { continue } - seriesSets = append(seriesSets, &addLabelsSeriesSet{ - // TODO: Consider running Select calls concurrently - upstream: m.queriers[pos].Select(sortSeries, hints, filteredMatchers...), + jobs[jobPos] = &selectJob{ + pos: jobPos, + querier: m.queriers[tenantPos], + tenantID: m.tenantIDs[tenantPos], + } + jobPos++ + } + + run := func(ctx context.Context, jobIntf interface{}) error { + job, ok := jobIntf.(*selectJob) + if !ok { + return fmt.Errorf("unexpected type %T", jobIntf) + } + seriesSets[job.pos] = &addLabelsSeriesSet{ + upstream: job.querier.Select(sortSeries, hints, filteredMatchers...), labels: labels.Labels{ { Name: defaultTenantLabel, - Value: tenantID, + Value: job.tenantID, }, }, - }) + } + return nil } + + err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run) + if err != nil { + return 
storage.ErrSeriesSet(err) + } + return storage.NewMergeSeriesSet(seriesSets, storage.ChainedSeriesMerge) } @@ -266,13 +335,32 @@ func (m *addLabelsSeriesSet) At() storage.Series { // The error that iteration as failed with. // When an error occurs, set cannot continue to iterate. func (m *addLabelsSeriesSet) Err() error { - return m.upstream.Err() + return errors.Wrapf(m.upstream.Err(), "error querying %s", labelsToString(m.labels)) } // A collection of warnings for the whole set. // Warnings could be return even iteration has not failed with error. func (m *addLabelsSeriesSet) Warnings() storage.Warnings { - return m.upstream.Warnings() + upstream := m.upstream.Warnings() + warnings := make(storage.Warnings, len(upstream)) + for pos := range upstream { + warnings[pos] = errors.Wrapf(upstream[pos], "warning querying %s", labelsToString(m.labels)) + } + return warnings +} + +// rewrite label name to be more readable in error output +func rewriteLabelName(s string) string { + return strings.TrimRight(strings.TrimLeft(s, "_"), "_") +} + +// this outputs a more readable error format +func labelsToString(labels labels.Labels) string { + parts := make([]string, len(labels)) + for pos, l := range labels { + parts[pos] = rewriteLabelName(l.Name) + " " + l.Value + } + return strings.Join(parts, ", ") } type addLabelsSeries struct { diff --git a/pkg/querier/tenantfederation/merge_queryable_test.go b/pkg/querier/tenantfederation/merge_queryable_test.go index 6b63c9ba000..de6c493e8d0 100644 --- a/pkg/querier/tenantfederation/merge_queryable_test.go +++ b/pkg/querier/tenantfederation/merge_queryable_test.go @@ -25,7 +25,9 @@ const ( ) type mockTenantQueryableWithFilter struct { - extraLabels []string + extraLabels []string + warningsByTenant map[string]storage.Warnings + queryErrByTenant map[string]error } func (m *mockTenantQueryableWithFilter) Querier(ctx context.Context, _, _ int64) (storage.Querier, error) { @@ -33,7 +35,24 @@ func (m *mockTenantQueryableWithFilter) 
Querier(ctx context.Context, _, _ int64) if err != nil { return nil, err } - return mockTenantQuerier{tenant: tenantIDs[0], extraLabels: m.extraLabels}, nil + + q := mockTenantQuerier{tenant: tenantIDs[0], extraLabels: m.extraLabels} + + // set warning if exists + if m.warningsByTenant != nil { + if w, ok := m.warningsByTenant[q.tenant]; ok { + q.warnings = append([]error(nil), w...) + } + } + + // set queryErr if exists + if m.queryErrByTenant != nil { + if err, ok := m.queryErrByTenant[q.tenant]; ok { + q.queryErr = err + } + } + + return q, nil } func (m *mockTenantQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bool { @@ -43,6 +62,9 @@ func (m *mockTenantQueryableWithFilter) UseQueryable(_ time.Time, _, _ int64) bo type mockTenantQuerier struct { tenant string extraLabels []string + + warnings storage.Warnings + queryErr error } func (m mockTenantQuerier) matrix() model.Matrix { @@ -84,6 +106,33 @@ func metricMatches(m model.Metric, selector labels.Selector) bool { return selector.Matches(labels.FromStrings(labelStrings...)) } +type mockSeriesSet struct { + upstream storage.SeriesSet + warnings storage.Warnings + queryErr error +} + +func (m *mockSeriesSet) Next() bool { + return m.upstream.Next() +} + +// At returns full series. Returned series should be iterable even after Next is called. +func (m *mockSeriesSet) At() storage.Series { + return m.upstream.At() +} + +// The error that iteration has failed with. +// When an error occurs, set cannot continue to iterate. +func (m *mockSeriesSet) Err() error { + return m.queryErr +} + +// A collection of warnings for the whole set. +// Warnings could be returned even if iteration has not failed with error. 
+func (m *mockSeriesSet) Warnings() storage.Warnings { + return m.warnings +} + func (m mockTenantQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { var matrix model.Matrix @@ -93,7 +142,11 @@ func (m mockTenantQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...* } } - return series.MatrixToSeriesSet(matrix) + return &mockSeriesSet{ + upstream: series.MatrixToSeriesSet(matrix), + warnings: m.warnings, + queryErr: m.queryErr, + } } func (m mockTenantQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { @@ -101,6 +154,10 @@ func (m mockTenantQuerier) LabelValues(name string, matchers ...*labels.Matcher) return nil, nil, errors.New("matchers are not implemented yet") } + if m.queryErr != nil { + return nil, nil, m.queryErr + } + labelValues := make(map[string]struct{}) for _, s := range m.matrix() { for k, v := range s.Metric { @@ -114,10 +171,15 @@ func (m mockTenantQuerier) LabelValues(name string, matchers ...*labels.Matcher) results = append(results, k) } sort.Strings(results) - return results, nil, nil + + return results, m.warnings, nil } func (m mockTenantQuerier) LabelNames() ([]string, storage.Warnings, error) { + if m.queryErr != nil { + return nil, nil, m.queryErr + } + labelValues := make(map[string]struct{}) for _, s := range m.matrix() { for k := range s.Metric { @@ -129,7 +191,7 @@ func (m mockTenantQuerier) LabelNames() ([]string, storage.Warnings, error) { results = append(results, k) } sort.Strings(results) - return results, nil, nil + return results, m.warnings, nil } func (mockTenantQuerier) Close() error { @@ -159,23 +221,28 @@ func (c selectorTestCase) test(querier storage.Querier) func(*testing.T) { type mergeQueryableTestCase struct { name string tenants []string - expectedErr error - extraLabels []string + expectedQuerierErr error labelNames []string expectedLabelValues map[string][]string selectorCases []selectorTestCase + + // 
storage.Warnings expected when querying + expectedWarnings []string + + // error expected when querying + expectedQueryErr error + + queryable mockTenantQueryableWithFilter } func TestMergeQueryable(t *testing.T) { - upstreamQueryable := &mockTenantQueryableWithFilter{} - // set a multi tenant resolver tenant.WithDefaultResolver(tenant.NewMultiResolver()) for _, tc := range []mergeQueryableTestCase{ { - name: "no tenant", - expectedErr: user.ErrNoOrgID, + name: "no tenant", + expectedQuerierErr: user.ErrNoOrgID, }, { name: "single tenant", @@ -215,10 +282,12 @@ func TestMergeQueryable(t *testing.T) { }, }, { - name: "three tenants and a __tenant_id__ label set", - tenants: []string{"team-a", "team-b", "team-c"}, - labelNames: []string{defaultTenantLabel, "instance", originalDefaultTenantLabel, "tenant-team-a", "tenant-team-b", "tenant-team-c"}, - extraLabels: []string{"__tenant_id__", "original-value"}, + name: "three tenants and a __tenant_id__ label set", + tenants: []string{"team-a", "team-b", "team-c"}, + labelNames: []string{defaultTenantLabel, "instance", originalDefaultTenantLabel, "tenant-team-a", "tenant-team-b", "tenant-team-c"}, + queryable: mockTenantQueryableWithFilter{ + extraLabels: []string{"__tenant_id__", "original-value"}, + }, expectedLabelValues: map[string][]string{ "instance": {"host1", "host2.team-a", "host2.team-b", "host2.team-c"}, defaultTenantLabel: {"team-a", "team-b", "team-c"}, @@ -258,14 +327,42 @@ func TestMergeQueryable(t *testing.T) { }, }, }, + { + name: "three tenants with some return storage warnings", + tenants: []string{"team-a", "team-b", "team-c"}, + labelNames: []string{defaultTenantLabel, "instance", "tenant-team-a", "tenant-team-b", "tenant-team-c"}, + expectedLabelValues: map[string][]string{ + "instance": {"host1", "host2.team-a", "host2.team-b", "host2.team-c"}, + }, + queryable: mockTenantQueryableWithFilter{ + warningsByTenant: map[string]storage.Warnings{ + "team-b": storage.Warnings([]error{errors.New("don't like 
them")}), + "team-c": storage.Warnings([]error{errors.New("out of office")}), + }, + }, + expectedWarnings: []string{ + `warning querying tenant_id team-b: don't like them`, + `warning querying tenant_id team-c: out of office`, + }, + }, + { + name: "three tenants with one error", + tenants: []string{"team-a", "team-b", "team-c"}, + labelNames: []string{defaultTenantLabel, "instance", "tenant-team-a", "tenant-team-b", "tenant-team-c"}, + expectedLabelValues: map[string][]string{ + "instance": {"host1", "host2.team-a", "host2.team-b", "host2.team-c"}, + }, + queryable: mockTenantQueryableWithFilter{ + queryErrByTenant: map[string]error{ + "team-b": errors.New("failure xyz"), + }, + }, + expectedQueryErr: errors.New("error querying tenant_id team-b: failure xyz"), + }, } { t.Run(tc.name, func(t *testing.T) { - upstreamQueryable.extraLabels = tc.extraLabels - // initialize with default tenant label - q := mergeQueryable{ - upstream: upstreamQueryable, - } + q := NewQueryable(&tc.queryable) // inject context if set ctx := context.Background() @@ -275,35 +372,62 @@ func TestMergeQueryable(t *testing.T) { // retrieve querier if set querier, err := q.Querier(ctx, mint, maxt) - if tc.expectedErr != nil { - require.EqualError(t, err, tc.expectedErr.Error()) + if tc.expectedQuerierErr != nil { + require.EqualError(t, err, tc.expectedQuerierErr.Error()) return } require.NoError(t, err) // select all series and don't expect an error seriesSet := querier.Select(true, &storage.SelectHints{Start: mint, End: maxt}) - require.NoError(t, seriesSet.Err()) - - // test individual matchers - for _, sc := range tc.selectorCases { - t.Run(sc.name, sc.test(querier)) + if tc.expectedQueryErr != nil { + require.EqualError(t, seriesSet.Err(), tc.expectedQueryErr.Error()) + } else { + require.NoError(t, seriesSet.Err()) + assertEqualWarnings(t, tc.expectedWarnings, seriesSet.Warnings()) + + // test individual matchers + for _, sc := range tc.selectorCases { + t.Run(sc.name, sc.test(querier)) + 
} } - labelNames, _, err := querier.LabelNames() - require.NoError(t, err) - assert.Equal(t, tc.labelNames, labelNames) + // check label names + labelNames, warnings, err := querier.LabelNames() + if tc.expectedQueryErr != nil { + require.EqualError(t, err, tc.expectedQueryErr.Error()) + } else { + require.NoError(t, err) + assert.Equal(t, tc.labelNames, labelNames) + assertEqualWarnings(t, tc.expectedWarnings, warnings) + } - // check label values + // check label values method for labelName, expectedLabelValues := range tc.expectedLabelValues { - actLabelValues, _, err := querier.LabelValues(labelName) - require.NoError(t, err) - assert.Equal(t, expectedLabelValues, actLabelValues, fmt.Sprintf("unexpected values for label '%s'", labelName)) + actLabelValues, warnings, err := querier.LabelValues(labelName) + if tc.expectedQueryErr != nil { + require.EqualError(t, err, tc.expectedQueryErr.Error()) + } else { + require.NoError(t, err) + assert.Equal(t, expectedLabelValues, actLabelValues, fmt.Sprintf("unexpected values for label '%s'", labelName)) + assertEqualWarnings(t, tc.expectedWarnings, warnings) + } } }) } } +func assertEqualWarnings(t *testing.T, exp []string, act storage.Warnings) { + if len(exp) == 0 && len(act) == 0 { + return + } + var actStrings = make([]string, len(act)) + for pos := range act { + actStrings[pos] = act[pos].Error() + } + assert.ElementsMatch(t, exp, actStrings) +} + func TestSetLabelsRetainExisting(t *testing.T) { for _, tc := range []struct { labels labels.Labels diff --git a/pkg/util/concurrency/runner.go b/pkg/util/concurrency/runner.go index e779fcb4baa..a1f0d12470a 100644 --- a/pkg/util/concurrency/runner.go +++ b/pkg/util/concurrency/runner.go @@ -81,6 +81,10 @@ func ForEach(ctx context.Context, jobs []interface{}, concurrency int, jobFunc f for ix := 0; ix < util_math.Min(concurrency, len(jobs)); ix++ { g.Go(func() error { for job := range ch { + if err := ctx.Err(); err != nil { + return err + } + if err := jobFunc(ctx, job); 
err != nil { return err } diff --git a/pkg/util/concurrency/runner_test.go b/pkg/util/concurrency/runner_test.go index f9d99596b31..1dec972c4d6 100644 --- a/pkg/util/concurrency/runner_test.go +++ b/pkg/util/concurrency/runner_test.go @@ -94,7 +94,7 @@ func TestForEach(t *testing.T) { assert.ElementsMatch(t, jobs, processed) } -func TestForEach_ShouldBreakOnFirstError(t *testing.T) { +func TestForEach_ShouldBreakOnFirstError_ContextCancellationHandled(t *testing.T) { var ( ctx = context.Background() @@ -125,6 +125,42 @@ func TestForEach_ShouldBreakOnFirstError(t *testing.T) { assert.Equal(t, int32(1), processed.Load()) } +func TestForEach_ShouldBreakOnFirstError_ContextCancellationUnhandled(t *testing.T) { + var ( + ctx = context.Background() + + // Keep the processed jobs count. + processed atomic.Int32 + ) + + // waitGroup to await the start of the first two jobs + var wg sync.WaitGroup + wg.Add(2) + + err := ForEach(ctx, []interface{}{"a", "b", "c"}, 2, func(ctx context.Context, job interface{}) error { + wg.Done() + + if processed.CAS(0, 1) { + // wait till two jobs have been started + wg.Wait() + return errors.New("the first request is failing") + } + + // Wait till context is cancelled to add processed jobs. + <-ctx.Done() + processed.Add(1) + + return nil + }) + + require.EqualError(t, err, "the first request is failing") + + // Since we expect the first error interrupts the workers, we should only + // see 2 jobs processed (the one which immediately returned error and the + // job with "b"). + assert.Equal(t, int32(2), processed.Load()) +} + func TestForEach_ShouldReturnImmediatelyOnNoJobsProvided(t *testing.T) { require.NoError(t, ForEach(context.Background(), nil, 2, func(ctx context.Context, job interface{}) error { return nil