diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 0531a58b558..43bda108eeb 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -152,13 +152,12 @@ type Ingester struct { lifecycler *ring.Lifecycler limits *validation.Overrides - stopLock sync.RWMutex - stopped bool - quit chan struct{} - done sync.WaitGroup + quit chan struct{} + done sync.WaitGroup - userStatesMtx sync.RWMutex + userStatesMtx sync.RWMutex // protects userStates and stopped userStates *userStates + stopped bool // protected by userStatesMtx // One queue per flush thread. Fingerprint is used to // pick a queue. @@ -247,8 +246,8 @@ func (i *Ingester) Shutdown() { // StopIncomingRequests is called during the shutdown process. func (i *Ingester) StopIncomingRequests() { - i.stopLock.Lock() - defer i.stopLock.Unlock() + i.userStatesMtx.Lock() + defer i.userStatesMtx.Unlock() i.stopped = true } @@ -286,21 +285,25 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error { labels.removeBlanks() - i.stopLock.RLock() - defer i.stopLock.RUnlock() + var ( + state *userState + fp model.Fingerprint + ) + i.userStatesMtx.RLock() + defer func() { + i.userStatesMtx.RUnlock() + if state != nil { + state.fpLocker.Unlock(fp) + } + }() if i.stopped { return fmt.Errorf("ingester stopping") } - - i.userStatesMtx.RLock() - defer i.userStatesMtx.RUnlock() state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels) if err != nil { + state = nil // don't want to unlock the fp if there is an error return err } - defer func() { - state.fpLocker.Unlock(fp) - }() prevNumChunks := len(series.chunkDescs) if i.cfg.SpreadFlushes && prevNumChunks > 0 {