From dadc5cd2d1271a5f625e5f0015d497739e9f9f44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 13 Jul 2020 15:35:48 +0200 Subject: [PATCH 1/7] Compact TSDB on open. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If there is data in head that needs to be cut into blocks, this will force it. If there is no data, this is noop. Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 852cdc5ac99..b986228b2a2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -895,6 +895,14 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } db.DisableCompactions() // we will compact on our own schedule + // Run compaction before using this TSDB. If there is data in head that needs to be put into blocks, + // this will actually create the blocks. If there is no data (empty TSDB), this is a no-op. + // We don't do blocks compaction. + err = db.Compact() + if err != nil { + return nil, err + } + userDB.DB = db // We set the limiter here because we don't want to limit // series during WAL replay. From f49cac12cf92e58d3d4f425ab499a25a32833681 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 13 Jul 2020 15:58:00 +0200 Subject: [PATCH 2/7] Added test for compaction on open. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2_test.go | 74 ++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 054ed1ff5ab..1c574f383d8 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -22,6 +22,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -1602,3 +1603,76 @@ func pushSingleSample(t *testing.T, i *Ingester) { _, err := i.v2Push(ctx, req) require.NoError(t, err) } + +func TestHeadCompactionOnStartup(t *testing.T) { + // Create a temporary directory for TSDB + tempDir, err := ioutil.TempDir("", "tsdb") + require.NoError(t, err) + t.Cleanup(func() { + os.RemoveAll(tempDir) + }) + + // Build TSDB for user, with data covering 24 hours. + { + // Number of full chunks, 12 chunks for 24hrs. + numFullChunks := 12 + chunkRange := 2 * time.Hour.Milliseconds() + + userDir := filepath.Join(tempDir, userID) + require.NoError(t, os.Mkdir(userDir, 0700)) + + db, err := tsdb.Open(userDir, nil, nil, &tsdb.Options{ + RetentionDuration: int64(time.Hour * 25 / time.Millisecond), + NoLockfile: true, + MinBlockDuration: chunkRange, + MaxBlockDuration: chunkRange, + }) + require.NoError(t, err) + + db.DisableCompactions() + head := db.Head() + + l := labels.Labels{{"n", "v"}} + for i := 0; i < numFullChunks; i++ { + // Not using db.Appender() as it checks for compaction. + app := head.Appender() + _, err := app.Add(l, int64(i)*chunkRange+1, 9.99) + require.NoError(t, err) + _, err = app.Add(l, int64(i+1)*chunkRange, 9.99) + require.NoError(t, err) + require.NoError(t, app.Commit()) + } + + dur := time.Duration(head.MaxTime()-head.MinTime()) * time.Millisecond + require.True(t, dur > 23*time.Hour) + require.Empty(t, db.Blocks()) + require.NoError(t, db.Close()) + } + + clientCfg := defaultClientTestConfig() + limits := defaultLimitsTestConfig() + + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + + ingesterCfg := defaultIngesterTestConfig() + ingesterCfg.TSDBEnabled = true + ingesterCfg.TSDBConfig.Dir = tempDir + ingesterCfg.TSDBConfig.Backend = "s3" + ingesterCfg.TSDBConfig.S3.Endpoint = "localhost" + + ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester)) + + defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck + + db := ingester.getTSDB(userID) + require.NotNil(t, db) + + h := db.Head() + + dur := time.Duration(h.MaxTime()-h.MinTime()) * time.Millisecond + require.True(t, dur < 4*time.Hour) + require.NotEmpty(t, db.Blocks()) +} From f88671f43ea42e116099e98e7ff13aea50b22fa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 13 Jul 2020 15:58:56 +0200 Subject: [PATCH 3/7] Added CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 93f0f74a025..ca17be07c26 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778 * [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826 * [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837 +* [CHANGE] Experimental TSDB: compact head when opening TSDB. This should only affect ingester startup after it was unable to compact head in previous run. #2870 * [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783 * [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783 * [FEATURE] Introduced `ruler.resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. #2783 From 76bcd178ab0713db102b914362f77fe29ebc330f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Mon, 13 Jul 2020 16:06:27 +0200 Subject: [PATCH 4/7] Added field names. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 1c574f383d8..389537823b7 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1632,7 +1632,7 @@ func TestHeadCompactionOnStartup(t *testing.T) { db.DisableCompactions() head := db.Head() - l := labels.Labels{{"n", "v"}} + l := labels.Labels{{Name: "n", Value: "v"}} for i := 0; i < numFullChunks; i++ { // Not using db.Appender() as it checks for compaction. app := head.Appender() From 149e604a1a88f9ac422d497a63ee43633a57511b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 14 Jul 2020 08:25:18 +0200 Subject: [PATCH 5/7] Update comment. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index b986228b2a2..ca24c6b8f70 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -896,8 +896,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { db.DisableCompactions() // we will compact on our own schedule // Run compaction before using this TSDB. If there is data in head that needs to be put into blocks, - // this will actually create the blocks. If there is no data (empty TSDB), this is a no-op. - // We don't do blocks compaction. + // this will actually create the blocks. If there is no data (empty TSDB), this is a no-op, although + // local blocks compaction may still take place if configured. err = db.Compact() if err != nil { return nil, err From 8ed2f4dfba18d5436deb79325c90d334f0674299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 14 Jul 2020 08:26:39 +0200 Subject: [PATCH 6/7] Wrap errors, add extra details. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index ca24c6b8f70..8cf8fd523d2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -891,7 +891,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { SeriesLifecycleCallback: userDB, }) if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir) } db.DisableCompactions() // we will compact on our own schedule @@ -900,7 +900,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { // local blocks compaction may still take place if configured. err = db.Compact() if err != nil { - return nil, err + return nil, errors.Wrapf(err, "failed to compact TSDB: %s", udir) } userDB.DB = db From 65cb9aaa674327631ad04adceebb46fa3621e0d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 14 Jul 2020 08:33:42 +0200 Subject: [PATCH 7/7] Update test. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 389537823b7..de5a981fe1f 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -1645,7 +1645,7 @@ func TestHeadCompactionOnStartup(t *testing.T) { dur := time.Duration(head.MaxTime()-head.MinTime()) * time.Millisecond require.True(t, dur > 23*time.Hour) - require.Empty(t, db.Blocks()) + require.Equal(t, 0, len(db.Blocks())) require.NoError(t, db.Close()) } @@ -1660,6 +1660,7 @@ func TestHeadCompactionOnStartup(t *testing.T) { ingesterCfg.TSDBConfig.Dir = tempDir ingesterCfg.TSDBConfig.Backend = "s3" ingesterCfg.TSDBConfig.S3.Endpoint = "localhost" + ingesterCfg.TSDBConfig.Retention = 2 * 24 * time.Hour // Make sure that no newly created blocks are deleted. ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil) require.NoError(t, err) @@ -1673,6 +1674,6 @@ func TestHeadCompactionOnStartup(t *testing.T) { h := db.Head() dur := time.Duration(h.MaxTime()-h.MinTime()) * time.Millisecond - require.True(t, dur < 4*time.Hour) - require.NotEmpty(t, db.Blocks()) + require.True(t, dur <= 2*time.Hour) + require.Equal(t, 11, len(db.Blocks())) }