Skip to content

Commit 9539ae1

Browse files
authored
Merge pull request #1654 from cortexproject/optimise-append
Small optimisations to Ingester.append()
2 parents 069e08e + f54a9c0 commit 9539ae1

File tree

1 file changed

+18
-15
lines changed

1 file changed

+18
-15
lines changed

pkg/ingester/ingester.go

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,12 @@ type Ingester struct {
152152
lifecycler *ring.Lifecycler
153153
limits *validation.Overrides
154154

155-
stopLock sync.RWMutex
156-
stopped bool
157-
quit chan struct{}
158-
done sync.WaitGroup
155+
quit chan struct{}
156+
done sync.WaitGroup
159157

160-
userStatesMtx sync.RWMutex
158+
userStatesMtx sync.RWMutex // protects userStates and stopped
161159
userStates *userStates
160+
stopped bool // protected by userStatesMtx
162161

163162
// One queue per flush thread. Fingerprint is used to
164163
// pick a queue.
@@ -247,8 +246,8 @@ func (i *Ingester) Shutdown() {
247246

248247
// StopIncomingRequests is called during the shutdown process.
249248
func (i *Ingester) StopIncomingRequests() {
250-
i.stopLock.Lock()
251-
defer i.stopLock.Unlock()
249+
i.userStatesMtx.Lock()
250+
defer i.userStatesMtx.Unlock()
252251
i.stopped = true
253252
}
254253

@@ -286,21 +285,25 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
286285
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
287286
labels.removeBlanks()
288287

289-
i.stopLock.RLock()
290-
defer i.stopLock.RUnlock()
288+
var (
289+
state *userState
290+
fp model.Fingerprint
291+
)
292+
i.userStatesMtx.RLock()
293+
defer func() {
294+
i.userStatesMtx.RUnlock()
295+
if state != nil {
296+
state.fpLocker.Unlock(fp)
297+
}
298+
}()
291299
if i.stopped {
292300
return fmt.Errorf("ingester stopping")
293301
}
294-
295-
i.userStatesMtx.RLock()
296-
defer i.userStatesMtx.RUnlock()
297302
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels)
298303
if err != nil {
304+
state = nil // don't want to unlock the fp if there is an error
299305
return err
300306
}
301-
defer func() {
302-
state.fpLocker.Unlock(fp)
303-
}()
304307

305308
prevNumChunks := len(series.chunkDescs)
306309
if i.cfg.SpreadFlushes && prevNumChunks > 0 {

0 commit comments

Comments
 (0)