@@ -72,11 +72,10 @@ type walWrapper struct {
 	checkpointDeleteTotal   prometheus.Counter
 	checkpointCreationFail  prometheus.Counter
 	checkpointCreationTotal prometheus.Counter
+	checkpointDuration      prometheus.Summary
 }
 
-// newWAL creates a WAL object.
-// * If the WAL is disabled, then the returned WAL is a no-op WAL.
-// * If WAL recovery is enabled, then the userStates is always set for ingester.
+// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL.
 func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, error) {
 	if !cfg.walEnabled {
 		return &noopWAL{}, nil
@@ -114,12 +113,18 @@ func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, er
 		Name: "ingester_checkpoint_creations_total",
 		Help: "Total number of checkpoint creations attempted.",
 	})
+	w.checkpointDuration = prometheus.NewSummary(prometheus.SummaryOpts{
+		Name:       "ingester_checkpoint_duration_seconds",
+		Help:       "Time taken to create a checkpoint.",
+		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
+	})
 	if cfg.metricsRegisterer != nil {
 		cfg.metricsRegisterer.MustRegister(
 			w.checkpointDeleteFail,
 			w.checkpointDeleteTotal,
 			w.checkpointCreationFail,
 			w.checkpointCreationTotal,
+			w.checkpointDuration,
 		)
 	}
 
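The new checkpointDuration metric is a Prometheus Summary with explicit quantile objectives: the 50th, 90th and 99th percentile of checkpoint durations, each with its allowed estimation error. As a rough standalone sketch of how such a summary is created, registered and fed (the metric name and the sample durations are illustrative, not taken from the patch):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Summary with the same objectives as the patch: track the median,
	// 90th and 99th percentile of observed durations.
	checkpointDuration := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:       "example_checkpoint_duration_seconds", // illustrative name
		Help:       "Time taken to create a checkpoint.",
		Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})
	prometheus.MustRegister(checkpointDuration)

	// Feed in a few fake checkpoint durations, the way the run loop
	// observes time.Since(start).Seconds() after each checkpoint.
	for _, d := range []time.Duration{120 * time.Millisecond, 300 * time.Millisecond, 2 * time.Second} {
		checkpointDuration.Observe(d.Seconds())
	}
	fmt.Println("observed 3 checkpoint durations")
}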
@@ -131,7 +136,6 @@ func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, er
 func (w *walWrapper) Stop() {
 	close(w.quit)
 	w.wait.Wait()
-
 	w.wal.Close()
 }
 
@@ -161,7 +165,7 @@ func (w *walWrapper) run() {
 	ticker := time.NewTicker(w.cfg.checkpointDuration)
 	defer ticker.Stop()
 
-	for !w.isStopped() {
+	for {
 		select {
 		case <-ticker.C:
 			start := time.Now()
@@ -172,7 +176,9 @@ func (w *walWrapper) run() {
 			}
 			elapsed := time.Since(start)
 			level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String())
+			w.checkpointDuration.Observe(elapsed.Seconds())
 		case <-w.quit:
+			level.Info(util.Logger).Log("msg", "creating checkpoint before shutdown")
 			if err := w.performCheckpoint(); err != nil {
 				level.Error(util.Logger).Log("msg", "error checkpointing series during shutdown", "err", err)
 			}
@@ -181,15 +187,6 @@ func (w *walWrapper) run() {
 	}
 }
 
-func (w *walWrapper) isStopped() bool {
-	select {
-	case <-w.quit:
-		return true
-	default:
-		return false
-	}
-}
-
 const checkpointPrefix = "checkpoint."
 
 func (w *walWrapper) performCheckpoint() (err error) {
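With isStopped removed, shutdown is handled entirely inside the select: the ticker case takes periodic checkpoints (and now records their duration), and the quit case takes one final checkpoint before the goroutine exits. A minimal sketch of that ticker-plus-quit-channel loop shape, with a stand-in doWork instead of the real performCheckpoint and illustrative names throughout:

package main

import (
	"fmt"
	"time"
)

// run loops until quit is closed, doing periodic work and one final
// piece of work on shutdown -- the same shape as the patched walWrapper.run.
func run(interval time.Duration, quit <-chan struct{}, doWork func() error) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := doWork(); err != nil {
				fmt.Println("periodic work failed:", err)
			}
		case <-quit:
			// One last run before shutting down, mirroring the
			// "creating checkpoint before shutdown" path.
			if err := doWork(); err != nil {
				fmt.Println("final work failed:", err)
			}
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	go run(50*time.Millisecond, quit, func() error {
		fmt.Println("checkpoint")
		return nil
	})
	time.Sleep(200 * time.Millisecond)
	close(quit)
	time.Sleep(10 * time.Millisecond)
}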
@@ -511,11 +508,8 @@ Loop:
 	case capturedErr = <-errChan:
 		return capturedErr
 	default:
-		if err := reader.Err(); err != nil {
-			return err
-		}
+		return reader.Err()
 	}
-	return nil
 }
 
 func copyLabelAdapters(las []client.LabelAdapter) []client.LabelAdapter {
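The replay paths can return reader.Err() directly because, under the usual Prometheus WAL reader semantics, Err() yields whatever error stopped the reader and nil after a clean pass, so the old `if err != nil { return err }` plus trailing `return nil` said the same thing in more lines. A contrived sketch of the equivalence, with readErr standing in for the reader's Err method:

package main

import (
	"errors"
	"fmt"
)

// verbose mirrors the old shape: check the error, return it if set,
// otherwise fall through to an explicit nil.
func verbose(readErr func() error) error {
	if err := readErr(); err != nil {
		return err
	}
	return nil
}

// concise mirrors the patched shape: return whatever the reader reports,
// which is nil on success.
func concise(readErr func() error) error {
	return readErr()
}

func main() {
	fail := func() error { return errors.New("torn record") }
	ok := func() error { return nil }
	fmt.Println(verbose(fail), concise(fail)) // same non-nil error
	fmt.Println(verbose(ok), concise(ok))     // both nil
}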
@@ -563,6 +557,7 @@ func processCheckpointRecord(userStates *userStates, seriesPool *sync.Pool, stat
 			errChan <- err
 			return
 		}
+		memoryChunks.Add(float64(len(descs)))
 
 		seriesCache[s.UserId][s.Fingerprint] = series
 		seriesPool.Put(s)
@@ -705,16 +700,8 @@ Loop:
 	case capturedErr = <-errChan:
 		return capturedErr
 	default:
-		if err := reader.Err(); err != nil {
-			return err
-		}
+		return reader.Err()
 	}
-
-	if err != nil {
-		return err
-	}
-
-	return nil
 }
 
 func processWALSamples(userStates *userStates, stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries,