Commit 5bc5654

Added support for flushing blocks (#2794)
* Added support for flushing blocks. Flushing can be triggered by:
  - using the -experimental.tsdb.flush-blocks-on-shutdown option
  - using the /flush endpoint (works for chunks too): this triggers flush and shipping (if enabled), but the endpoint returns immediately
  - using the /shutdown endpoint (works for chunks too): this enables the flush flag and shuts down the ingester; the endpoint waits until flushing and shipping have finished
* Added tests to verify flushing behaviour.
* If shipping is triggered via /flush, log a message after it has finished.
* Updated doc.
* Added CHANGELOG.md entry.
* Fix check whether head is empty or not.
* Unified compaction into a single method. Both shipping and compaction now send a completion notification back. Flushing reacts to the ingester stopping and writes a proper message.
* Fix logging.
* Fix tests after rebase.
* Update comment.
* Removed logger.
* Add user to log message.
* Change debug to info when logging idle compaction.

Signed-off-by: Peter Štibraný <[email protected]>
1 parent 7326c6c commit 5bc5654
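
The commit message above describes three triggers. As a quick way to exercise the two HTTP triggers, the sketch below sends plain requests to the ingester; the host, port, and HTTP method are assumptions about a particular deployment rather than anything this commit prescribes.

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Assumed ingester address; adjust to your deployment. The /flush endpoint
	// triggers compaction and (if shipping is enabled) shipping of blocks and
	// returns immediately.
	resp, err := http.Post("http://ingester.example:8080/flush", "text/plain", nil)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("flush triggered:", resp.Status)

	// The /shutdown endpoint flushes and ships blocks and returns only once the
	// ingester has stopped; commented out here because it terminates the ingester.
	// resp, err = http.Post("http://ingester.example:8080/shutdown", "text/plain", nil)
}
```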

8 files changed (+264 -23 lines)


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@
 * [ENHANCEMENT] Query-tee: Support for custom API prefix by using `-server.path-prefix` option. #2814
 * [ENHANCEMENT] Query-tee: Forward `X-Scope-OrgId` header to backend, if present in the request. #2815
 * [ENHANCEMENT] Experimental TSDB: Added `-experimental.tsdb.head-compaction-idle-timeout` option to force compaction of data in memory into a block. #2803
+* [ENHANCEMENT] Experimental TSDB: Added support for flushing blocks via `/flush`, `/shutdown` (previously these only worked for chunks storage) and by using `-experimental.tsdb.flush-blocks-on-shutdown` option. #2794
 * [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
 * [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
 * [BUGFIX] Fix race in processing of headers in sharded queries. #2762

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 0 deletions
@@ -3067,6 +3067,12 @@ bucket_store:
 # CLI flag: -experimental.tsdb.store-gateway-enabled
 [store_gateway_enabled: <boolean> | default = false]
 
+# If true, and transfer of blocks on shutdown fails or is disabled, incomplete
+# blocks are flushed to storage instead. If false, incomplete blocks will be
+# reused after restart, and uploaded when finished.
+# CLI flag: -experimental.tsdb.flush-blocks-on-shutdown
+[flush_blocks_on_shutdown: <boolean> | default = false]
+
 # limit the number of concurrently opening TSDB's on startup
 # CLI flag: -experimental.tsdb.max-tsdb-opening-concurrency-on-startup
 [max_tsdb_opening_concurrency_on_startup: <int> | default = 10]

docs/operations/blocks-storage.md

Lines changed: 6 additions & 0 deletions
@@ -452,6 +452,12 @@ tsdb:
 # CLI flag: -experimental.tsdb.store-gateway-enabled
 [store_gateway_enabled: <boolean> | default = false]
 
+# If true, and transfer of blocks on shutdown fails or is disabled, incomplete
+# blocks are flushed to storage instead. If false, incomplete blocks will be
+# reused after restart, and uploaded when finished.
+# CLI flag: -experimental.tsdb.flush-blocks-on-shutdown
+[flush_blocks_on_shutdown: <boolean> | default = false]
+
 # limit the number of concurrently opening TSDB's on startup
 # CLI flag: -experimental.tsdb.max-tsdb-opening-concurrency-on-startup
 [max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
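
For orientation, the sketch below shows how a boolean option like this is typically registered with Go's standard flag package. The flag name, default, and help text are taken from the reference above; the surrounding struct and function names are hypothetical, not copied from the Cortex source.

```go
package main

import (
	"flag"
	"fmt"
)

// tsdbOptions is a hypothetical stand-in for the real TSDB config struct.
type tsdbOptions struct {
	FlushBlocksOnShutdown bool
}

// registerFlags mirrors how the documented CLI flag maps onto a config-file field.
func (o *tsdbOptions) registerFlags(f *flag.FlagSet) {
	f.BoolVar(&o.FlushBlocksOnShutdown, "experimental.tsdb.flush-blocks-on-shutdown", false,
		"If true, and transfer of blocks on shutdown fails or is disabled, incomplete blocks are flushed to storage instead.")
}

func main() {
	opts := &tsdbOptions{}
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	opts.registerFlags(fs)
	_ = fs.Parse([]string{"-experimental.tsdb.flush-blocks-on-shutdown=true"})
	fmt.Println("flush blocks on shutdown:", opts.FlushBlocksOnShutdown)
}
```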

pkg/ingester/flush.go

Lines changed: 10 additions & 0 deletions
@@ -24,6 +24,11 @@ const (
 // Flush triggers a flush of all the chunks and closes the flush queues.
 // Called from the Lifecycler as part of the ingester shutdown.
 func (i *Ingester) Flush() {
+	if i.cfg.TSDBEnabled {
+		i.v2LifecyclerFlush()
+		return
+	}
+
 	level.Info(util.Logger).Log("msg", "starting to flush all the chunks")
 	i.sweepUsers(true)
 	level.Info(util.Logger).Log("msg", "flushing of chunks complete")
@@ -39,6 +44,11 @@ func (i *Ingester) Flush() {
 // FlushHandler triggers a flush of all in memory chunks. Mainly used for
 // local testing.
 func (i *Ingester) FlushHandler(w http.ResponseWriter, r *http.Request) {
+	if i.cfg.TSDBEnabled {
+		i.v2FlushHandler(w, r)
+		return
+	}
+
 	level.Info(util.Logger).Log("msg", "starting to flush all the chunks")
 	i.sweepUsers(true)
 	level.Info(util.Logger).Log("msg", "flushing of chunks complete")
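
The hunks above only route the existing chunks handlers to the blocks code path when TSDB is enabled; mounting the handlers on an HTTP server happens elsewhere in Cortex. The following stand-alone sketch (route and port are assumptions, and flushHandler is a stand-in for the real method) illustrates the behaviour /flush has for blocks: the work runs in the background and the handler answers 204 No Content immediately.

```go
package main

import (
	"log"
	"net/http"
)

// flushHandler is a stand-in for (*Ingester).FlushHandler: for the blocks
// engine the real handler kicks off compaction and shipping in a background
// goroutine and returns right away.
func flushHandler(w http.ResponseWriter, _ *http.Request) {
	go func() {
		// ... trigger compaction and shipping here ...
	}()
	w.WriteHeader(http.StatusNoContent)
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/flush", flushHandler)
	log.Fatal(http.ListenAndServe(":8080", mux))
}
```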

pkg/ingester/ingester.go

Lines changed: 3 additions & 3 deletions
@@ -108,7 +108,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 // Ingester deals with "in flight" chunks. Based on Prometheus 1.x
 // MemorySeriesStorage.
 type Ingester struct {
-	services.Service
+	*services.BasicService
 
 	cfg          Config
 	clientConfig client.Config
@@ -207,7 +207,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	i.subservicesWatcher = services.NewFailureWatcher()
 	i.subservicesWatcher.WatchService(i.lifecycler)
 
-	i.Service = services.NewBasicService(i.starting, i.loop, i.stopping)
+	i.BasicService = services.NewBasicService(i.starting, i.loop, i.stopping)
 	return i, nil
 }
 
@@ -276,7 +276,7 @@ func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore
 		wal: &noopWAL{},
 	}
 
-	i.Service = services.NewBasicService(i.startingForFlusher, i.loop, i.stopping)
+	i.BasicService = services.NewBasicService(i.startingForFlusher, i.loop, i.stopping)
 	return i, nil
 }
 
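
Swapping the embedded services.Service interface for the concrete *services.BasicService gives the ingester access to methods that are not on the interface; in particular, v2FlushHandler below calls ServiceContext() to see whether the ingester is still running. A minimal sketch of that pattern follows, assuming the services package behaves like Cortex's pkg/util/services at the time of this commit; helper names and signatures are paraphrased and should be treated as assumptions.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// component embeds the concrete *BasicService so callers can reach helpers
// such as ServiceContext(), not just the services.Service interface methods.
type component struct {
	*services.BasicService
}

func newComponent() *component {
	c := &component{}
	c.BasicService = services.NewBasicService(c.starting, c.running, c.stopping)
	return c
}

func (c *component) starting(_ context.Context) error  { return nil }
func (c *component) running(ctx context.Context) error { <-ctx.Done(); return nil }
func (c *component) stopping(_ error) error            { return nil }

func main() {
	c := newComponent()
	_ = services.StartAndAwaitRunning(context.Background(), c)
	// The service context is cancelled once the service stops running, so a
	// handler can use it to detect whether the component is still up.
	fmt.Println("running:", c.ServiceContext().Err() == nil)
	_ = services.StopAndAwaitTerminated(context.Background(), c)
}
```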

pkg/ingester/ingester_v2.go

Lines changed: 121 additions & 14 deletions
@@ -135,6 +135,9 @@ type TSDBState struct {
 
 	tsdbMetrics *tsdbMetrics
 
+	forceCompactTrigger chan chan<- struct{}
+	shipTrigger         chan chan<- struct{}
+
 	// Head compactions metrics.
 	compactionsTriggered prometheus.Counter
 	compactionsFailed    prometheus.Counter
@@ -161,9 +164,11 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 		usersMetadata: map[string]*userMetricsMetadata{},
 		wal:           &noopWAL{},
 		TSDBState: TSDBState{
-			dbs:         make(map[string]*userTSDB),
-			bucket:      bucketClient,
-			tsdbMetrics: newTSDBMetrics(registerer),
+			dbs:                 make(map[string]*userTSDB),
+			bucket:              bucketClient,
+			tsdbMetrics:         newTSDBMetrics(registerer),
+			forceCompactTrigger: make(chan chan<- struct{}),
+			shipTrigger:         make(chan chan<- struct{}),
 
 			compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
 				Name: "cortex_ingester_tsdb_compactions_triggered_total",
@@ -207,7 +212,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 		}, i.numSeriesInTSDB)
 	}
 
-	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer)
+	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, cfg.TSDBConfig.FlushBlocksOnShutdown, registerer)
 	if err != nil {
 		return nil, err
 	}
@@ -218,7 +223,7 @@
 	i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
 	i.userStates = newUserStates(i.limiter, cfg, i.metrics)
 
-	i.Service = services.NewBasicService(i.startingV2, i.updateLoop, i.stoppingV2)
+	i.BasicService = services.NewBasicService(i.startingV2, i.updateLoop, i.stoppingV2)
 	return i, nil
 }
 
@@ -1055,6 +1060,15 @@ func (i *Ingester) shipBlocksLoop(ctx context.Context) error {
 		case <-shipTicker.C:
 			i.shipBlocks(ctx)
 
+		case ch := <-i.TSDBState.shipTrigger:
+			i.shipBlocks(ctx)
+
+			// Notify back.
+			select {
+			case ch <- struct{}{}:
+			default: // Nobody is waiting for notification, don't block this loop.
+			}
+
 		case <-ctx.Done():
 			return nil
 		}
@@ -1096,17 +1110,28 @@ func (i *Ingester) compactionLoop(ctx context.Context) error {
 	for {
 		select {
 		case <-ticker.C:
-			i.compactBlocks(ctx)
+			i.compactBlocks(ctx, false)
+
+		case ch := <-i.TSDBState.forceCompactTrigger:
+			i.compactBlocks(ctx, true)
+
+			// Notify back.
+			select {
+			case ch <- struct{}{}:
+			default: // Nobody is waiting for notification, don't block this loop.
+			}
 
 		case <-ctx.Done():
 			return nil
 		}
 	}
 }
 
-func (i *Ingester) compactBlocks(ctx context.Context) {
-	// Don't compact TSDB blocks while JOINING or LEAVING, as there may be ongoing blocks transfers.
-	if ingesterState := i.lifecycler.GetState(); ingesterState == ring.JOINING || ingesterState == ring.LEAVING {
+// Compacts all compactable blocks. Force flag will force compaction even if head is not compactable yet.
+func (i *Ingester) compactBlocks(ctx context.Context, force bool) {
+	// Don't compact TSDB blocks while JOINING as there may be ongoing blocks transfers.
+	// Compaction loop is not running in LEAVING state, so if we get here in LEAVING state, we're flushing blocks.
+	if ingesterState := i.lifecycler.GetState(); ingesterState == ring.JOINING {
 		level.Info(util.Logger).Log("msg", "TSDB blocks compaction has been skipped because of the current ingester state", "state", ingesterState)
 		return
 	}
@@ -1126,18 +1151,28 @@ func (i *Ingester) compactBlocks(ctx context.Context) {
 		var err error
 
 		i.TSDBState.compactionsTriggered.Inc()
-		if i.cfg.TSDBConfig.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.TSDBConfig.HeadCompactionIdleTimeout) {
-			level.Debug(util.Logger).Log("msg", "Forcing compaction due to TSDB being idle")
+
+		reason := ""
+		switch {
+		case force:
+			reason = "forced"
 			err = userDB.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime()))
-		} else {
+
+		case i.cfg.TSDBConfig.HeadCompactionIdleTimeout > 0 && userDB.isIdle(time.Now(), i.cfg.TSDBConfig.HeadCompactionIdleTimeout):
+			reason = "idle"
+			level.Info(util.Logger).Log("msg", "TSDB is idle, forcing compaction", "user", userID)
+			err = userDB.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime()))
+
+		default:
+			reason = "regular"
 			err = userDB.Compact()
 		}
 
 		if err != nil {
 			i.TSDBState.compactionsFailed.Inc()
-			level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err)
+			level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err, "compactReason", reason)
 		} else {
-			level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID)
+			level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID, "compactReason", reason)
 		}
 	})
 }
@@ -1173,3 +1208,75 @@ sendLoop:
 	// wait for ongoing workers to finish.
 	wg.Wait()
 }
+
+// This method is called as part of Lifecycler's shutdown, to flush all data.
+// Lifecycler shutdown happens as part of Ingester shutdown (see stoppingV2 method).
+// Samples are not received at this stage. Compaction and Shipping loops have already been stopped as well.
+func (i *Ingester) v2LifecyclerFlush() {
+	level.Info(util.Logger).Log("msg", "starting to flush and ship TSDB blocks")
+
+	ctx := context.Background()
+
+	i.compactBlocks(ctx, true)
+	if i.cfg.TSDBConfig.ShipInterval > 0 {
+		i.shipBlocks(ctx)
+	}
+
+	level.Info(util.Logger).Log("msg", "finished flushing and shipping TSDB blocks")
+}
+
+// Blocks version of Flush handler. It force-compacts blocks, and triggers shipping.
+func (i *Ingester) v2FlushHandler(w http.ResponseWriter, _ *http.Request) {
+	go func() {
+		ingCtx := i.BasicService.ServiceContext()
+		if ingCtx == nil || ingCtx.Err() != nil {
+			level.Info(util.Logger).Log("msg", "flushing TSDB blocks: ingester not running, ignoring flush request")
+			return
+		}
+
+		ch := make(chan struct{}, 1)
+
+		level.Info(util.Logger).Log("msg", "flushing TSDB blocks: triggering compaction")
+		select {
+		case i.TSDBState.forceCompactTrigger <- ch:
+			// Compacting now.
+		case <-ingCtx.Done():
+			level.Warn(util.Logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
+			return
+		}
+
+		// Wait until notified about compaction being finished.
+		select {
+		case <-ch:
+			level.Info(util.Logger).Log("msg", "finished compacting TSDB blocks")
+		case <-ingCtx.Done():
+			level.Warn(util.Logger).Log("msg", "failed to compact TSDB blocks, ingester not running anymore")
+			return
+		}
+
+		if i.cfg.TSDBConfig.ShipInterval > 0 {
+			level.Info(util.Logger).Log("msg", "flushing TSDB blocks: triggering shipping")
+
+			select {
+			case i.TSDBState.shipTrigger <- ch:
+				// shipping now
+			case <-ingCtx.Done():
+				level.Warn(util.Logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
+				return
+			}
+
+			// Wait until shipping finished.
+			select {
+			case <-ch:
+				level.Info(util.Logger).Log("msg", "shipping of TSDB blocks finished")
+			case <-ingCtx.Done():
+				level.Warn(util.Logger).Log("msg", "failed to ship TSDB blocks, ingester not running anymore")
+				return
+			}
+		}
+
+		level.Info(util.Logger).Log("msg", "flushing TSDB blocks: finished")
+	}()
+
+	w.WriteHeader(http.StatusNoContent)
+}
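
Both new trigger channels have type chan chan<- struct{}: the caller sends a reply channel into the loop, the loop performs the work, and then notifies back without ever blocking if nobody is listening. The self-contained sketch below reproduces that pattern with the actual compaction/shipping replaced by a placeholder; names are simplified and not taken from the source.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// workerLoop stands in for compactionLoop/shipBlocksLoop: it does periodic
// work, but can also be forced to run immediately via the trigger channel.
func workerLoop(ctx context.Context, trigger <-chan chan<- struct{}) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			doWork("periodic")
		case ch := <-trigger:
			doWork("forced")
			// Notify back, but never block the loop if nobody is waiting.
			select {
			case ch <- struct{}{}:
			default:
			}
		case <-ctx.Done():
			return
		}
	}
}

func doWork(reason string) { fmt.Println("work done, reason:", reason) }

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	trigger := make(chan chan<- struct{})
	go workerLoop(ctx, trigger)

	// Caller side (like v2FlushHandler): request a forced run, then wait for
	// the completion notification. The buffer of 1 guarantees the notification
	// is not lost even if the caller is not receiving at that exact moment.
	done := make(chan struct{}, 1)
	trigger <- done
	<-done
	fmt.Println("forced run finished")
}
```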
