Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -58,6 +58,7 @@
* [ENHANCEMENT] Allow configuration of Cassandra's host selection policy. #4069
* [ENHANCEMENT] Store-gateway: retry synching blocks if a per-tenant sync fails. #3975 #4088
* [ENHANCEMENT] Add metric `cortex_tcp_connections` exposing the current number of accepted TCP connections. #4099
* [ENHANCEMENT] Querier: Allow federated queries to run concurrently. #4065
* [BUGFIX] Ruler-API: fix bug where `/api/v1/rules/<namespace>/<group_name>` endpoint returned `400` instead of `404`. #4013
* [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
* [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
140 changes: 114 additions & 26 deletions pkg/querier/tenantfederation/merge_queryable.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strings"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
@@ -13,12 +14,14 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/concurrency"
)

const (
defaultTenantLabel = "__tenant_id__"
retainExistingPrefix = "original_"
originalDefaultTenantLabel = retainExistingPrefix + defaultTenantLabel
maxConcurrency = 16
)
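
The new maxConcurrency constant caps the per-tenant fan-out at 16 queriers at a time. Both mergeDistinctStringSlice and Select below build a slice of per-tenant jobs and hand it to concurrency.ForEach. The following is a minimal standalone sketch of that pattern, assuming only the ForEach(ctx, jobs, concurrency, fn) signature visible at the call sites in this diff; the tenantJob type and its placeholder work are illustrative.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/concurrency"
)

// tenantJob is an illustrative per-tenant unit of work.
type tenantJob struct {
	tenantID string
	result   []string
}

func main() {
	tenantIDs := []string{"team-a", "team-b", "team-c"}

	// Build one job per tenant; ForEach takes a []interface{}.
	jobs := make([]interface{}, len(tenantIDs))
	for pos := range tenantIDs {
		jobs[pos] = &tenantJob{tenantID: tenantIDs[pos]}
	}

	// Run at most 16 jobs concurrently; each job only writes to its own
	// struct, so no additional locking is required.
	err := concurrency.ForEach(context.Background(), jobs, 16, func(ctx context.Context, jobIntf interface{}) error {
		job := jobIntf.(*tenantJob)
		job.result = []string{"up", "scrape_duration_seconds"} // placeholder per-tenant work
		return nil
	})
	if err != nil {
		fmt.Println("federated fan-out failed:", err)
		return
	}

	for _, jobIntf := range jobs {
		job := jobIntf.(*tenantJob)
		fmt.Println(job.tenantID, job.result)
	}
}
```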

// NewQueryable returns a queryable that iterates through all the tenant IDs
@@ -65,6 +68,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
}

return &mergeQuerier{
ctx: ctx,
queriers: queriers,
tenantIDs: tenantIDs,
}, nil
@@ -77,6 +81,7 @@ func (m *mergeQueryable) Querier(ctx context.Context, mint int64, maxt int64) (s
// overwritten by the tenant ID and the previous value is exposed through a new
// label prefixed with "original_". This behaviour is not implemented recursively
type mergeQuerier struct {
ctx context.Context
queriers []storage.Querier
tenantIDs []string
}
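
As the comment above describes, any __tenant_id__ label already present on a stored series is overwritten with the querier's tenant ID, and the previous value is retained under original___tenant_id__. Below is a hedged sketch of that relabelling written as a standalone helper; the real implementation lives elsewhere in this package and may differ in detail.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

const (
	defaultTenantLabel   = "__tenant_id__"
	retainExistingPrefix = "original_"
)

// addTenantLabel sets __tenant_id__ to the querier's tenant and, if the series
// already carried a __tenant_id__ label, keeps the old value under
// original___tenant_id__. The behaviour is not applied recursively: an
// existing original___tenant_id__ label is not preserved in turn.
func addTenantLabel(ls labels.Labels, tenantID string) labels.Labels {
	builder := labels.NewBuilder(ls)
	if old := ls.Get(defaultTenantLabel); old != "" {
		builder.Set(retainExistingPrefix+defaultTenantLabel, old)
	}
	builder.Set(defaultTenantLabel, tenantID)
	return builder.Labels()
}

func main() {
	in := labels.FromStrings("__name__", "up", defaultTenantLabel, "imported")
	fmt.Println(addTenantLabel(in, "team-a"))
	// {__name__="up", __tenant_id__="team-a", original___tenant_id__="imported"}
}
```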
@@ -97,7 +102,7 @@ func (m *mergeQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
name = defaultTenantLabel
}

return m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) {
return m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
return q.LabelValues(name, matchers...)
})
}
@@ -106,7 +111,7 @@ func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
// queriers. It also adds the defaultTenantLabel and if present in the original
// results the originalDefaultTenantLabel
func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
labelNames, warnings, err := m.mergeDistinctStringSlice(func(q storage.Querier) ([]string, storage.Warnings, error) {
labelNames, warnings, err := m.mergeDistinctStringSlice(func(ctx context.Context, q storage.Querier) ([]string, storage.Warnings, error) {
return q.LabelNames()
})
if err != nil {
@@ -137,27 +142,64 @@ func (m *mergeQuerier) LabelNames() ([]string, storage.Warnings, error) {
return labelNames, warnings, nil
}

type stringSliceFunc func(storage.Querier) ([]string, storage.Warnings, error)
type stringSliceFunc func(context.Context, storage.Querier) ([]string, storage.Warnings, error)

type stringSliceFuncJob struct {
querier storage.Querier
tenantID string
result []string
warnings storage.Warnings
}

// mergeDistinctStringSlice aggregates results from stringSliceFunc calls
// on a querier. It removes duplicates and sorts the result. It doesn't require
// the output of the stringSliceFunc to be sorted, as results of LabelValues
// are not sorted.
//
// TODO: Consider running stringSliceFunc calls concurrently
// run on each querier in parallel. It removes duplicates and sorts the result. It
// doesn't require the output of the stringSliceFunc to be sorted, as results
// of LabelValues are not sorted.
func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, storage.Warnings, error) {
var jobs = make([]interface{}, len(m.tenantIDs))

for pos := range m.tenantIDs {
jobs[pos] = &stringSliceFuncJob{
querier: m.queriers[pos],
tenantID: m.tenantIDs[pos],
}
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}

var err error
job.result, job.warnings, err = f(ctx, job.querier)
if err != nil {
return errors.Wrapf(err, "error querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID)
}

return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
if err != nil {
return nil, nil, err
}

// aggregate warnings and deduplicate string results
var warnings storage.Warnings
resultMap := make(map[string]struct{})
for pos, tenantID := range m.tenantIDs {
result, resultWarnings, err := f(m.queriers[pos])
if err != nil {
return nil, nil, err
for _, jobIntf := range jobs {
job, ok := jobIntf.(*stringSliceFuncJob)
if !ok {
return nil, nil, fmt.Errorf("unexpected type %T", jobIntf)
}
for _, e := range result {

for _, e := range job.result {
resultMap[e] = struct{}{}
}
for _, w := range resultWarnings {
warnings = append(warnings, fmt.Errorf("error querying tenant id %s: %w", tenantID, w))

for _, w := range job.warnings {
warnings = append(warnings, errors.Wrapf(w, "warning querying %s %s", rewriteLabelName(defaultTenantLabel), job.tenantID))
}
}

@@ -173,33 +215,60 @@ func (m *mergeQuerier) mergeDistinctStringSlice(f stringSliceFunc) ([]string, st
func (m *mergeQuerier) Close() error {
errs := tsdb_errors.NewMulti()
for pos, tenantID := range m.tenantIDs {
errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for tenant id %s", tenantID))
errs.Add(errors.Wrapf(m.queriers[pos].Close(), "failed to close querier for %s %s", rewriteLabelName(defaultTenantLabel), tenantID))
}
return errs.Err()
}

type selectJob struct {
pos int
querier storage.Querier
tenantID string
}

// Select returns a set of series that matches the given label matchers. If
// tenantLabelName is matched on, only the matching queriers are considered. The
// forwarded labelSelector does not contain the matchers that operate on
// tenantLabelName.
func (m *mergeQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
matchedTenants, filteredMatchers := filterValuesByMatchers(defaultTenantLabel, m.tenantIDs, matchers...)
var seriesSets = make([]storage.SeriesSet, 0, len(matchedTenants))
for pos, tenantID := range m.tenantIDs {
if _, matched := matchedTenants[tenantID]; !matched {
var jobs = make([]interface{}, len(matchedTenants))
var seriesSets = make([]storage.SeriesSet, len(matchedTenants))
var jobPos int
for tenantPos := range m.tenantIDs {
if _, matched := matchedTenants[m.tenantIDs[tenantPos]]; !matched {
continue
}
seriesSets = append(seriesSets, &addLabelsSeriesSet{
// TODO: Consider running Select calls concurrently
upstream: m.queriers[pos].Select(sortSeries, hints, filteredMatchers...),
jobs[jobPos] = &selectJob{
pos: jobPos,
querier: m.queriers[tenantPos],
tenantID: m.tenantIDs[tenantPos],
}
jobPos++
}

run := func(ctx context.Context, jobIntf interface{}) error {
job, ok := jobIntf.(*selectJob)
if !ok {
return fmt.Errorf("unexpected type %T", jobIntf)
}
seriesSets[job.pos] = &addLabelsSeriesSet{
upstream: job.querier.Select(sortSeries, hints, filteredMatchers...),
labels: labels.Labels{
{
Name: defaultTenantLabel,
Value: tenantID,
Value: job.tenantID,
},
},
})
}
return nil
}

err := concurrency.ForEach(m.ctx, jobs, maxConcurrency, run)
if err != nil {
return storage.ErrSeriesSet(err)
}

return storage.NewMergeSeriesSet(seriesSets, storage.ChainedSeriesMerge)
}
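
When a federated query carries a matcher on __tenant_id__, only the matching tenants' queriers are consulted and that matcher is dropped from the set forwarded upstream. filterValuesByMatchers itself is not part of this diff, so the sketch below only illustrates the behaviour the Select comment describes; the helper name and its matcher handling are assumptions, not the package's actual code.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

const tenantLabel = "__tenant_id__"

// splitTenantMatchers returns the tenants selected by matchers on
// __tenant_id__ and the remaining matchers that get forwarded to the
// per-tenant queriers.
func splitTenantMatchers(tenantIDs []string, matchers []*labels.Matcher) (map[string]struct{}, []*labels.Matcher) {
	matched := make(map[string]struct{}, len(tenantIDs))
	for _, id := range tenantIDs {
		matched[id] = struct{}{}
	}

	forwarded := make([]*labels.Matcher, 0, len(matchers))
	for _, m := range matchers {
		if m.Name != tenantLabel {
			forwarded = append(forwarded, m)
			continue
		}
		// Keep only tenants whose ID satisfies the tenant matcher.
		for _, id := range tenantIDs {
			if !m.Matches(id) {
				delete(matched, id)
			}
		}
	}
	return matched, forwarded
}

func main() {
	matchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, tenantLabel, "team-a"),
		labels.MustNewMatcher(labels.MatchEqual, "job", "node"),
	}
	matched, forwarded := splitTenantMatchers([]string{"team-a", "team-b"}, matchers)
	fmt.Println(matched)   // only team-a is queried
	fmt.Println(forwarded) // the job="node" matcher is forwarded unchanged
}
```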

@@ -266,13 +335,32 @@ func (m *addLabelsSeriesSet) At() storage.Series {
// The error that iteration has failed with.
// When an error occurs, set cannot continue to iterate.
func (m *addLabelsSeriesSet) Err() error {
return m.upstream.Err()
return errors.Wrapf(m.upstream.Err(), "error querying %s", labelsToString(m.labels))
}

// A collection of warnings for the whole set.
// Warnings could be returned even if iteration has not failed with an error.
func (m *addLabelsSeriesSet) Warnings() storage.Warnings {
return m.upstream.Warnings()
upstream := m.upstream.Warnings()
warnings := make(storage.Warnings, len(upstream))
for pos := range upstream {
warnings[pos] = errors.Wrapf(upstream[pos], "warning querying %s", labelsToString(m.labels))
}
return warnings
}

// rewriteLabelName strips the leading and trailing underscores from a label
// name to make it more readable in error output.
func rewriteLabelName(s string) string {
return strings.TrimRight(strings.TrimLeft(s, "_"), "_")
}

// labelsToString renders labels in a more readable format for error output.
func labelsToString(labels labels.Labels) string {
parts := make([]string, len(labels))
for pos, l := range labels {
parts[pos] = rewriteLabelName(l.Name) + " " + l.Value
}
return strings.Join(parts, ", ")
}
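
The friendlier naming produced by these two helpers is what the wrapped errors and warnings above rely on. A quick illustration follows, written as a test in the same package since both functions are unexported; the test name is hypothetical.

```go
package tenantfederation

import (
	"testing"

	"github.com/prometheus/prometheus/pkg/labels"
)

func TestReadableErrorOutput(t *testing.T) {
	// "__tenant_id__" loses its surrounding underscores ...
	if got := rewriteLabelName("__tenant_id__"); got != "tenant_id" {
		t.Fatalf("unexpected rewrite: %q", got)
	}

	// ... so wrapped errors read "error querying tenant_id team-a" rather than
	// exposing the internal "__tenant_id__" label name.
	ls := labels.Labels{{Name: defaultTenantLabel, Value: "team-a"}}
	if got := labelsToString(ls); got != "tenant_id team-a" {
		t.Fatalf("unexpected rendering: %q", got)
	}
}
```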

type addLabelsSeries struct {