From 191ba10c8273b480738f3043cc7a631aa94ad5c3 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 14 Jan 2020 08:54:49 +0000 Subject: [PATCH 1/2] Report chunks flushed by spread-flushes option under separate label This improves observability of flushing, creating separate labels for chunks that overflowed versus to series that reached their time under spread-flushes behaviour. The flushReason type shrinks to int8 to avoid bloating the chunk desc object. Signed-off-by: Bryan Boreham --- pkg/ingester/flush.go | 10 ++++++++-- pkg/ingester/ingester.go | 2 +- pkg/ingester/series.go | 17 +++++++++++------ 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index d97dc0eb7b9..49b229229ed 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -137,7 +137,7 @@ func (i *Ingester) sweepUsers(immediate bool) { oldestUnflushedChunkTimestamp.Set(float64(oldest.Unix())) } -type flushReason int +type flushReason int8 const ( noFlush = iota @@ -146,6 +146,7 @@ const ( reasonAged reasonIdle reasonStale + reasonSpreadFlush ) func (f flushReason) String() string { @@ -162,6 +163,8 @@ func (f flushReason) String() string { return "Idle" case reasonStale: return "Stale" + case reasonSpreadFlush: + return "Spread" default: panic("unrecognised flushReason") } @@ -196,6 +199,9 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, // Flush if we have more than one chunk, and haven't already flushed the first chunk if len(series.chunkDescs) > 1 && !series.chunkDescs[0].flushed { + if series.chunkDescs[0].flushReason != noFlush { + return series.chunkDescs[0].flushReason + } return reasonMultipleChunksInSeries } else if len(series.chunkDescs) > 0 { // Otherwise look in more detail at the first chunk @@ -287,7 +293,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. // Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it. chunks := series.chunkDescs if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) { - series.closeHead() + series.closeHead(reasonImmediate) } else { chunks = chunks[:len(chunks)-1] } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ffa80ced26f..1ada90d16bf 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -355,7 +355,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, slot := startOfCycle.Add(time.Duration(uint64(fp) % uint64(i.cfg.MaxChunkAge))) // If adding this sample means the head chunk will span that point in time, close so it will get flushed if series.head().FirstTime < slot && timestamp >= slot { - series.closeHead() + series.closeHead(reasonSpreadFlush) } } diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index 7ce8f42f8bc..5bf2f57f0eb 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -123,7 +123,11 @@ func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) { return first, last, iter.Err() } -func (s *memorySeries) closeHead() { +// closeHead marks the head chunk closed. The caller must have locked +// the fingerprint of the memorySeries. This method will panic if this +// series has no chunk descriptors. +func (s *memorySeries) closeHead(reason flushReason) { + s.chunkDescs[0].flushReason = reason s.headChunkClosed = true } @@ -212,11 +216,12 @@ func (s *memorySeries) isStale() bool { } type desc struct { - C encoding.Chunk // nil if chunk is evicted. - FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. - LastTime model.Time // Timestamp of last sample. Populated at creation & on append. - LastUpdate model.Time // This server's local time on last change - flushed bool // set to true when flush succeeds + C encoding.Chunk // nil if chunk is evicted. + FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. + LastTime model.Time // Timestamp of last sample. Populated at creation & on append. + LastUpdate model.Time // This server's local time on last change + flushReason flushReason // If chunk is closed, holds the reason why. + flushed bool // set to true when flush succeeds } func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc { From 3991c80ba0c8ee3968ec2b3a761ba1e6c4d4d2eb Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Tue, 14 Jan 2020 10:57:02 +0000 Subject: [PATCH 2/2] Add changelog note about new flush_reasons label value Signed-off-by: Bryan Boreham --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 84467d7c45c..8f084da5f25 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master / unreleased +* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled. + * [CHANGE] Flags changed with transition to upstream Prometheus rules manager: * `ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url` * `ruler.group-timeout`has been removed