7
7
"fmt"
8
8
"io"
9
9
"math"
10
- "math/rand"
11
10
"net"
12
11
"net/http"
13
12
"net/http/httptest"
@@ -183,11 +182,7 @@ func TestIngesterDeletionRace(t *testing.T) {
183
182
limits := defaultLimitsTestConfig ()
184
183
tenantLimits := newMockTenantLimits (map [string ]* validation.Limits {userID : & limits })
185
184
cfg := defaultIngesterTestConfig (t )
186
- cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBInterval = 5 * time .Millisecond
187
- cfg .BlocksStorageConfig .TSDB .CloseIdleTSDBTimeout = 10 * time .Second
188
- cfg .BlocksStorageConfig .TSDB .ExpandedCachingExpireInterval = 5 * time .Millisecond
189
185
cfg .BlocksStorageConfig .TSDB .PostingsCache = cortex_tsdb.TSDBPostingsCacheConfig {
190
- SeedSize : 3 , // lets make sure all metric names collide
191
186
Head : cortex_tsdb.PostingsCacheConfig {
192
187
Enabled : true ,
193
188
Ttl : time .Hour ,
@@ -215,7 +210,7 @@ func TestIngesterDeletionRace(t *testing.T) {
215
210
return ing .lifecycler .GetState ()
216
211
})
217
212
218
- numberOfTenants := 150
213
+ numberOfTenants := 50
219
214
wg := sync.WaitGroup {}
220
215
wg .Add (numberOfTenants )
221
216
@@ -234,11 +229,30 @@ func TestIngesterDeletionRace(t *testing.T) {
234
229
Matchers : []* client.LabelMatcher {{Type : client .REGEX_MATCH , Name : labels .MetricName , Value : ".*" }},
235
230
}, s )
236
231
require .NoError (t , err )
237
- time . Sleep ( time . Duration ( rand . Int63n ( 5 )) * time .Millisecond )
232
+ ing . getTSDB ( u ). postingCache = & wrappedExpandedPostingsCache { ExpandedPostingsCache : ing . getTSDB ( u ). postingCache , purgeDelay : 10 * time .Millisecond }
238
233
ing .getTSDB (u ).deletionMarkFound .Store (true ) // lets force close the tenant
239
234
}()
240
235
}
236
+
241
237
wg .Wait ()
238
+
239
+ ctx , c := context .WithCancel (context .Background ())
240
+ defer c ()
241
+
242
+ wg .Add (1 )
243
+ go func () {
244
+ wg .Done ()
245
+ ing .expirePostingsCache (ctx ) //nolint:errcheck
246
+ }()
247
+
248
+ go func () {
249
+ wg .Wait () // make sure we clean after we started the purge go routine
250
+ ing .closeAndDeleteIdleUserTSDBs (ctx ) //nolint:errcheck
251
+ }()
252
+
253
+ test .Poll (t , 5 * time .Second , 0 , func () interface {} {
254
+ return len (ing .getTSDBUsers ())
255
+ })
242
256
}
243
257
244
258
func TestIngesterPerLabelsetLimitExceeded (t * testing.T ) {
@@ -3592,6 +3606,17 @@ func (m *mockMetricsForLabelMatchersStreamServer) Context() context.Context {
3592
3606
return m .ctx
3593
3607
}
3594
3608
3609
+ type wrappedExpandedPostingsCache struct {
3610
+ cortex_tsdb.ExpandedPostingsCache
3611
+
3612
+ purgeDelay time.Duration
3613
+ }
3614
+
3615
+ func (w * wrappedExpandedPostingsCache ) PurgeExpiredItems () {
3616
+ time .Sleep (w .purgeDelay )
3617
+ w .ExpandedPostingsCache .PurgeExpiredItems ()
3618
+ }
3619
+
3595
3620
type mockQueryStreamServer struct {
3596
3621
grpc.ServerStream
3597
3622
ctx context.Context
0 commit comments