Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [CHANGE] Ingester: Chunks flushed via /flush stay in memory until retention period is reached. This affects `cortex_ingester_memory_chunks` metric. #2778
* [CHANGE] Querier: the error message returned when the query time range exceeds `-store.max-query-length` has changed from `invalid query, length > limit (X > Y)` to `the query time range exceeds the limit (query length: X, limit: Y)`. #2826
* [CHANGE] KV: The `role` label which was a label of `multi` KV store client only has been added to metrics of every KV store client. If KV store client is not `multi`, then the value of `role` label is `primary`. #2837
* [CHANGE] Experimental TSDB: compact head when opening TSDB. This should only affect ingester startup after it was unable to compact head in previous run. #2870
* [FEATURE] Introduced `ruler.for-outage-tolerance`, Max time to tolerate outage for restoring "for" state of alert. #2783
* [FEATURE] Introduced `ruler.for-grace-period`, Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. #2783
* [FEATURE] Introduced `ruler.resend-delay`, Minimum amount of time to wait before resending an alert to Alertmanager. #2783
Expand Down
10 changes: 9 additions & 1 deletion pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,10 +891,18 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
SeriesLifecycleCallback: userDB,
})
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
}
db.DisableCompactions() // we will compact on our own schedule

// Run compaction before using this TSDB. If there is data in head that needs to be put into blocks,
// this will actually create the blocks. If there is no data (empty TSDB), this is a no-op, although
// local blocks compaction may still take place if configured.
err = db.Compact()
if err != nil {
return nil, errors.Wrapf(err, "failed to compact TSDB: %s", udir)
}

userDB.DB = db
// We set the limiter here because we don't want to limit
// series during WAL replay.
Expand Down
75 changes: 75 additions & 0 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1602,3 +1603,77 @@ func pushSingleSample(t *testing.T, i *Ingester) {
_, err := i.v2Push(ctx, req)
require.NoError(t, err)
}

func TestHeadCompactionOnStartup(t *testing.T) {
	// Verifies that opening a user TSDB at ingester startup compacts a
	// pre-existing head: a head holding ~24h of samples (and zero blocks)
	// must end up with <=2h in the head and the rest persisted as blocks.

	// Create a temporary directory for TSDB
	tempDir, err := ioutil.TempDir("", "tsdb")
	require.NoError(t, err)
	t.Cleanup(func() {
		os.RemoveAll(tempDir)
	})

	// Build TSDB for user, with data covering 24 hours.
	{
		// Number of full chunks, 12 chunks for 24hrs.
		numFullChunks := 12
		chunkRange := 2 * time.Hour.Milliseconds()

		userDir := filepath.Join(tempDir, userID)
		require.NoError(t, os.Mkdir(userDir, 0700))

		db, err := tsdb.Open(userDir, nil, nil, &tsdb.Options{
			// Retention (25h) exceeds the data span (24h) so no samples
			// are dropped while building the fixture.
			RetentionDuration: int64(time.Hour * 25 / time.Millisecond),
			NoLockfile:        true,
			MinBlockDuration:  chunkRange,
			MaxBlockDuration:  chunkRange,
		})
		require.NoError(t, err)

		// Keep everything in the head here: the startup compaction under
		// test is the thing that should turn it into blocks later.
		db.DisableCompactions()
		head := db.Head()

		l := labels.Labels{{Name: "n", Value: "v"}}
		for i := 0; i < numFullChunks; i++ {
			// Not using db.Appender() as it checks for compaction.
			app := head.Appender()
			// Two samples per 2h range: just after the range start and
			// exactly at the range end.
			_, err := app.Add(l, int64(i)*chunkRange+1, 9.99)
			require.NoError(t, err)
			_, err = app.Add(l, int64(i+1)*chunkRange, 9.99)
			require.NoError(t, err)
			require.NoError(t, app.Commit())
		}

		// Sanity-check the fixture: head spans >23h, no blocks exist yet.
		dur := time.Duration(head.MaxTime()-head.MinTime()) * time.Millisecond
		require.True(t, dur > 23*time.Hour)
		require.Equal(t, 0, len(db.Blocks()))
		require.NoError(t, db.Close())
	}

	clientCfg := defaultClientTestConfig()
	limits := defaultLimitsTestConfig()

	overrides, err := validation.NewOverrides(limits, nil)
	require.NoError(t, err)

	ingesterCfg := defaultIngesterTestConfig()
	ingesterCfg.TSDBEnabled = true
	ingesterCfg.TSDBConfig.Dir = tempDir
	// Backend/endpoint are placeholders; no object-store upload is needed
	// for this test — presumably nothing is shipped before shutdown. TODO confirm.
	ingesterCfg.TSDBConfig.Backend = "s3"
	ingesterCfg.TSDBConfig.S3.Endpoint = "localhost"
	ingesterCfg.TSDBConfig.Retention = 2 * 24 * time.Hour // Make sure that no newly created blocks are deleted.

	// Starting the ingester reopens the user TSDB built above; createTSDB
	// now runs db.Compact() right after opening, which is what this test checks.
	ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingester))

	defer services.StopAndAwaitTerminated(context.Background(), ingester) //nolint:errcheck

	db := ingester.getTSDB(userID)
	require.NotNil(t, db)

	h := db.Head()

	// After startup compaction the head holds at most one 2h chunk range;
	// the other 11 ranges have been compacted into persistent blocks.
	dur := time.Duration(h.MaxTime()-h.MinTime()) * time.Millisecond
	require.True(t, dur <= 2*time.Hour)
	require.Equal(t, 11, len(db.Blocks()))
}