Skip to content

Commit c518968

Browse files
authored
Merge branch 'master' into feat/external-labels
Signed-off-by: Friedrich Gonzalez <[email protected]>
2 parents 8c543ae + 71dccee commit c518968

File tree

8 files changed

+382
-5
lines changed

8 files changed

+382
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
1717
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
1818
* [FEATURE] Ruler: Add support for per-user external labels #6340
19+
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
1920
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
2021
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
2122
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3250,6 +3250,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
32503250
# CLI flag: -distributor.ha-tracker.enable-for-all-users
32513251
[accept_ha_samples: <boolean> | default = false]
32523252
3253+
# [Experimental] Flag to enable handling of samples with mixed external labels
3254+
# identifying replicas in an HA Prometheus setup. Supported only if
3255+
# -distributor.ha-tracker.enable-for-all-users is true.
3256+
# CLI flag: -experimental.distributor.ha-tracker.mixed-ha-samples
3257+
[accept_mixed_ha_samples: <boolean> | default = false]
3258+
32533259
# Prometheus label to look for in samples to identify a Prometheus HA cluster.
32543260
# CLI flag: -distributor.ha-tracker.cluster
32553261
[ha_cluster_label: <string> | default = "cluster"]

docs/configuration/v1-guarantees.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ Currently experimental features are:
5454
- Metric relabeling in the distributor.
5555
- Scalable query-frontend (when using query-scheduler)
5656
- Ingester: do not unregister from ring on shutdown (`-ingester.unregister-on-shutdown=false`)
57-
- Distributor: do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
57+
- Distributor:
58+
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
59+
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
5860
- Tenant Deletion in Purger, for blocks storage.
5961
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
6062
- Blocks storage bucket index

docs/getting-started/_index.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ This example uses [Docker Compose](https://docs.docker.com/compose/) to set up:
4646
#### Instructions
4747

4848
```sh
49-
$ cd docs/getting-started
49+
$ git clone https://github.com/cortexproject/cortex.git
50+
$ cd cortex/docs/getting-started
5051
```
5152

5253
##### Start the services
@@ -160,7 +161,8 @@ $ helm repo add prometheus-community https://prometheus-community.github.io/helm
160161
### Instructions
161162

162163
```sh
163-
$ cd docs/getting-started
164+
$ git clone https://github.com/cortexproject/cortex.git
165+
$ cd cortex/docs/getting-started
164166
```
165167

166168
#### Configure SeaweedFS (S3)
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/prometheus/common/model"
11+
"github.com/prometheus/prometheus/prompb"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/cortexproject/cortex/integration/e2e"
15+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
16+
"github.com/cortexproject/cortex/integration/e2ecortex"
17+
)
18+
19+
func TestDistriubtorAcceptMixedHASamplesRunningInMicroservicesMode(t *testing.T) {
20+
const blockRangePeriod = 5 * time.Minute
21+
22+
t.Run("Distributor accept mixed HA samples in the same request", func(t *testing.T) {
23+
s, err := e2e.NewScenario(networkName)
24+
require.NoError(t, err)
25+
defer s.Close()
26+
27+
// Start dependencies.
28+
consul := e2edb.NewConsul()
29+
etcd := e2edb.NewETCD()
30+
minio := e2edb.NewMinio(9000, bucketName)
31+
require.NoError(t, s.StartAndWaitReady(consul, etcd, minio))
32+
33+
// Configure the querier to only look in ingester
34+
// and enbale distributor ha tracker with mixed samples.
35+
distributorFlags := map[string]string{
36+
"-distributor.ha-tracker.enable": "true",
37+
"-distributor.ha-tracker.enable-for-all-users": "true",
38+
"-experimental.distributor.ha-tracker.mixed-ha-samples": "true",
39+
"-distributor.ha-tracker.cluster": "cluster",
40+
"-distributor.ha-tracker.replica": "__replica__",
41+
"-distributor.ha-tracker.store": "etcd",
42+
"-distributor.ha-tracker.etcd.endpoints": "etcd:2379",
43+
}
44+
querierFlags := mergeFlags(BlocksStorageFlags(), map[string]string{
45+
"-querier.query-store-after": (1 * time.Hour).String(),
46+
})
47+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
48+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
49+
"-blocks-storage.tsdb.ship-interval": "5s",
50+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
51+
"-blocks-storage.bucket-store.max-chunk-pool-bytes": "1",
52+
})
53+
54+
// Start Cortex components.
55+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), distributorFlags, "")
56+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
57+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
58+
59+
// Wait until both the distributor and ingester have updated the ring.
60+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
61+
62+
distributorClient, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
63+
require.NoError(t, err)
64+
65+
// Push some series to Cortex.
66+
series1Timestamp := time.Now()
67+
series2Timestamp := series1Timestamp.Add(-2 * time.Second)
68+
series3Timestamp := series1Timestamp.Add(-4 * time.Second)
69+
series4Timestamp := series1Timestamp.Add(-6 * time.Second)
70+
series5Timestamp := series1Timestamp.Add(-8 * time.Second)
71+
series6Timestamp := series1Timestamp.Add(-10 * time.Second)
72+
series7Timestamp := series1Timestamp.Add(-12 * time.Second)
73+
series1, _ := generateSeries("foo", series1Timestamp, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster0"})
74+
series2, _ := generateSeries("foo", series2Timestamp, prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster0"})
75+
series3, _ := generateSeries("foo", series3Timestamp, prompb.Label{Name: "__replica__", Value: "replica0"}, prompb.Label{Name: "cluster", Value: "cluster1"})
76+
series4, _ := generateSeries("foo", series4Timestamp, prompb.Label{Name: "__replica__", Value: "replica1"}, prompb.Label{Name: "cluster", Value: "cluster1"})
77+
series5, _ := generateSeries("foo", series5Timestamp, prompb.Label{Name: "__replica__", Value: "replicaNoCluster"})
78+
series6, _ := generateSeries("foo", series6Timestamp, prompb.Label{Name: "cluster", Value: "clusterNoReplica"})
79+
series7, _ := generateSeries("foo", series7Timestamp, prompb.Label{Name: "other", Value: "label"})
80+
81+
res, err := distributorClient.Push([]prompb.TimeSeries{series1[0], series2[0], series3[0], series4[0], series5[0], series6[0], series7[0]})
82+
require.NoError(t, err)
83+
require.Equal(t, 200, res.StatusCode)
84+
85+
// Wait until the samples have been deduped.
86+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(2), "cortex_distributor_deduped_samples_total"))
87+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(3), "cortex_distributor_non_ha_samples_received_total"))
88+
89+
// Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
90+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
91+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(querierFlags, flags), "")
92+
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
93+
94+
// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
95+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
96+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
97+
98+
// Query back the series.
99+
querierClient, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
100+
require.NoError(t, err)
101+
102+
// Query back the series (only in the ingesters).
103+
result, err := querierClient.Query("foo[5m]", series1Timestamp)
104+
require.NoError(t, err)
105+
106+
require.Equal(t, model.ValMatrix, result.Type())
107+
m := result.(model.Matrix)
108+
require.Equal(t, 5, m.Len())
109+
numValidHA := 0
110+
numNonHA := 0
111+
for _, ss := range m {
112+
replicaLabel, okReplica := ss.Metric["__replica__"]
113+
if okReplica {
114+
require.Equal(t, string(replicaLabel), "replicaNoCluster")
115+
}
116+
clusterLabel, okCluster := ss.Metric["cluster"]
117+
if okCluster {
118+
require.Equal(t, string(clusterLabel) == "cluster1" || string(clusterLabel) == "cluster0" || string(clusterLabel) == "clusterNoReplica", true)
119+
if clusterLabel == "cluster1" || clusterLabel == "cluster0" {
120+
numValidHA++
121+
}
122+
}
123+
if (okReplica && !okCluster && replicaLabel == "replicaNoCluster") || (okCluster && !okReplica && clusterLabel == "clusterNoReplica") || (!okCluster && !okReplica) {
124+
numNonHA++
125+
}
126+
require.NotEmpty(t, ss.Values)
127+
for _, v := range ss.Values {
128+
require.NotEmpty(t, v)
129+
}
130+
}
131+
require.Equal(t, numNonHA, 3)
132+
require.Equal(t, numValidHA, 2)
133+
134+
// Ensure no service-specific metrics prefix is used by the wrong service.
135+
assertServiceMetricsPrefixes(t, Distributor, distributor)
136+
assertServiceMetricsPrefixes(t, Ingester, ingester)
137+
assertServiceMetricsPrefixes(t, StoreGateway, storeGateway)
138+
assertServiceMetricsPrefixes(t, Querier, querier)
139+
})
140+
}

pkg/distributor/distributor.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
665665
// Cache user limit with overrides so we spend less CPU doing locking. See issue #4904
666666
limits := d.limits.GetOverridesForUser(userID)
667667

668-
if limits.AcceptHASamples && len(req.Timeseries) > 0 {
668+
if limits.AcceptHASamples && len(req.Timeseries) > 0 && !limits.AcceptMixedHASamples {
669669
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, req.Timeseries[0].Labels)
670670
removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits)
671671
if err != nil {
@@ -872,6 +872,29 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
872872
// check each sample and discard if outside limits.
873873
skipLabelNameValidation := d.cfg.SkipLabelNameValidation || req.GetSkipLabelNameValidation()
874874
for _, ts := range req.Timeseries {
875+
if limits.AcceptHASamples && limits.AcceptMixedHASamples {
876+
cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, ts.Labels)
877+
if cluster != "" && replica != "" {
878+
_, err := d.checkSample(ctx, userID, cluster, replica, limits)
879+
if err != nil {
880+
// discard sample
881+
if errors.Is(err, ha.ReplicasNotMatchError{}) {
882+
// These samples have been deduped.
883+
d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(len(ts.Samples) + len(ts.Histograms)))
884+
}
885+
if errors.Is(err, ha.TooManyReplicaGroupsError{}) {
886+
d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(len(ts.Samples) + len(ts.Histograms)))
887+
}
888+
889+
continue
890+
}
891+
removeReplica = true // valid HA sample
892+
} else {
893+
removeReplica = false // non HA sample
894+
d.nonHASamples.WithLabelValues(userID).Add(float64(len(ts.Samples) + len(ts.Histograms)))
895+
}
896+
}
897+
875898
// Use timestamp of latest sample in the series. If samples for series are not ordered, metric for user may be wrong.
876899
if len(ts.Samples) > 0 {
877900
latestSampleTimestampMs = max(latestSampleTimestampMs, ts.Samples[len(ts.Samples)-1].TimestampMs)

0 commit comments

Comments
 (0)