|
| 1 | +package tenantfederation |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + |
| 7 | + "github.com/pkg/errors" |
| 8 | + "github.com/prometheus/client_golang/prometheus" |
| 9 | + "github.com/prometheus/client_golang/prometheus/promauto" |
| 10 | + "github.com/prometheus/prometheus/model/exemplar" |
| 11 | + "github.com/prometheus/prometheus/model/labels" |
| 12 | + "github.com/prometheus/prometheus/storage" |
| 13 | + "github.com/weaveworks/common/user" |
| 14 | + |
| 15 | + "github.com/cortexproject/cortex/pkg/tenant" |
| 16 | + "github.com/cortexproject/cortex/pkg/util/concurrency" |
| 17 | + "github.com/cortexproject/cortex/pkg/util/spanlogger" |
| 18 | +) |
| 19 | + |
| 20 | +// NewExemplarQueryable returns a exemplarQueryable that iterates through all the |
| 21 | +// tenant IDs that are part of the request and aggregates the results from each |
| 22 | +// tenant's ExemplarQuerier by sending of subsequent requests. |
| 23 | +// By setting byPassWithSingleQuerier to true the mergeExemplarQuerier gets by-passed |
| 24 | +// and results for request with a single exemplar querier will not contain the |
| 25 | +// "__tenant_id__" label. This allows a smoother transition, when enabling |
| 26 | +// tenant federation in a cluster. |
| 27 | +// The result contains a label "__tenant_id__" to identify the tenant ID that |
| 28 | +// it originally resulted from. |
| 29 | +// If the label "__tenant_id__" is already existing, its value is overwritten |
| 30 | +// by the tenant ID and the previous value is exposed through a new label |
| 31 | +// prefixed with "original_". This behaviour is not implemented recursively. |
| 32 | +func NewExemplarQueryable(upstream storage.ExemplarQueryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable { |
| 33 | + return NewMergeExemplarQueryable(defaultTenantLabel, maxConcurrent, tenantExemplarQuerierCallback(upstream), byPassWithSingleQuerier, reg) |
| 34 | +} |
| 35 | + |
| 36 | +func tenantExemplarQuerierCallback(exemplarQueryable storage.ExemplarQueryable) MergeExemplarQuerierCallback { |
| 37 | + return func(ctx context.Context) ([]string, []storage.ExemplarQuerier, error) { |
| 38 | + tenantIDs, err := tenant.TenantIDs(ctx) |
| 39 | + if err != nil { |
| 40 | + return nil, nil, err |
| 41 | + } |
| 42 | + |
| 43 | + var queriers = make([]storage.ExemplarQuerier, len(tenantIDs)) |
| 44 | + for pos, tenantID := range tenantIDs { |
| 45 | + q, err := exemplarQueryable.ExemplarQuerier(user.InjectOrgID(ctx, tenantID)) |
| 46 | + if err != nil { |
| 47 | + return nil, nil, err |
| 48 | + } |
| 49 | + queriers[pos] = q |
| 50 | + } |
| 51 | + |
| 52 | + return tenantIDs, queriers, nil |
| 53 | + } |
| 54 | +} |
| 55 | + |
// MergeExemplarQuerierCallback returns, for the request carried by ctx, the
// underlying exemplar queriers relevant for the query together with their IDs.
// mergeExemplarQuerier pairs ids[i] with queriers[i], so both slices are
// expected to have the same length and ordering.
type MergeExemplarQuerierCallback func(ctx context.Context) (ids []string, queriers []storage.ExemplarQuerier, err error)
| 59 | + |
| 60 | +// NewMergeExemplarQueryable returns a queryable that merges results from multiple |
| 61 | +// underlying ExemplarQueryables. |
| 62 | +// By setting byPassWithSingleQuerier to true the mergeExemplarQuerier gets by-passed |
| 63 | +// and results for request with a single exemplar querier will not contain the |
| 64 | +// "__tenant_id__" label. This allows a smoother transition, when enabling |
| 65 | +// tenant federation in a cluster. |
| 66 | +// Results contain a label `idLabelName` to identify the underlying exemplar queryable |
| 67 | +// that it originally resulted from. |
| 68 | +// If the label `idLabelName` is already existing, its value is overwritten and |
| 69 | +// the previous value is exposed through a new label prefixed with "original_". |
| 70 | +// This behaviour is not implemented recursively. |
| 71 | +func NewMergeExemplarQueryable(idLabelName string, maxConcurrent int, callback MergeExemplarQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable { |
| 72 | + return &mergeExemplarQueryable{ |
| 73 | + idLabelName: idLabelName, |
| 74 | + byPassWithSingleQuerier: byPassWithSingleQuerier, |
| 75 | + callback: callback, |
| 76 | + maxConcurrent: maxConcurrent, |
| 77 | + |
| 78 | + tenantsPerExemplarQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ |
| 79 | + Namespace: "cortex", |
| 80 | + Name: "querier_federated_tenants_per_exemplar_query", |
| 81 | + Help: "Number of tenants per exemplar query.", |
| 82 | + Buckets: []float64{1, 2, 4, 8, 16, 32, 64}, |
| 83 | + }), |
| 84 | + } |
| 85 | +} |
| 86 | + |
// mergeExemplarQueryable is the storage.ExemplarQueryable built by
// NewMergeExemplarQueryable. It holds the configuration shared by every
// exemplar querier it hands out.
type mergeExemplarQueryable struct {
	// idLabelName is the label name passed on to each mergeExemplarQuerier.
	idLabelName string
	// maxConcurrent is passed on to each mergeExemplarQuerier.
	maxConcurrent int
	// byPassWithSingleQuerier makes ExemplarQuerier return the sole underlying
	// querier directly when the callback yields exactly one.
	byPassWithSingleQuerier bool
	// callback resolves the IDs and underlying queriers for a request context.
	callback MergeExemplarQuerierCallback
	// tenantsPerExemplarQuery observes the number of IDs returned by callback
	// on every ExemplarQuerier call.
	tenantsPerExemplarQuery prometheus.Histogram
}
| 94 | + |
| 95 | +// ExemplarQuerier returns a new mergeExemplarQuerier which aggregates results from |
| 96 | +// multiple exemplar queriers into a single result. |
| 97 | +func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { |
| 98 | + ids, queriers, err := m.callback(ctx) |
| 99 | + if err != nil { |
| 100 | + return nil, err |
| 101 | + } |
| 102 | + |
| 103 | + m.tenantsPerExemplarQuery.Observe(float64(len(ids))) |
| 104 | + |
| 105 | + if m.byPassWithSingleQuerier && len(queriers) == 1 { |
| 106 | + return queriers[0], nil |
| 107 | + } |
| 108 | + |
| 109 | + return &mergeExemplarQuerier{ |
| 110 | + ctx: ctx, |
| 111 | + idLabelName: m.idLabelName, |
| 112 | + maxConcurrent: m.maxConcurrent, |
| 113 | + tenantIds: ids, |
| 114 | + queriers: queriers, |
| 115 | + byPassWithSingleQuerier: m.byPassWithSingleQuerier, |
| 116 | + }, nil |
| 117 | +} |
| 118 | + |
// mergeExemplarQuerier aggregates the results from the underlying exemplar
// queriers and adds a label `idLabelName` to each result's `SeriesLabels` to
// identify the exemplar querier it resulted from.
// If the label `idLabelName` already exists, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
type mergeExemplarQuerier struct {
	// ctx is the request context captured when the querier was created. Select
	// takes no context argument, so it starts its span and runs its jobs from
	// this one.
	ctx context.Context
	// idLabelName is the name of the label set on every returned result.
	idLabelName string
	// maxConcurrent is passed to concurrency.ForEach when running the
	// per-tenant queries.
	maxConcurrent int
	// tenantIds holds the ID of each underlying querier; tenantIds[i] belongs
	// to queriers[i].
	tenantIds []string
	// queriers holds the underlying exemplar queriers, index-aligned with
	// tenantIds.
	queriers []storage.ExemplarQuerier
	// byPassWithSingleQuerier is copied from the queryable. It is not read by
	// Select in the code visible here.
	byPassWithSingleQuerier bool
}
| 133 | + |
// exemplarSelectJob describes one per-tenant query issued by
// mergeExemplarQuerier.Select.
type exemplarSelectJob struct {
	// pos is the index in the results slice where this job stores its output.
	pos int
	// querier is the underlying exemplar querier to call.
	querier storage.ExemplarQuerier
	// id is the tenant ID set as the idLabelName label value on the job's
	// results and included in error messages.
	id string
}
| 139 | + |
| 140 | +// Select returns aggregated exemplars within given time range for multiple tenants. |
| 141 | +func (m mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { |
| 142 | + log, ctx := spanlogger.New(m.ctx, "mergeExemplarQuerier.Select") |
| 143 | + defer log.Span.Finish() |
| 144 | + |
| 145 | + // filter out tenants to query and unrelated matchers |
| 146 | + allMatchedTenantIds, allUnrelatedMatchers := filterAllTenantsAndMatchers(m.idLabelName, m.tenantIds, matchers) |
| 147 | + jobs := make([]interface{}, len(allMatchedTenantIds)) |
| 148 | + results := make([][]exemplar.QueryResult, len(allMatchedTenantIds)) |
| 149 | + |
| 150 | + var jobPos int |
| 151 | + for idx, tenantId := range m.tenantIds { |
| 152 | + if _, ok := allMatchedTenantIds[tenantId]; !ok { |
| 153 | + // skip tenantIds that should not be queried |
| 154 | + continue |
| 155 | + } |
| 156 | + |
| 157 | + jobs[jobPos] = &exemplarSelectJob{ |
| 158 | + pos: jobPos, |
| 159 | + querier: m.queriers[idx], |
| 160 | + id: tenantId, |
| 161 | + } |
| 162 | + jobPos++ |
| 163 | + } |
| 164 | + |
| 165 | + run := func(ctx context.Context, jobIntf interface{}) error { |
| 166 | + job, ok := jobIntf.(*exemplarSelectJob) |
| 167 | + if !ok { |
| 168 | + return fmt.Errorf("unexpected type %T", jobIntf) |
| 169 | + } |
| 170 | + |
| 171 | + res, err := job.querier.Select(start, end, allUnrelatedMatchers...) |
| 172 | + if err != nil { |
| 173 | + return errors.Wrapf(err, "error exemplars querying %s %s", rewriteLabelName(m.idLabelName), job.id) |
| 174 | + } |
| 175 | + |
| 176 | + // append __tenant__ label to `seriesLabels` to identify each tenants per exemplar quer |
| 177 | + for i, e := range res { |
| 178 | + e.SeriesLabels = setLabelsRetainExisting(e.SeriesLabels, labels.Label{ |
| 179 | + Name: m.idLabelName, |
| 180 | + Value: job.id, |
| 181 | + }) |
| 182 | + res[i] = e |
| 183 | + } |
| 184 | + |
| 185 | + results[job.pos] = res |
| 186 | + return nil |
| 187 | + } |
| 188 | + |
| 189 | + err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run) |
| 190 | + if err != nil { |
| 191 | + return nil, err |
| 192 | + } |
| 193 | + |
| 194 | + var ret []exemplar.QueryResult |
| 195 | + for _, exemplars := range results { |
| 196 | + ret = append(ret, exemplars...) |
| 197 | + } |
| 198 | + |
| 199 | + return ret, nil |
| 200 | +} |
| 201 | + |
| 202 | +func filterAllTenantsAndMatchers(idLabelName string, tenantIds []string, allMatchers [][]*labels.Matcher) (map[string]struct{}, [][]*labels.Matcher) { |
| 203 | + allMatchedTenantIds := make(map[string]struct{}) |
| 204 | + allUnrelatedMatchers := make([][]*labels.Matcher, len(allMatchers)) |
| 205 | + |
| 206 | + for idx, matchers := range allMatchers { |
| 207 | + matchedTenantIds, unrelatedMatchers := filterValuesByMatchers(idLabelName, tenantIds, matchers...) |
| 208 | + for tenantId := range matchedTenantIds { |
| 209 | + allMatchedTenantIds[tenantId] = struct{}{} |
| 210 | + } |
| 211 | + allUnrelatedMatchers[idx] = unrelatedMatchers |
| 212 | + } |
| 213 | + |
| 214 | + return allMatchedTenantIds, allUnrelatedMatchers |
| 215 | +} |
0 commit comments