
Commit 68f32b9

bboreham authored and alvinlin123 committed
Distributor unit tests: shut down more goroutines (cortexproject#4506)
* Distributor tests: move HA-Tracker into 'prepare' function

  This way, it gets shut down cleanly when the distributor exits.

  Signed-off-by: Bryan Boreham <[email protected]>

* Distributor tests: stop services via cleanup func

  This is fewer lines of code, and covers every test - previously a few, like `TestDistributor_Push_ExemplarValidation`, didn't call `stopAll()`.

  Signed-off-by: Bryan Boreham <[email protected]>

Signed-off-by: Alvin Lin <[email protected]>
1 parent 978f676 commit 68f32b9
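
The change applies a common Go testing pattern: the fixture-building helper registers teardown with t.Cleanup, so every caller gets shutdown automatically instead of each test having to remember a defer. A minimal, self-contained sketch of that pattern (the fakeService type and names below are illustrative, not the Cortex types):

package example

import "testing"

type fakeService struct{ running bool }

func (s *fakeService) start() { s.running = true }
func (s *fakeService) stop()  { s.running = false }

// prepare builds the fixtures a test needs and registers their shutdown with
// t.Cleanup, so no caller can forget to stop them - the gap this commit closes
// for tests that never called stopAll().
func prepare(t *testing.T) *fakeService {
    s := &fakeService{}
    s.start()
    t.Cleanup(s.stop) // runs when the calling test or subtest finishes, even on failure
    return s
}

func TestUsesPrepare(t *testing.T) {
    s := prepare(t) // no defer needed in the test body
    if !s.running {
        t.Fatal("service should be running during the test")
    }
}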


pkg/distributor/distributor_test.go

Lines changed: 38 additions & 52 deletions
@@ -258,15 +258,14 @@ func TestDistributor_Push(t *testing.T) {
 limits.IngestionRate = 20
 limits.IngestionBurstSize = 20

-ds, _, r, regs := prepare(t, prepConfig{
+ds, _, regs := prepare(t, prepConfig{
 numIngesters: tc.numIngesters,
 happyIngesters: tc.happyIngesters,
 numDistributors: 1,
 shardByAllLabels: shardByAllLabels,
 limits: limits,
 errFail: tc.ingesterError,
 })
-defer stopAll(ds, r)

 request := makeWriteRequest(tc.samples.startTimestampMs, tc.samples.num, tc.metadata)
 response, err := ds[0].Push(ctx, request)
@@ -288,7 +287,7 @@ func TestDistributor_Push(t *testing.T) {
 }

 func TestDistributor_MetricsCleanup(t *testing.T) {
-dists, _, _, regs := prepare(t, prepConfig{
+dists, _, regs := prepare(t, prepConfig{
 numDistributors: 1,
 })
 d := dists[0]
@@ -465,14 +464,13 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
 limits.IngestionBurstSize = testData.ingestionBurstSize

 // Start all expected distributors
-distributors, _, r, _ := prepare(t, prepConfig{
+distributors, _, _ := prepare(t, prepConfig{
 numIngesters: 3,
 happyIngesters: 3,
 numDistributors: testData.distributors,
 shardByAllLabels: true,
 limits: limits,
 })
-defer stopAll(distributors, r)

 // Push samples in multiple requests to the first distributor
 for _, push := range testData.pushes {
@@ -605,7 +603,7 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
 flagext.DefaultValues(limits)

 // Start all expected distributors
-distributors, _, r, regs := prepare(t, prepConfig{
+distributors, _, regs := prepare(t, prepConfig{
 numIngesters: 3,
 happyIngesters: 3,
 numDistributors: 1,
@@ -614,7 +612,6 @@ func TestDistributor_PushInstanceLimits(t *testing.T) {
 maxInflightRequests: testData.inflightLimit,
 maxIngestionRate: testData.ingestionRateLimit,
 })
-defer stopAll(distributors, r)

 d := distributors[0]
 d.inflightPushRequests.Add(int64(testData.preInflight))
@@ -698,34 +695,17 @@ func TestDistributor_PushHAInstances(t *testing.T) {
 limits.AcceptHASamples = true
 limits.MaxLabelValueLength = 15

-ds, _, r, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: 3,
 happyIngesters: 3,
 numDistributors: 1,
 shardByAllLabels: shardByAllLabels,
 limits: &limits,
+enableTracker: tc.enableTracker,
 })
-defer stopAll(ds, r)
-codec := GetReplicaDescCodec()

-ringStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
-t.Cleanup(func() { assert.NoError(t, closer.Close()) })
-
-mock := kv.PrefixClient(ringStore, "prefix")
 d := ds[0]

-if tc.enableTracker {
-r, err := newHATracker(HATrackerConfig{
-EnableHATracker: true,
-KVStore: kv.Config{Mock: mock},
-UpdateTimeout: 100 * time.Millisecond,
-FailoverTimeout: time.Second,
-}, trackerLimits{maxClusters: 100}, nil, log.NewNopLogger())
-require.NoError(t, err)
-require.NoError(t, services.StartAndAwaitRunning(context.Background(), r))
-d.HATracker = r
-}
-
 userID, err := tenant.TenantID(ctx)
 assert.NoError(t, err)
 err = d.HATracker.checkReplica(ctx, userID, tc.cluster, tc.acceptedReplica, time.Now())
@@ -879,15 +859,14 @@ func TestDistributor_PushQuery(t *testing.T) {

 for _, tc := range testcases {
 t.Run(tc.name, func(t *testing.T) {
-ds, ingesters, r, _ := prepare(t, prepConfig{
+ds, ingesters, _ := prepare(t, prepConfig{
 numIngesters: tc.numIngesters,
 happyIngesters: tc.happyIngesters,
 numDistributors: 1,
 shardByAllLabels: tc.shardByAllLabels,
 shuffleShardEnabled: tc.shuffleShardEnabled,
 shuffleShardSize: shuffleShardSize,
 })
-defer stopAll(ds, r)

 request := makeWriteRequest(0, tc.samples, tc.metadata)
 writeResponse, err := ds[0].Push(ctx, request)
@@ -931,7 +910,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
 limits.MaxChunksPerQuery = maxChunksLimit

 // Prepare distributors.
-ds, _, r, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: 3,
 happyIngesters: 3,
 numDistributors: 1,
@@ -940,7 +919,6 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
 })

 ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(0, 0, maxChunksLimit))
-defer stopAll(ds, r)

 // Push a number of series below the max chunks limit. Each series has 1 sample,
 // so expect 1 chunk per series when querying back.
@@ -988,14 +966,13 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
 ctx = limiter.AddQueryLimiterToContext(ctx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0))

 // Prepare distributors.
-ds, _, r, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: 3,
 happyIngesters: 3,
 numDistributors: 1,
 shardByAllLabels: true,
 limits: limits,
 })
-defer stopAll(ds, r)

 // Push a number of series below the max series limit.
 initialSeries := maxSeriesLimit
@@ -1042,15 +1019,14 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
 // Prepare distributors.
 // Use replication factor of 2 to always read all the chunks from both ingesters,
 // this guarantees us to always read the same chunks and have a stable test.
-ds, _, r, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: 2,
 happyIngesters: 2,
 numDistributors: 1,
 shardByAllLabels: true,
 limits: limits,
 replicationFactor: 2,
 })
-defer stopAll(ds, r)

 allSeriesMatchers := []*labels.Matcher{
 labels.MustNewMatcher(labels.MatchRegexp, model.MetricNameLabel, ".+"),
@@ -1165,14 +1141,13 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
 limits.DropLabels = tc.removeLabels
 limits.AcceptHASamples = tc.removeReplica

-ds, ingesters, r, _ := prepare(t, prepConfig{
+ds, ingesters, _ := prepare(t, prepConfig{
 numIngesters: 2,
 happyIngesters: 2,
 numDistributors: 1,
 shardByAllLabels: true,
 limits: &limits,
 })
-defer stopAll(ds, r)

 // Push the series to the distributor
 req := mockWriteRequest(tc.inputSeries, 1, 1)
@@ -1270,14 +1245,13 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *

 for testName, testData := range tests {
 t.Run(testName, func(t *testing.T) {
-ds, ingesters, r, _ := prepare(t, prepConfig{
+ds, ingesters, _ := prepare(t, prepConfig{
 numIngesters: 2,
 happyIngesters: 2,
 numDistributors: 1,
 shardByAllLabels: true,
 limits: &limits,
 })
-defer stopAll(ds, r)

 // Push the series to the distributor
 req := mockWriteRequest(testData.inputSeries, 1, 1)
@@ -1331,7 +1305,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {

 for testName, tc := range tests {
 t.Run(testName, func(t *testing.T) {
-ds, _, _, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: 2,
 happyIngesters: 2,
 numDistributors: 1,
@@ -1393,7 +1367,7 @@ func TestDistributor_Push_ExemplarValidation(t *testing.T) {

 for testName, tc := range tests {
 t.Run(testName, func(t *testing.T) {
-ds, _, _, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: 2,
 happyIngesters: 2,
 numDistributors: 1,
@@ -1694,14 +1668,13 @@ func TestSlowQueries(t *testing.T) {
 expectedErr = errFail
 }

-ds, _, r, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: nIngesters,
 happyIngesters: happy,
 numDistributors: 1,
 queryDelay: 100 * time.Millisecond,
 shardByAllLabels: shardByAllLabels,
 })
-defer stopAll(ds, r)

 _, err := ds[0].Query(ctx, 0, 10, nameMatcher)
 assert.Equal(t, expectedErr, err)
@@ -1804,15 +1777,14 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
 now := model.Now()

 // Create distributor
-ds, ingesters, r, _ := prepare(t, prepConfig{
+ds, ingesters, _ := prepare(t, prepConfig{
 numIngesters: numIngesters,
 happyIngesters: numIngesters,
 numDistributors: 1,
 shardByAllLabels: true,
 shuffleShardEnabled: testData.shuffleShardEnabled,
 shuffleShardSize: testData.shuffleShardSize,
 })
-defer stopAll(ds, r)

 // Push fixtures
 ctx := user.InjectOrgID(context.Background(), "test")
@@ -1863,7 +1835,7 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
 for testName, testData := range tests {
 t.Run(testName, func(t *testing.T) {
 // Create distributor
-ds, ingesters, r, _ := prepare(t, prepConfig{
+ds, ingesters, _ := prepare(t, prepConfig{
 numIngesters: numIngesters,
 happyIngesters: numIngesters,
 numDistributors: 1,
@@ -1872,7 +1844,6 @@ func TestDistributor_MetricsMetadata(t *testing.T) {
 shuffleShardSize: testData.shuffleShardSize,
 limits: nil,
 })
-defer stopAll(ds, r)

 // Push metadata
 ctx := user.InjectOrgID(context.Background(), "test")
@@ -1927,10 +1898,11 @@ type prepConfig struct {
 maxInflightRequests int
 maxIngestionRate float64
 replicationFactor int
+enableTracker bool
 errFail error
 }

-func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *ring.Ring, []*prometheus.Registry) {
+func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, []*prometheus.Registry) {
 ingesters := []mockIngester{}
 for i := 0; i < cfg.happyIngesters; i++ {
 ingesters = append(ingesters, mockIngester{
@@ -2032,6 +2004,20 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
 cfg.limits.IngestionTenantShardSize = cfg.shuffleShardSize
 }

+if cfg.enableTracker {
+codec := GetReplicaDescCodec()
+ringStore, closer := consul.NewInMemoryClient(codec, log.NewNopLogger(), nil)
+t.Cleanup(func() { assert.NoError(t, closer.Close()) })
+mock := kv.PrefixClient(ringStore, "prefix")
+distributorCfg.HATrackerConfig = HATrackerConfig{
+EnableHATracker: true,
+KVStore: kv.Config{Mock: mock},
+UpdateTimeout: 100 * time.Millisecond,
+FailoverTimeout: time.Second,
+}
+cfg.limits.HAMaxClusters = 100
+}
+
 overrides, err := validation.NewOverrides(*cfg.limits, nil)
 require.NoError(t, err)

@@ -2052,7 +2038,9 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin
 })
 }

-return distributors, ingesters, ingestersRing, registries
+t.Cleanup(func() { stopAll(distributors, ingestersRing) })
+
+return distributors, ingesters, registries
 }

 func stopAll(ds []*Distributor, r *ring.Ring) {
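
Worth noting why the helper registers shutdown with t.Cleanup rather than a defer of its own: a defer inside prepare would fire as soon as prepare returns, tearing the services down before the test body ever uses them, whereas t.Cleanup callbacks run once the calling test or subtest finishes. A tiny self-contained sketch of that ordering (toy names, not the Cortex helpers):

package example

import "testing"

func newFixture(t *testing.T) *int {
    v := new(int)
    *v = 1
    // defer func() { *v = 0 }() // wrong here: would run as newFixture returns
    t.Cleanup(func() { *v = 0 }) // right: runs after the calling test completes
    return v
}

func TestFixtureAliveDuringTest(t *testing.T) {
    v := newFixture(t)
    if *v != 1 {
        t.Fatal("fixture should not have been cleaned up yet")
    }
}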
@@ -2550,14 +2538,13 @@ func TestDistributorValidation(t *testing.T) {
 limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour)
 limits.MaxLabelNamesPerSeries = 2

-ds, _, r, _ := prepare(t, prepConfig{
+ds, _, _ := prepare(t, prepConfig{
 numIngesters: 3,
 happyIngesters: 3,
 numDistributors: 1,
 shardByAllLabels: true,
 limits: &limits,
 })
-defer stopAll(ds, r)

 _, err := ds[0].Push(ctx, cortexpb.ToWriteRequest(tc.labels, tc.samples, tc.metadata, cortexpb.API))
 require.Equal(t, tc.err, err)
@@ -2701,14 +2688,13 @@ func TestDistributor_Push_Relabel(t *testing.T) {
 flagext.DefaultValues(&limits)
 limits.MetricRelabelConfigs = tc.metricRelabelConfigs

-ds, ingesters, r, _ := prepare(t, prepConfig{
+ds, ingesters, _ := prepare(t, prepConfig{
 numIngesters: 2,
 happyIngesters: 2,
 numDistributors: 1,
 shardByAllLabels: true,
 limits: &limits,
 })
-defer stopAll(ds, r)

 // Push the series to the distributor
 req := mockWriteRequest(tc.inputSeries, 1, 1)
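
For reference, a caller-side sketch of how these tests read after the change, using the three-value prepare signature shown in the diff above (the prepConfig values here are only an example):

func TestExample(t *testing.T) {
    ds, ingesters, regs := prepare(t, prepConfig{
        numIngesters:    3,
        happyIngesters:  3,
        numDistributors: 1,
    })
    // No `defer stopAll(ds, r)` anymore: prepare registered the shutdown with
    // t.Cleanup, so the distributors and ring are stopped when this test finishes.
    _, _, _ = ds, ingesters, regs
}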

0 commit comments
