Skip to content

Commit 038760e

Browse files
committed
Fix backfill TSDB replay with tests for it
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent ed6cf63 commit 038760e

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

pkg/ingester/ingester_v2.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1088,8 +1088,15 @@ func (i *Ingester) closeAllTSDB() {
10881088
}(userDB)
10891089
}
10901090

1091-
// Wait until all Close() completed
1091+
wg.Add(1)
1092+
go func() {
1093+
defer wg.Done()
1094+
i.closeAllBackfillTSDBs()
1095+
}()
1096+
10921097
i.userStatesMtx.Unlock()
1098+
1099+
// Wait until all Close() completed
10931100
wg.Wait()
10941101
}
10951102

pkg/ingester/ingester_v2_backfill.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ func (i *Ingester) getOrCreateBackfillTSDB(userID string, ts int64) (*tsdbBucket
124124
insertIdx := len(userBuckets)
125125
for idx, b := range userBuckets {
126126
if ts >= b.bucketStart && ts < b.bucketEnd {
127-
fmt.Println("bucket found")
128127
return b, userBuckets, nil
129128
}
130129

@@ -201,7 +200,7 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
201200
}
202201

203202
for bucketID, bucketName := range bucketNames {
204-
if bucketName.IsDir() {
203+
if !bucketName.IsDir() {
205204
continue
206205
}
207206

@@ -229,7 +228,7 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
229228
}
230229

231230
wg.Add(1)
232-
go func(bucketID int, userID, bucketName string) {
231+
go func(bucketID int, userID, bucketName, dbDir string) {
233232
defer wg.Done()
234233
defer openGate.Done()
235234
defer func(ts time.Time) {
@@ -241,7 +240,7 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
241240
level.Error(util.Logger).Log("msg", "unable to get bucket range", "err", err, "user", userID, "bucketName", bucketName)
242241
return
243242
}
244-
db, err := i.createNewTSDB(userID, filepath.Join(userID, bucketName), (end-start)*2, (end-start)*2, prometheus.NewRegistry())
243+
db, err := i.createNewTSDB(userID, dbDir, (end-start)*2, (end-start)*2, prometheus.NewRegistry())
245244
if err != nil {
246245
level.Error(util.Logger).Log("msg", "unable to open user backfill TSDB", "err", err, "user", userID)
247246
return
@@ -258,7 +257,7 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
258257
// Append at the end, we will sort it at the end.
259258
i.TSDBState.backfillDBs.tsdbs[userID] = append(i.TSDBState.backfillDBs.tsdbs[userID], bucket)
260259
i.TSDBState.backfillDBs.tsdbsMtx.Unlock()
261-
}(bucketID, userID, bucketName.Name())
260+
}(bucketID, userID, bucketName.Name(), dbPath)
262261
}
263262

264263
if runErr != nil {
@@ -344,6 +343,30 @@ func (i *Ingester) backfillSelect(ctx context.Context, userID string, from, thro
344343
return result, nil
345344
}
346345

346+
func (i *Ingester) closeAllBackfillTSDBs() {
347+
i.TSDBState.backfillDBs.tsdbsMtx.Lock()
348+
defer i.TSDBState.backfillDBs.tsdbsMtx.Unlock()
349+
350+
wg := &sync.WaitGroup{}
351+
352+
for userID, buckets := range i.TSDBState.backfillDBs.tsdbs {
353+
for _, bucket := range buckets {
354+
wg.Add(1)
355+
go func(uid string, db *userTSDB) {
356+
defer wg.Done()
357+
358+
if err := db.Close(); err != nil {
359+
level.Warn(util.Logger).Log("msg", "unable to close backfill TSDB", "user", uid, "bucket_dir", db.Dir(), "err", err)
360+
}
361+
}(userID, bucket.db)
362+
}
363+
}
364+
365+
wg.Wait()
366+
// Clear all DBs irrespective of them closing.
367+
i.TSDBState.backfillDBs.tsdbs = make(map[string][]*tsdbBucket)
368+
}
369+
347370
// Assumes 1h bucket range for . TODO(codesome): protect stuff with locks.
348371
type backfillTSDBs struct {
349372
tsdbsMtx sync.RWMutex

pkg/ingester/ingester_v2_test.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1820,11 +1820,11 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
18201820
ts := 100 * time.Hour.Milliseconds()
18211821
ingestSample(ts, 0, false)
18221822

1823-
// 99.5h
1823+
// 99.5h, to the main TSDB.
18241824
ts = 99*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds()
18251825
ingestSample(ts, 0, false)
18261826

1827-
// 99h
1827+
// 99h, to the main TSDB.
18281828
ts = 99 * time.Hour.Milliseconds()
18291829
ingestSample(ts, 0, false)
18301830

@@ -1848,12 +1848,12 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
18481848
ts = (100-12)*time.Hour.Milliseconds() + 1*time.Millisecond.Milliseconds()
18491849
ingestSample(ts, 4, false)
18501850

1851-
// 100h-12h, out of bounds even for backfill.
1852-
ts = (100 - 12) * time.Hour.Milliseconds()
1851+
// 100h-backfillLimit, out of bounds even for backfill.
1852+
ts = 100*time.Hour.Milliseconds() - backfillLimit.Milliseconds()
18531853
ingestSample(ts, 4, true)
18541854

1855-
// 100h-13h, out of bounds even for backfill.
1856-
ts = (100 - 13) * time.Hour.Milliseconds()
1855+
// 99h-backfillLimit, out of bounds even for backfill.
1856+
ts = 99*time.Hour.Milliseconds() - backfillLimit.Milliseconds()
18571857
ingestSample(ts, 4, true)
18581858

18591859
// Query back all the samples.
@@ -1865,4 +1865,31 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
18651865
require.NoError(t, err)
18661866
require.NotNil(t, res)
18671867
assert.Equal(t, expectedIngested, res.Timeseries)
1868+
1869+
// Restart to check if we can still query backfill TSDBs.
1870+
1871+
// Stop ingester.
1872+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), i))
1873+
1874+
overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
1875+
require.NoError(t, err)
1876+
1877+
i, err = NewV2(i.cfg, defaultClientTestConfig(), overrides, nil)
1878+
require.NoError(t, err)
1879+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
1880+
1881+
// Wait until it's ACTIVE
1882+
test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} {
1883+
return i.lifecycler.GetState()
1884+
})
1885+
1886+
// Query back all the samples.
1887+
res, err = i.v2Query(ctx, &client.QueryRequest{
1888+
StartTimestampMs: math.MinInt64,
1889+
EndTimestampMs: math.MaxInt64,
1890+
Matchers: []*client.LabelMatcher{{Type: client.REGEX_MATCH, Name: labels.MetricName, Value: ".*"}},
1891+
})
1892+
require.NoError(t, err)
1893+
require.NotNil(t, res)
1894+
assert.Equal(t, expectedIngested, res.Timeseries)
18681895
}

0 commit comments

Comments
 (0)