diff --git a/CHANGELOG.md b/CHANGELOG.md
index 624ad8dfa8e..f1b62bff23f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -39,6 +39,7 @@
 * [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992.
 * [ENHANCEMENT] Cortex is now built with Go 1.16. #4062
 * [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074
+* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify the tenant to flush, and `wait=true` to make the call synchronous. Multiple tenants can be selected by repeating the `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073
 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules//` endpoint return `400` instead of `404`. #4013
 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948
 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959
diff --git a/docs/api/_index.md b/docs/api/_index.md
index 857315b27d3..3c4141852bf 100644
--- a/docs/api/_index.md
+++ b/docs/api/_index.md
@@ -247,6 +247,10 @@ GET,POST /flush
 
 Triggers a flush of the in-memory time series data (chunks or blocks) to the long-term storage. This endpoint triggers the flush also when `-ingester.flush-on-shutdown-with-wal-enabled` or `-blocks-storage.tsdb.flush-blocks-on-shutdown` are disabled.
 
+When using blocks storage, this endpoint accepts a `tenant` parameter to specify the tenant whose blocks should be compacted and shipped. The parameter may be specified multiple times to select multiple tenants. If no tenant is specified, all tenants are flushed.
+
+The flush endpoint also accepts a `wait=true` parameter, which makes the call synchronous: it only returns once flushing has finished. Note that the returned status code does not reflect the result of the flush operation. This parameter is only available when using blocks storage.
+
 ### Shutdown
 
 ```
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 88993293325..587613fd5ab 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -387,8 +387,8 @@ type TSDBState struct {
 
 	tsdbMetrics *tsdbMetrics
 
-	forceCompactTrigger chan chan<- struct{}
-	shipTrigger         chan chan<- struct{}
+	forceCompactTrigger chan requestWithUsersAndCallback
+	shipTrigger         chan requestWithUsersAndCallback
 
 	// Timeout chosen for idle compactions.
 	compactionIdleTimeout time.Duration
@@ -405,6 +405,11 @@ type TSDBState struct {
 	idleTsdbChecks *prometheus.CounterVec
 }
 
+type requestWithUsersAndCallback struct {
+	users    *util.AllowedTenants // if nil, all tenants are allowed.
+	callback chan<- struct{}      // when compaction/shipping is finished, this channel is closed
+}
+
 func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer) TSDBState {
 	idleTsdbChecks := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_ingester_idle_tsdb_checks_total",
@@ -426,8 +431,8 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registere
 		dbs:                 make(map[string]*userTSDB),
 		bucket:              bucketClient,
 		tsdbMetrics:         newTSDBMetrics(registerer),
-		forceCompactTrigger: make(chan chan<- struct{}),
-		shipTrigger:         make(chan chan<- struct{}),
+		forceCompactTrigger: make(chan requestWithUsersAndCallback),
+		shipTrigger:         make(chan requestWithUsersAndCallback),
 
 		compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 			Name: "cortex_ingester_tsdb_compactions_triggered_total",
@@ -1707,16 +1712,11 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error {
 	for {
 		select {
 		case <-shipTicker.C:
-			i.shipBlocks(ctx)
+			i.shipBlocks(ctx, nil)
 
-		case ch := <-i.TSDBState.shipTrigger:
-			i.shipBlocks(ctx)
-
-			// Notify back.
-			select {
-			case ch <- struct{}{}:
-			default: // Nobody is waiting for notification, don't block this loop.
-			}
+		case req := <-i.TSDBState.shipTrigger:
+			i.shipBlocks(ctx, req.users)
+			close(req.callback) // Notify back.
 
 		case <-ctx.Done():
 			return nil
@@ -1724,7 +1724,8 @@
 	}
 }
 
-func (i *Ingester) shipBlocks(ctx context.Context) {
+// shipBlocks runs shipping for all allowed users (a nil value of allowed means all users).
+func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants) {
 	// Do not ship blocks if the ingester is PENDING or JOINING. It's
 	// particularly important for the JOINING state because there could
 	// be a blocks transfer in progress (from another ingester) and if we
@@ -1739,6 +1740,10 @@ func (i *Ingester) shipBlocks(ctx context.Context) {
 	// Number of concurrent workers is limited in order to avoid to concurrently sync a lot
 	// of tenants in a large cluster.
 	_ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(ctx context.Context, userID string) error {
+		if !allowed.IsAllowed(userID) {
+			return nil
+		}
+
 		// Get the user's DB. If the user doesn't exist, we skip it.
 		userDB := i.getTSDB(userID)
 		if userDB == nil || userDB.shipper == nil {
@@ -1803,16 +1808,11 @@ func (i *Ingester) compactionLoop(ctx context.Context) error {
 	for ctx.Err() == nil {
 		select {
 		case <-ticker.C:
-			i.compactBlocks(ctx, false)
-
-		case ch := <-i.TSDBState.forceCompactTrigger:
-			i.compactBlocks(ctx, true)
+			i.compactBlocks(ctx, false, nil)
 
-			// Notify back.
-			select {
-			case ch <- struct{}{}:
-			default: // Nobody is waiting for notification, don't block this loop.
-			}
+		case req := <-i.TSDBState.forceCompactTrigger:
+			i.compactBlocks(ctx, true, req.users)
+			close(req.callback) // Notify back.
 
 		case <-ctx.Done():
 			return nil
@@ -1822,7 +1822,7 @@
 	}
 }
 // Compacts all compactable blocks. Force flag will force compaction even if head is not compactable yet.
-func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
+func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.AllowedTenants) {
 	// Don't compact TSDB blocks while JOINING as there may be ongoing blocks transfers.
 	// Compaction loop is not running in LEAVING state, so if we get here in LEAVING state, we're flushing blocks.
 	if i.lifecycler != nil {
@@ -1833,6 +1833,10 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
 	}
 
 	_ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(ctx context.Context, userID string) error {
+		if !allowed.IsAllowed(userID) {
+			return nil
+		}
+
 		userDB := i.getTSDB(userID)
 		if userDB == nil {
 			return nil
@@ -1982,28 +1986,43 @@ func (i *Ingester) v2LifecyclerFlush() {
 
 	ctx := context.Background()
 
-	i.compactBlocks(ctx, true)
+	i.compactBlocks(ctx, true, nil)
 	if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() {
-		i.shipBlocks(ctx)
+		i.shipBlocks(ctx, nil)
 	}
 
 	level.Info(i.logger).Log("msg", "finished flushing and shipping TSDB blocks")
 }
 
+const (
+	tenantParam = "tenant"
+	waitParam   = "wait"
+)
+
 // Blocks version of Flush handler. It force-compacts blocks, and triggers shipping.
-func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
-	go func() {
+func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) {
+	err := r.ParseForm()
+	if err != nil {
+		level.Warn(i.logger).Log("msg", "failed to parse HTTP request in flush handler", "err", err)
+		w.WriteHeader(http.StatusBadRequest)
+		return
+	}
+
+	tenants := r.Form[tenantParam]
+
+	allowedUsers := util.NewAllowedTenants(tenants, nil)
+	run := func() {
 		ingCtx := i.BasicService.ServiceContext()
 		if ingCtx == nil || ingCtx.Err() != nil {
 			level.Info(i.logger).Log("msg", "flushing TSDB blocks: ingester not running, ignoring flush request")
 			return
 		}
 
-		ch := make(chan struct{}, 1)
+		compactionCallbackCh := make(chan struct{})
 
 		level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering compaction")
 		select {
-		case i.TSDBState.forceCompactTrigger <- ch:
+		case i.TSDBState.forceCompactTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: compactionCallbackCh}:
 			// Compacting now.
 		case <-ingCtx.Done():
 			level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
@@ -2012,7 +2031,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
 
 		// Wait until notified about compaction being finished.
 		select {
-		case <-ch:
+		case <-compactionCallbackCh:
 			level.Info(i.logger).Log("msg", "finished compacting TSDB blocks")
 		case <-ingCtx.Done():
 			level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
@@ -2020,10 +2039,12 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
 		}
 
 		if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() {
+			shippingCallbackCh := make(chan struct{}) // must be new channel, as compactionCallbackCh is closed now.
+
 			level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering shipping")
 
 			select {
-			case i.TSDBState.shipTrigger <- ch:
+			case i.TSDBState.shipTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: shippingCallbackCh}:
 				// shipping now
 			case <-ingCtx.Done():
 				level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
@@ -2032,7 +2053,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
 
 			// Wait until shipping finished.
 			select {
-			case <-ch:
+			case <-shippingCallbackCh:
 				level.Info(i.logger).Log("msg", "shipping of TSDB blocks finished")
 			case <-ingCtx.Done():
 				level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
@@ -2041,7 +2062,14 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
 		}
 
 		level.Info(i.logger).Log("msg", "flushing TSDB blocks: finished")
-	}()
+	}
+
+	if len(r.Form[waitParam]) > 0 && r.Form[waitParam][0] == "true" {
+		// Run synchronously. This simplifies and speeds up tests.
+		run()
+	} else {
+		go run()
+	}
 
 	w.WriteHeader(http.StatusNoContent)
 }
diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go
index 1f57f7b2239..a388819be7c 100644
--- a/pkg/ingester/ingester_v2_test.go
+++ b/pkg/ingester/ingester_v2_test.go
@@ -10,6 +10,7 @@ import (
 	"net"
 	"net/http"
 	"net/http/httptest"
+	"net/url"
 	"os"
 	"path/filepath"
 	"sort"
@@ -2187,7 +2188,7 @@ func TestIngester_shipBlocks(t *testing.T) {
 	}
 
 	// Ship blocks and assert on the mocked shipper
-	i.shipBlocks(context.Background())
+	i.shipBlocks(context.Background(), nil)
 
 	for _, m := range mocks {
 		m.AssertNumberOfCalls(t, "Sync", 1)
@@ -2217,9 +2218,9 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T)
 
 	pushSingleSampleWithMetadata(t, i)
 	require.Equal(t, int64(1), i.TSDBState.seriesCount.Load())
-	i.compactBlocks(context.Background(), true)
+	i.compactBlocks(context.Background(), true, nil)
 	require.Equal(t, int64(0), i.TSDBState.seriesCount.Load())
-	i.shipBlocks(context.Background())
+	i.shipBlocks(context.Background(), nil)
 
 	numObjects := len(bucket.Objects())
 	require.NotZero(t, numObjects)
@@ -2234,9 +2235,9 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T)
 	// After writing tenant deletion mark,
 	pushSingleSampleWithMetadata(t, i)
 	require.Equal(t, int64(1), i.TSDBState.seriesCount.Load())
-	i.compactBlocks(context.Background(), true)
+	i.compactBlocks(context.Background(), true, nil)
 	require.Equal(t, int64(0), i.TSDBState.seriesCount.Load())
-	i.shipBlocks(context.Background())
+	i.shipBlocks(context.Background(), nil)
 
 	numObjectsAfterMarkingTenantForDeletion := len(bucket.Objects())
 	require.Equal(t, numObjects, numObjectsAfterMarkingTenantForDeletion)
@@ -2271,7 +2272,7 @@ func TestIngester_seriesCountIsCorrectAfterClosingTSDBForDeletedTenant(t *testin
 	require.Equal(t, int64(1), i.TSDBState.seriesCount.Load())
 
 	// We call shipBlocks to check for deletion marker (it happens inside this method).
-	i.shipBlocks(context.Background())
+	i.shipBlocks(context.Background(), nil)
 
 	// Verify that tenant deletion mark was found.
 	db := i.getTSDB(userID)
@@ -2317,7 +2318,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP
 	}))
 
 	// Run blocks shipping in a separate go routine.
-	go i.shipBlocks(ctx)
+	go i.shipBlocks(ctx, nil)
 
 	// Wait until shipping starts.
 	test.Poll(t, 1*time.Second, activeShipping, func() interface{} {
@@ -2404,8 +2405,8 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) {
 	require.NotNil(t, db)
 
 	// Run compaction and shipping.
-	i.compactBlocks(context.Background(), true)
-	i.shipBlocks(context.Background())
+	i.compactBlocks(context.Background(), true, nil)
+	i.shipBlocks(context.Background(), nil)
 
 	// Make sure we can close completely empty TSDB without problems.
 	require.Equal(t, tsdbIdleClosed, i.closeAndDeleteUserTSDBIfIdle(userID))
@@ -2550,19 +2551,54 @@ func TestIngester_flushing(t *testing.T) {
 				cortex_ingester_shipper_uploads_total 0
 			`), "cortex_ingester_shipper_uploads_total"))
 
-			i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil))
+			// Using wait=true makes this a synchronous call.
+			i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true", nil))
 
-			// Flush handler only triggers compactions, but doesn't wait for them to finish. Let's wait for a moment, and then verify.
-			test.Poll(t, 5*time.Second, uint64(0), func() interface{} {
-				db := i.getTSDB(userID)
-				if db == nil {
-					return false
-				}
-				return db.Head().NumSeries()
-			})
+			verifyCompactedHead(t, i, true)
+			require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+				# HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks
+				# TYPE cortex_ingester_shipper_uploads_total counter
+				cortex_ingester_shipper_uploads_total 1
+			`), "cortex_ingester_shipper_uploads_total"))
+		},
+	},
 
-			// The above waiting only ensures compaction, waiting another second to register the Sync call.
-			time.Sleep(1 * time.Second)
+	"flushHandlerWithListOfTenants": {
+		setupIngester: func(cfg *Config) {
+			cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false
+		},
+
+		action: func(t *testing.T, i *Ingester, reg *prometheus.Registry) {
+			pushSingleSampleWithMetadata(t, i)
+
+			// Nothing shipped yet.
+			require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+				# HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks
+				# TYPE cortex_ingester_shipper_uploads_total counter
+				cortex_ingester_shipper_uploads_total 0
+			`), "cortex_ingester_shipper_uploads_total"))
+
+			users := url.Values{}
+			users.Add(tenantParam, "unknown-user")
+			users.Add(tenantParam, "another-unknown-user")
+
+			// Using wait=true makes this a synchronous call.
+			i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true&"+users.Encode(), nil))
+
+			// Still nothing shipped or compacted.
+			require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+				# HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks
+				# TYPE cortex_ingester_shipper_uploads_total counter
+				cortex_ingester_shipper_uploads_total 0
+			`), "cortex_ingester_shipper_uploads_total"))
+			verifyCompactedHead(t, i, false)
+
+			users = url.Values{}
+			users.Add(tenantParam, "different-user")
+			users.Add(tenantParam, userID) // Our user
+			users.Add(tenantParam, "yet-another-user")
+
+			i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true&"+users.Encode(), nil))
 
 			verifyCompactedHead(t, i, true)
 			require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
@@ -2598,19 +2634,7 @@
 				cortex_ingester_shipper_uploads_total 0
 			`), "cortex_ingester_shipper_uploads_total"))
 
-			i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil))
-
-			// Flush handler only triggers compactions, but doesn't wait for them to finish. Let's wait for a moment, and then verify.
-			test.Poll(t, 5*time.Second, uint64(0), func() interface{} {
-				db := i.getTSDB(userID)
-				if db == nil {
-					return false
-				}
-				return db.Head().NumSeries()
-			})
-
-			// The above waiting only ensures compaction, waiting another second to register the Sync call.
-			time.Sleep(1 * time.Second)
+			i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true", nil))
 
 			verifyCompactedHead(t, i, true)
 			require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
@@ -2867,7 +2891,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) {
 
 	pushSingleSampleWithMetadata(t, i)
 
-	i.compactBlocks(context.Background(), false)
+	i.compactBlocks(context.Background(), false, nil)
 	verifyCompactedHead(t, i, false)
 	require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
 		# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
@@ -2886,7 +2910,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) {
 	// wait one second (plus maximum jitter) -- TSDB is now idle.
 	time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter)))
 
-	i.compactBlocks(context.Background(), false)
+	i.compactBlocks(context.Background(), false, nil)
 	verifyCompactedHead(t, i, true)
 	require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(`
 		# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
@@ -3342,7 +3366,8 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) {
 	db := i.getTSDB(userID)
 	require.NoError(t, db.acquireAppendLock())
 
-	// Flush handler only triggers compactions, but doesn't wait for them to finish.
+	// Flush handler only triggers compactions, but doesn't wait for them to finish. We cannot use ?wait=true here,
+	// because it would deadlock -- flush will wait for the appendLock to be released.
 	i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil))
 
 	// Flushing should not have succeeded even after 5 seconds.
diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go
index 0f610a5fdb1..88c7a6333b8 100644
--- a/pkg/util/allowed_tenants.go
+++ b/pkg/util/allowed_tenants.go
@@ -1,5 +1,7 @@
 package util
 
+// AllowedTenants can answer whether a tenant is allowed or not, based on configuration.
+// The default value (nil) allows all tenants.
 type AllowedTenants struct {
 	// If empty, all tenants are enabled. If not empty, only tenants in the map are enabled.
 	enabled map[string]struct{}
@@ -8,6 +10,9 @@ type AllowedTenants struct {
 	disabled map[string]struct{}
 }
 
+// NewAllowedTenants builds a new AllowedTenants based on the enabled and disabled tenant lists.
+// If there are any enabled tenants, then only those tenants are allowed.
+// If there are any disabled tenants, then a tenant from that list that would otherwise be allowed is disabled instead.
 func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants {
 	a := &AllowedTenants{}
 
@@ -29,6 +34,10 @@
 }
 
 func (a *AllowedTenants) IsAllowed(tenantID string) bool {
+	if a == nil {
+		return true
+	}
+
 	if len(a.enabled) > 0 {
 		if _, ok := a.enabled[tenantID]; !ok {
 			return false
diff --git a/pkg/util/allowed_tenants_test.go b/pkg/util/allowed_tenants_test.go
index b51cf930e74..221e0a9e603 100644
--- a/pkg/util/allowed_tenants_test.go
+++ b/pkg/util/allowed_tenants_test.go
@@ -36,3 +36,12 @@ func TestAllowedTenants_Combination(t *testing.T) {
 	require.False(t, a.IsAllowed("C")) // disabled
 	require.False(t, a.IsAllowed("D")) // not enabled
 }
+
+func TestAllowedTenants_Nil(t *testing.T) {
+	var a *AllowedTenants = nil
+
+	// All tenants are allowed when using nil as allowed tenants.
+	require.True(t, a.IsAllowed("A"))
+	require.True(t, a.IsAllowed("B"))
+	require.True(t, a.IsAllowed("C"))
+}
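
For reference, the sketch below shows how the new flush parameters documented in `docs/api/_index.md` could be exercised from Go using only the standard library. The ingester address and tenant IDs are placeholders rather than values from this patch; the request shape (a repeatable `tenant` parameter, `wait=true` for a synchronous call, and a `204 No Content` reply whose status does not reflect the flush result) follows the documentation and handler changes above.

```go
package main

import (
	"fmt"
	"log"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder ingester HTTP address; replace with a real ingester endpoint.
	base := "http://ingester-1:8080/flush"

	// Select the tenants to flush. Repeat "tenant" to flush several tenants,
	// and set wait=true to block until compaction and shipping have finished.
	params := url.Values{}
	params.Add("tenant", "tenant-a")
	params.Add("tenant", "tenant-b")
	params.Set("wait", "true")

	resp, err := http.Post(base+"?"+params.Encode(), "application/x-www-form-urlencoded", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// The handler replies with 204 No Content; the status code does not
	// indicate whether the flush itself succeeded.
	fmt.Println("status:", resp.StatusCode)
}
```

An equivalent call from the command line would simply repeat the query parameter, e.g. `POST /flush?tenant=tenant-a&tenant=tenant-b&wait=true`.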