From 74555ff06ec0cf69a4c80e6dbbd4909fa7b328a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 13 Apr 2021 11:05:58 +0200 Subject: [PATCH 1/7] Ingester can now flush only specified users. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 103 +++++++++++++++++++++---------- pkg/ingester/ingester_v2_test.go | 97 ++++++++++++++++++----------- 2 files changed, 130 insertions(+), 70 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 88993293325..5a6125785e1 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -387,8 +387,8 @@ type TSDBState struct { tsdbMetrics *tsdbMetrics - forceCompactTrigger chan chan<- struct{} - shipTrigger chan chan<- struct{} + forceCompactTrigger chan requestWithUsersAndCallback + shipTrigger chan requestWithUsersAndCallback // Timeout chosen for idle compactions. compactionIdleTimeout time.Duration @@ -405,6 +405,11 @@ type TSDBState struct { idleTsdbChecks *prometheus.CounterVec } +type requestWithUsersAndCallback struct { + users map[string]struct{} // if nil, all users are compacted/shipped. If not nil, only users in the map are compacted/shipped. + callback chan<- struct{} // when compaction/shipping is finished, this channel is closed +} + func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer) TSDBState { idleTsdbChecks := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_ingester_idle_tsdb_checks_total", @@ -426,8 +431,8 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer dbs: make(map[string]*userTSDB), bucket: bucketClient, tsdbMetrics: newTSDBMetrics(registerer), - forceCompactTrigger: make(chan chan<- struct{}), - shipTrigger: make(chan chan<- struct{}), + forceCompactTrigger: make(chan requestWithUsersAndCallback), + shipTrigger: make(chan requestWithUsersAndCallback), compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_tsdb_compactions_triggered_total", @@ -1707,16 +1712,11 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error { for { select { case <-shipTicker.C: - i.shipBlocks(ctx) + i.shipBlocks(ctx, nil) - case ch := <-i.TSDBState.shipTrigger: - i.shipBlocks(ctx) - - // Notify back. - select { - case ch <- struct{}{}: - default: // Nobody is waiting for notification, don't block this loop. - } + case req := <-i.TSDBState.shipTrigger: + i.shipBlocks(ctx, req.users) + close(req.callback) // Notify back. case <-ctx.Done(): return nil @@ -1724,7 +1724,10 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error { } } -func (i *Ingester) shipBlocks(ctx context.Context) { +// shipBlocks runs shipping for all users. +// allowedUsers, if not nil, is used to only ship blocks for users in this map. +// (Note that empty map = no users are shipped, nil map = all users are shipped) +func (i *Ingester) shipBlocks(ctx context.Context, allowedUsers map[string]struct{}) { // Do not ship blocks if the ingester is PENDING or JOINING. It's // particularly important for the JOINING state because there could // be a blocks transfer in progress (from another ingester) and if we @@ -1739,6 +1742,10 @@ func (i *Ingester) shipBlocks(ctx context.Context) { // Number of concurrent workers is limited in order to avoid to concurrently sync a lot // of tenants in a large cluster. 
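	// The per-user callback below first filters on allowedUsers. A two-value
	// lookup on a nil Go map simply reports "not present", so the explicit
	// allowedUsers != nil check is what keeps the no-filter case (a nil map)
	// shipping every user.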
_ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(ctx context.Context, userID string) error { + if _, ok := allowedUsers[userID]; allowedUsers != nil && !ok { + return nil + } + // Get the user's DB. If the user doesn't exist, we skip it. userDB := i.getTSDB(userID) if userDB == nil || userDB.shipper == nil { @@ -1803,16 +1810,11 @@ func (i *Ingester) compactionLoop(ctx context.Context) error { for ctx.Err() == nil { select { case <-ticker.C: - i.compactBlocks(ctx, false) - - case ch := <-i.TSDBState.forceCompactTrigger: - i.compactBlocks(ctx, true) + i.compactBlocks(ctx, false, nil) - // Notify back. - select { - case ch <- struct{}{}: - default: // Nobody is waiting for notification, don't block this loop. - } + case req := <-i.TSDBState.forceCompactTrigger: + i.compactBlocks(ctx, true, req.users) + close(req.callback) // Notify back. case <-ctx.Done(): return nil @@ -1822,7 +1824,9 @@ func (i *Ingester) compactionLoop(ctx context.Context) error { } // Compacts all compactable blocks. Force flag will force compaction even if head is not compactable yet. -func (i *Ingester) compactBlocks(ctx context.Context, force bool) { +// allowedUsers, if not nil, is used to only compact users in this map. +// (Note that empty map = no users are compacted, nil map = all users are compacted) +func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowedUsers map[string]struct{}) { // Don't compact TSDB blocks while JOINING as there may be ongoing blocks transfers. // Compaction loop is not running in LEAVING state, so if we get here in LEAVING state, we're flushing blocks. if i.lifecycler != nil { @@ -1833,6 +1837,10 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool) { } _ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(ctx context.Context, userID string) error { + if _, ok := allowedUsers[userID]; allowedUsers != nil && !ok { + return nil + } + userDB := i.getTSDB(userID) if userDB == nil { return nil @@ -1982,28 +1990,45 @@ func (i *Ingester) v2LifecyclerFlush() { ctx := context.Background() - i.compactBlocks(ctx, true) + i.compactBlocks(ctx, true, nil) if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() { - i.shipBlocks(ctx) + i.shipBlocks(ctx, nil) } level.Info(i.logger).Log("msg", "finished flushing and shipping TSDB blocks") } // Blocks version of Flush handler. It force-compacts blocks, and triggers shipping. -func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) { - go func() { +func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) { + err := r.ParseForm() + if err != nil { + level.Warn(i.logger).Log("msg", "failed to parse HTTP request in flush handler", "err", err) + w.WriteHeader(http.StatusBadRequest) + return + } + + tenants := r.Form["tenant"] + + var allowedUsers map[string]struct{} = nil // All users are allowed. 
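+	// The map stays nil unless specific tenants were requested; downstream,
+	// nil means "flush every user", while a populated map restricts the set.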
+ if len(tenants) > 0 { + allowedUsers = make(map[string]struct{}, len(tenants)) + for _, t := range tenants { + allowedUsers[t] = struct{}{} + } + } + + run := func() { ingCtx := i.BasicService.ServiceContext() if ingCtx == nil || ingCtx.Err() != nil { level.Info(i.logger).Log("msg", "flushing TSDB blocks: ingester not running, ignoring flush request") return } - ch := make(chan struct{}, 1) + compactionCallbackCh := make(chan struct{}) level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering compaction") select { - case i.TSDBState.forceCompactTrigger <- ch: + case i.TSDBState.forceCompactTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: compactionCallbackCh}: // Compacting now. case <-ingCtx.Done(): level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore") @@ -2012,7 +2037,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) { // Wait until notified about compaction being finished. select { - case <-ch: + case <-compactionCallbackCh: level.Info(i.logger).Log("msg", "finished compacting TSDB blocks") case <-ingCtx.Done(): level.Warn(i.logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore") @@ -2020,10 +2045,12 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) { } if i.cfg.BlocksStorageConfig.TSDB.IsBlocksShippingEnabled() { + shippingCallbackCh := make(chan struct{}) // must be new channel, as compactionCallbackCh is closed now. + level.Info(i.logger).Log("msg", "flushing TSDB blocks: triggering shipping") select { - case i.TSDBState.shipTrigger <- ch: + case i.TSDBState.shipTrigger <- requestWithUsersAndCallback{users: allowedUsers, callback: shippingCallbackCh}: // shipping now case <-ingCtx.Done(): level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore") @@ -2032,7 +2059,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) { // Wait until shipping finished. select { - case <-ch: + case <-shippingCallbackCh: level.Info(i.logger).Log("msg", "shipping of TSDB blocks finished") case <-ingCtx.Done(): level.Warn(i.logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore") @@ -2041,7 +2068,15 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) { } level.Info(i.logger).Log("msg", "flushing TSDB blocks: finished") - }() + } + + const waitParam = "wait" + if len(r.Form[waitParam]) > 0 && r.Form[waitParam][0] == "true" { + // Run synchronously. This simplifies and speeds up tests. 
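+		// Note: the handler still replies 204 No Content after run() returns;
+		// the status code does not reflect whether the flush itself succeeded.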
+ run() + } else { + go run() + } w.WriteHeader(http.StatusNoContent) } diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 1f57f7b2239..d1c5e5d9909 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -10,6 +10,7 @@ import ( "net" "net/http" "net/http/httptest" + "net/url" "os" "path/filepath" "sort" @@ -2187,7 +2188,7 @@ func TestIngester_shipBlocks(t *testing.T) { } // Ship blocks and assert on the mocked shipper - i.shipBlocks(context.Background()) + i.shipBlocks(context.Background(), nil) for _, m := range mocks { m.AssertNumberOfCalls(t, "Sync", 1) @@ -2217,9 +2218,9 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T) pushSingleSampleWithMetadata(t, i) require.Equal(t, int64(1), i.TSDBState.seriesCount.Load()) - i.compactBlocks(context.Background(), true) + i.compactBlocks(context.Background(), true, nil) require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) - i.shipBlocks(context.Background()) + i.shipBlocks(context.Background(), nil) numObjects := len(bucket.Objects()) require.NotZero(t, numObjects) @@ -2234,9 +2235,9 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T) // After writing tenant deletion mark, pushSingleSampleWithMetadata(t, i) require.Equal(t, int64(1), i.TSDBState.seriesCount.Load()) - i.compactBlocks(context.Background(), true) + i.compactBlocks(context.Background(), true, nil) require.Equal(t, int64(0), i.TSDBState.seriesCount.Load()) - i.shipBlocks(context.Background()) + i.shipBlocks(context.Background(), nil) numObjectsAfterMarkingTenantForDeletion := len(bucket.Objects()) require.Equal(t, numObjects, numObjectsAfterMarkingTenantForDeletion) @@ -2271,7 +2272,7 @@ func TestIngester_seriesCountIsCorrectAfterClosingTSDBForDeletedTenant(t *testin require.Equal(t, int64(1), i.TSDBState.seriesCount.Load()) // We call shipBlocks to check for deletion marker (it happens inside this method). - i.shipBlocks(context.Background()) + i.shipBlocks(context.Background(), nil) // Verify that tenant deletion mark was found. db := i.getTSDB(userID) @@ -2317,7 +2318,7 @@ func TestIngester_closeAndDeleteUserTSDBIfIdle_shouldNotCloseTSDBIfShippingIsInP })) // Run blocks shipping in a separate go routine. - go i.shipBlocks(ctx) + go i.shipBlocks(ctx, nil) // Wait until shipping starts. test.Poll(t, 1*time.Second, activeShipping, func() interface{} { @@ -2404,8 +2405,8 @@ func TestIngester_idleCloseEmptyTSDB(t *testing.T) { require.NotNil(t, db) // Run compaction and shipping. - i.compactBlocks(context.Background(), true) - i.shipBlocks(context.Background()) + i.compactBlocks(context.Background(), true, nil) + i.shipBlocks(context.Background(), nil) // Make sure we can close completely empty TSDB without problems. require.Equal(t, tsdbIdleClosed, i.closeAndDeleteUserTSDBIfIdle(userID)) @@ -2550,19 +2551,54 @@ func TestIngester_flushing(t *testing.T) { cortex_ingester_shipper_uploads_total 0 `), "cortex_ingester_shipper_uploads_total")) - i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil)) + // Using wait=true makes this a synchronous call. + i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true", nil)) - // Flush handler only triggers compactions, but doesn't wait for them to finish. Let's wait for a moment, and then verify. 
- test.Poll(t, 5*time.Second, uint64(0), func() interface{} { - db := i.getTSDB(userID) - if db == nil { - return false - } - return db.Head().NumSeries() - }) + verifyCompactedHead(t, i, true) + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks + # TYPE cortex_ingester_shipper_uploads_total counter + cortex_ingester_shipper_uploads_total 1 + `), "cortex_ingester_shipper_uploads_total")) + }, + }, - // The above waiting only ensures compaction, waiting another second to register the Sync call. - time.Sleep(1 * time.Second) + "flushHandlerWithListOfTenants": { + setupIngester: func(cfg *Config) { + cfg.BlocksStorageConfig.TSDB.FlushBlocksOnShutdown = false + }, + + action: func(t *testing.T, i *Ingester, reg *prometheus.Registry) { + pushSingleSampleWithMetadata(t, i) + + // Nothing shipped yet. + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks + # TYPE cortex_ingester_shipper_uploads_total counter + cortex_ingester_shipper_uploads_total 0 + `), "cortex_ingester_shipper_uploads_total")) + + users := url.Values{} + users.Add("tenant", "unknown-user") + users.Add("tenant", "another-unknown-user") + + // Using wait=true makes this a synchronous call. + i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true&"+users.Encode(), nil)) + + // Still nothing shipped or compacted. + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_ingester_shipper_uploads_total Total number of uploaded TSDB blocks + # TYPE cortex_ingester_shipper_uploads_total counter + cortex_ingester_shipper_uploads_total 0 + `), "cortex_ingester_shipper_uploads_total")) + verifyCompactedHead(t, i, false) + + users = url.Values{} + users.Add("tenant", "different-user") + users.Add("tenant", userID) // Our user + users.Add("tenant", "yet-another-user") + + i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true&"+users.Encode(), nil)) verifyCompactedHead(t, i, true) require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` @@ -2598,19 +2634,7 @@ func TestIngester_flushing(t *testing.T) { cortex_ingester_shipper_uploads_total 0 `), "cortex_ingester_shipper_uploads_total")) - i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil)) - - // Flush handler only triggers compactions, but doesn't wait for them to finish. Let's wait for a moment, and then verify. - test.Poll(t, 5*time.Second, uint64(0), func() interface{} { - db := i.getTSDB(userID) - if db == nil { - return false - } - return db.Head().NumSeries() - }) - - // The above waiting only ensures compaction, waiting another second to register the Sync call. - time.Sleep(1 * time.Second) + i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true", nil)) verifyCompactedHead(t, i, true) @@ -2867,7 +2891,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) { pushSingleSampleWithMetadata(t, i) - i.compactBlocks(context.Background(), false) + i.compactBlocks(context.Background(), false, nil) verifyCompactedHead(t, i, false) require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(` # HELP cortex_ingester_memory_series_created_total The total number of series that were created per user. 
@@ -2886,7 +2910,7 @@ func TestIngesterCompactIdleBlock(t *testing.T) { // wait one second (plus maximum jitter) -- TSDB is now idle. time.Sleep(time.Duration(float64(cfg.BlocksStorageConfig.TSDB.HeadCompactionIdleTimeout) * (1 + compactionIdleTimeoutJitter))) - i.compactBlocks(context.Background(), false) + i.compactBlocks(context.Background(), false, nil) verifyCompactedHead(t, i, true) require.NoError(t, testutil.GatherAndCompare(r, strings.NewReader(` # HELP cortex_ingester_memory_series_created_total The total number of series that were created per user. @@ -3342,7 +3366,8 @@ func TestIngesterNoFlushWithInFlightRequest(t *testing.T) { db := i.getTSDB(userID) require.NoError(t, db.acquireAppendLock()) - // Flush handler only triggers compactions, but doesn't wait for them to finish. + // Flush handler only triggers compactions, but doesn't wait for them to finish. We cannot use ?wait=true here, + // because it would deadlock -- flush will wait for appendLock to be released. i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush", nil)) // Flushing should not have succeeded even after 5 seconds. From d72b6b7d2bde5eb4386c916edbf92d6099fd0f2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 13 Apr 2021 11:11:19 +0200 Subject: [PATCH 2/7] Rename tenant to t, to shorten the query string. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 4 +++- pkg/ingester/ingester_v2_test.go | 10 +++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 5a6125785e1..d1a3c7d3ef7 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -1998,6 +1998,8 @@ func (i *Ingester) v2LifecyclerFlush() { level.Info(i.logger).Log("msg", "finished flushing and shipping TSDB blocks") } +const tenantParam = "t" + // Blocks version of Flush handler. It force-compacts blocks, and triggers shipping. func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) { err := r.ParseForm() @@ -2007,7 +2009,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) { return } - tenants := r.Form["tenant"] + tenants := r.Form[tenantParam] var allowedUsers map[string]struct{} = nil // All users are allowed. if len(tenants) > 0 { diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index d1c5e5d9909..a388819be7c 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -2579,8 +2579,8 @@ func TestIngester_flushing(t *testing.T) { `), "cortex_ingester_shipper_uploads_total")) users := url.Values{} - users.Add("tenant", "unknown-user") - users.Add("tenant", "another-unknown-user") + users.Add(tenantParam, "unknown-user") + users.Add(tenantParam, "another-unknown-user") // Using wait=true makes this a synchronous call. 
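		// url.Values.Encode serializes repeated keys, so with tenantParam = "t"
		// this yields "t=unknown-user&t=another-unknown-user" alongside wait=true.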
i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true&"+users.Encode(), nil)) @@ -2594,9 +2594,9 @@ func TestIngester_flushing(t *testing.T) { verifyCompactedHead(t, i, false) users = url.Values{} - users.Add("tenant", "different-user") - users.Add("tenant", userID) // Our user - users.Add("tenant", "yet-another-user") + users.Add(tenantParam, "different-user") + users.Add(tenantParam, userID) // Our user + users.Add(tenantParam, "yet-another-user") i.FlushHandler(httptest.NewRecorder(), httptest.NewRequest("POST", "/flush?wait=true&"+users.Encode(), nil)) From d850e97372d42feb5366bb770e2b0dcec0764c56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 13 Apr 2021 11:15:55 +0200 Subject: [PATCH 3/7] CHANGELOG.md and api updates. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 1 + docs/api/_index.md | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 624ad8dfa8e..bb1bc526b7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ * [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992. * [ENHANCEMENT] Cortex is now built with Go 1.16. #4062 * [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074 +* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `t` (as "tenant") to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `t` parameter. #4073 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules//` endpoint return `400` instead of `404`. #4013 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959 diff --git a/docs/api/_index.md b/docs/api/_index.md index 857315b27d3..4f8435834b4 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -247,6 +247,10 @@ GET,POST /flush Triggers a flush of the in-memory time series data (chunks or blocks) to the long-term storage. This endpoint triggers the flush also when `-ingester.flush-on-shutdown-with-wal-enabled` or `-blocks-storage.tsdb.flush-blocks-on-shutdown` are disabled. +When using blocks storage, this endpoint accepts `t` parameter to specify tenant whose blocks are compacted and shipped. `t` parameter may be specified multiple times to select more tenants. If no tenant is specified, all tenants are flushed. + +Flush endpoint now also accepts `wait=true` parameter, which makes the call synchronous – it will only return after flushing has finished. Note that returned status code does not reflect the result of flush operation. This parameter is only available when using blocks storage. 
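+
+For example, a Go client could trigger a synchronous flush of two tenants as sketched below. The ingester address and tenant names are illustrative, and at this point in the series the parameter is still named `t` (it is renamed to `tenant` later in these patches); `r.ParseForm` on the ingester side reads both the query string and a URL-encoded POST body, so either works:
+
+```go
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"net/url"
+)
+
+func main() {
+	// Hypothetical ingester address. Repeat "t" to select several tenants.
+	resp, err := http.PostForm("http://ingester:8080/flush", url.Values{
+		"t":    {"tenant-a", "tenant-b"},
+		"wait": {"true"},
+	})
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+
+	// The handler always replies 204 No Content; check ingester logs or
+	// metrics to verify that compaction and shipping actually succeeded.
+	fmt.Println(resp.Status)
+}
+```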
+ ### Shutdown ``` From bb46a256edfb5ca0a1e3f0ac7b9c5f30022da124 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Tue, 13 Apr 2021 11:32:23 +0200 Subject: [PATCH 4/7] Remove nil/empty distinction. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index d1a3c7d3ef7..6c2a056c526 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -406,7 +406,7 @@ type TSDBState struct { } type requestWithUsersAndCallback struct { - users map[string]struct{} // if nil, all users are compacted/shipped. If not nil, only users in the map are compacted/shipped. + users map[string]struct{} // if nil or empty, all users are compacted/shipped. Otherwise only users in the map are compacted/shipped. callback chan<- struct{} // when compaction/shipping is finished, this channel is closed } @@ -1725,8 +1725,7 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error { } // shipBlocks runs shipping for all users. -// allowedUsers, if not nil, is used to only ship blocks for users in this map. -// (Note that empty map = no users are shipped, nil map = all users are shipped) +// allowedUsers, if not empty, is used to only ship blocks for users in this map. (nil map = empty) func (i *Ingester) shipBlocks(ctx context.Context, allowedUsers map[string]struct{}) { // Do not ship blocks if the ingester is PENDING or JOINING. It's // particularly important for the JOINING state because there could @@ -1742,7 +1741,7 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowedUsers map[string]struc // Number of concurrent workers is limited in order to avoid to concurrently sync a lot // of tenants in a large cluster. _ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(ctx context.Context, userID string) error { - if _, ok := allowedUsers[userID]; allowedUsers != nil && !ok { + if _, ok := allowedUsers[userID]; len(allowedUsers) > 0 && !ok { return nil } @@ -1824,8 +1823,7 @@ func (i *Ingester) compactionLoop(ctx context.Context) error { } // Compacts all compactable blocks. Force flag will force compaction even if head is not compactable yet. -// allowedUsers, if not nil, is used to only compact users in this map. -// (Note that empty map = no users are compacted, nil map = all users are compacted) +// allowedUsers, if not empty, is used to only compact users in this map. (nil map = empty) func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowedUsers map[string]struct{}) { // Don't compact TSDB blocks while JOINING as there may be ongoing blocks transfers. // Compaction loop is not running in LEAVING state, so if we get here in LEAVING state, we're flushing blocks. @@ -1837,7 +1835,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowedUsers m } _ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(ctx context.Context, userID string) error { - if _, ok := allowedUsers[userID]; allowedUsers != nil && !ok { + if _, ok := allowedUsers[userID]; len(allowedUsers) > 0 && !ok { return nil } From 5310d07fecddc3ff7cdd6e3edf9fa73bd4dbb857 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 14 Apr 2021 10:22:18 +0200 Subject: [PATCH 5/7] Review feedback. 
MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 6c2a056c526..2749bf14604 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -1996,7 +1996,10 @@ func (i *Ingester) v2LifecyclerFlush() { level.Info(i.logger).Log("msg", "finished flushing and shipping TSDB blocks") } -const tenantParam = "t" +const ( + tenantParam = "tenant" + waitParam = "wait" +) // Blocks version of Flush handler. It force-compacts blocks, and triggers shipping. func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) { @@ -2070,7 +2073,6 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) { level.Info(i.logger).Log("msg", "flushing TSDB blocks: finished") } - const waitParam = "wait" if len(r.Form[waitParam]) > 0 && r.Form[waitParam][0] == "true" { // Run synchronously. This simplifies and speeds up tests. run() From 2adc556c9d11dee0363347052411bce9229eec3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 14 Apr 2021 15:04:07 +0200 Subject: [PATCH 6/7] Fix documentation. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- CHANGELOG.md | 2 +- docs/api/_index.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb1bc526b7a..f1b62bff23f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,7 +39,7 @@ * [ENHANCEMENT] Ingester: added following per-ingester (instance) limits: max number of series in memory (`-ingester.instance-limits.max-series`), max number of users in memory (`-ingester.instance-limits.max-tenants`), max ingestion rate (`-ingester.instance-limits.max-ingestion-rate`), and max inflight requests (`-ingester.instance-limits.max-inflight-push-requests`). These limits are only used when using blocks storage. Limits can also be configured using runtime-config feature, and current values are exported as `cortex_ingester_instance_limits` metric. #3992. * [ENHANCEMENT] Cortex is now built with Go 1.16. #4062 * [ENHANCEMENT] Ruler: Added `-ruler.enabled-tenants` and `-ruler.disabled-tenants` to explicitly enable or disable rules processing for specific tenants. #4074 -* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `t` (as "tenant") to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `t` parameter. #4073 +* [ENHANCEMENT] Block Storage Ingester: `/flush` now accepts two new parameters: `tenant` to specify tenant to flush and `wait=true` to make call synchronous. Multiple tenants can be specified by repeating `tenant` parameter. If no `tenant` is specified, all tenants are flushed, as before. #4073 * [BUGFIX] Ruler-API: fix bug where `/api/v1/rules//` endpoint return `400` instead of `404`. #4013 * [BUGFIX] Distributor: reverted changes done to rate limiting in #3825. #3948 * [BUGFIX] Ingester: Fix race condition when opening and closing tsdb concurrently. #3959 diff --git a/docs/api/_index.md b/docs/api/_index.md index 4f8435834b4..3c4141852bf 100644 --- a/docs/api/_index.md +++ b/docs/api/_index.md @@ -247,7 +247,7 @@ GET,POST /flush Triggers a flush of the in-memory time series data (chunks or blocks) to the long-term storage. 
This endpoint triggers the flush also when `-ingester.flush-on-shutdown-with-wal-enabled` or `-blocks-storage.tsdb.flush-blocks-on-shutdown` are disabled. -When using blocks storage, this endpoint accepts `t` parameter to specify tenant whose blocks are compacted and shipped. `t` parameter may be specified multiple times to select more tenants. If no tenant is specified, all tenants are flushed. +When using blocks storage, this endpoint accepts `tenant` parameter to specify tenant whose blocks are compacted and shipped. This parameter may be specified multiple times to select more tenants. If no tenant is specified, all tenants are flushed. Flush endpoint now also accepts `wait=true` parameter, which makes the call synchronous – it will only return after flushing has finished. Note that returned status code does not reflect the result of flush operation. This parameter is only available when using blocks storage. From eef449dc11840ed4bc21ec55b096e19c99e74ee4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20S=CC=8Ctibrany=CC=81?= Date: Wed, 14 Apr 2021 20:56:08 +0200 Subject: [PATCH 7/7] Use allowed tenants for passing which tenants to flush. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Peter Štibraný --- pkg/ingester/ingester_v2.go | 23 +++++++---------------- pkg/util/allowed_tenants.go | 9 +++++++++ pkg/util/allowed_tenants_test.go | 9 +++++++++ 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 2749bf14604..587613fd5ab 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -406,8 +406,8 @@ type TSDBState struct { } type requestWithUsersAndCallback struct { - users map[string]struct{} // if nil or empty, all users are compacted/shipped. Otherwise only users in the map are compacted/shipped. - callback chan<- struct{} // when compaction/shipping is finished, this channel is closed + users *util.AllowedTenants // if nil, all tenants are allowed. + callback chan<- struct{} // when compaction/shipping is finished, this channel is closed } func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer) TSDBState { @@ -1725,8 +1725,7 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error { } // shipBlocks runs shipping for all users. -// allowedUsers, if not empty, is used to only ship blocks for users in this map. (nil map = empty) -func (i *Ingester) shipBlocks(ctx context.Context, allowedUsers map[string]struct{}) { +func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants) { // Do not ship blocks if the ingester is PENDING or JOINING. It's // particularly important for the JOINING state because there could // be a blocks transfer in progress (from another ingester) and if we @@ -1741,7 +1740,7 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowedUsers map[string]struc // Number of concurrent workers is limited in order to avoid to concurrently sync a lot // of tenants in a large cluster. _ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(ctx context.Context, userID string) error { - if _, ok := allowedUsers[userID]; len(allowedUsers) > 0 && !ok { + if !allowed.IsAllowed(userID) { return nil } @@ -1823,8 +1822,7 @@ func (i *Ingester) compactionLoop(ctx context.Context) error { } // Compacts all compactable blocks. Force flag will force compaction even if head is not compactable yet. 
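// A nil *util.AllowedTenants allows every tenant (IsAllowed checks for a nil
// receiver), so existing callers that pass nil keep compacting all users.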
-// allowedUsers, if not empty, is used to only compact users in this map. (nil map = empty) -func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowedUsers map[string]struct{}) { +func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *util.AllowedTenants) { // Don't compact TSDB blocks while JOINING as there may be ongoing blocks transfers. // Compaction loop is not running in LEAVING state, so if we get here in LEAVING state, we're flushing blocks. if i.lifecycler != nil { @@ -1835,7 +1833,7 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowedUsers m } _ = concurrency.ForEachUser(ctx, i.getTSDBUsers(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(ctx context.Context, userID string) error { - if _, ok := allowedUsers[userID]; len(allowedUsers) > 0 && !ok { + if !allowed.IsAllowed(userID) { return nil } @@ -2012,14 +2010,7 @@ func (i *Ingester) v2FlushHandler(w http.ResponseWriter, r *http.Request) { tenants := r.Form[tenantParam] - var allowedUsers map[string]struct{} = nil // All users are allowed. - if len(tenants) > 0 { - allowedUsers = make(map[string]struct{}, len(tenants)) - for _, t := range tenants { - allowedUsers[t] = struct{}{} - } - } - + allowedUsers := util.NewAllowedTenants(tenants, nil) run := func() { ingCtx := i.BasicService.ServiceContext() if ingCtx == nil || ingCtx.Err() != nil { diff --git a/pkg/util/allowed_tenants.go b/pkg/util/allowed_tenants.go index 0f610a5fdb1..88c7a6333b8 100644 --- a/pkg/util/allowed_tenants.go +++ b/pkg/util/allowed_tenants.go @@ -1,5 +1,7 @@ package util +// AllowedTenants that can answer whether tenant is allowed or not based on configuration. +// Default value (nil) allows all tenants. type AllowedTenants struct { // If empty, all tenants are enabled. If not empty, only tenants in the map are enabled. enabled map[string]struct{} @@ -8,6 +10,9 @@ type AllowedTenants struct { disabled map[string]struct{} } +// NewAllowedTenants builds new allowed tenants based on enabled and disabled tenants. +// If there are any enabled tenants, then only those tenants are allowed. +// If there are any disabled tenants, then tenant from that list, that would normally be allowed, is disabled instead. func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants { a := &AllowedTenants{} @@ -29,6 +34,10 @@ func NewAllowedTenants(enabled []string, disabled []string) *AllowedTenants { } func (a *AllowedTenants) IsAllowed(tenantID string) bool { + if a == nil { + return true + } + if len(a.enabled) > 0 { if _, ok := a.enabled[tenantID]; !ok { return false diff --git a/pkg/util/allowed_tenants_test.go b/pkg/util/allowed_tenants_test.go index b51cf930e74..221e0a9e603 100644 --- a/pkg/util/allowed_tenants_test.go +++ b/pkg/util/allowed_tenants_test.go @@ -36,3 +36,12 @@ func TestAllowedTenants_Combination(t *testing.T) { require.False(t, a.IsAllowed("C")) // disabled require.False(t, a.IsAllowed("D")) // not enabled } + +func TestAllowedTenants_Nil(t *testing.T) { + var a *AllowedTenants = nil + + // All tenants are allowed when using nil as allowed tenants. + require.True(t, a.IsAllowed("A")) + require.True(t, a.IsAllowed("B")) + require.True(t, a.IsAllowed("C")) +}
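The new `TestAllowedTenants_Nil` test relies on Go's ability to call a method on a nil pointer receiver, which is what lets `shipBlocks(ctx, nil)` and `compactBlocks(ctx, force, nil)` keep their "all users" behavior without any special-casing at call sites. A self-contained sketch of the idiom, simplified from `util.AllowedTenants` (the real type also carries a `disabled` list):

```go
package main

import "fmt"

// allowList mirrors the nil-receiver idiom of util.AllowedTenants in
// simplified form: a nil *allowList means "no filtering configured".
type allowList struct {
	enabled map[string]struct{}
}

// isAllowed is safe to call on a nil receiver; nil permits every tenant.
func (a *allowList) isAllowed(tenantID string) bool {
	if a == nil {
		return true
	}
	_, ok := a.enabled[tenantID]
	return ok
}

func main() {
	var everyone *allowList // nil: all tenants pass
	onlyA := &allowList{enabled: map[string]struct{}{"tenant-a": {}}}

	fmt.Println(everyone.isAllowed("tenant-b")) // true
	fmt.Println(onlyA.isAllowed("tenant-a"))    // true
	fmt.Println(onlyA.isAllowed("tenant-b"))    // false
}
```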