From 17c0c21e4f964140eb1b651392d05b72a327dda2 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 22 Jan 2020 13:45:22 +0000 Subject: [PATCH 1/2] Make sure we return noFlush on an empty series Signed-off-by: Bryan Boreham --- pkg/ingester/flush.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 49b229229ed..a0c6ad23c53 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -193,6 +193,9 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo } func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, immediate bool) flushReason { + if len(series.chunkDescs) == 0 { + return noFlush + } if immediate { return reasonImmediate } @@ -203,12 +206,9 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, return series.chunkDescs[0].flushReason } return reasonMultipleChunksInSeries - } else if len(series.chunkDescs) > 0 { - // Otherwise look in more detail at the first chunk - return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale()) } - - return noFlush + // Otherwise look in more detail at the first chunk + return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale()) } func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, lastValueIsStale bool) flushReason { From df8b95f2109b6fd27212a01b69a42e2518834849 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Wed, 22 Jan 2020 13:52:44 +0000 Subject: [PATCH 2/2] Pass the correct flush reason to closeHead() Pass the reason from `shouldFlushChunk()` to `closeHead()`; sending `reasonImmediate` was a bug. We don't need to check `len(chunks)` is non-zero, since that would return `noFlush` from `shouldFlushSeries()`. Signed-off-by: Bryan Boreham --- pkg/ingester/flush.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index a0c6ad23c53..3b73e1ffa77 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -290,11 +290,14 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. return nil } - // Assume we're going to flush everything, and maybe don't flush the head chunk if it doesn't need it. + // shouldFlushSeries() has told us we have at least one chunk chunks := series.chunkDescs - if immediate || (len(chunks) > 0 && i.shouldFlushChunk(series.head(), fp, series.isStale()) != noFlush) { + if immediate { series.closeHead(reasonImmediate) + } else if chunkReason := i.shouldFlushChunk(series.head(), fp, series.isStale()); chunkReason != noFlush { + series.closeHead(chunkReason) } else { + // The head chunk doesn't need flushing; step back by one. chunks = chunks[:len(chunks)-1] }