@@ -19,22 +19,53 @@ import (
19
19
"github.com/prometheus/prometheus/pkg/gate"
20
20
"github.com/prometheus/prometheus/pkg/labels"
21
21
"github.com/prometheus/prometheus/storage"
22
+ tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
22
23
23
24
"github.com/cortexproject/cortex/pkg/ingester/client"
24
25
"github.com/cortexproject/cortex/pkg/util"
25
26
)
26
27
27
- func (i * Ingester ) v2BackfillPush (userID string , la []client.LabelAdapter , s client.Sample ) error {
28
- bucket , err := i .getOrCreateBackfillTSDB (userID , s .TimestampMs )
29
- if err != nil {
30
- return err
28
+ // backfillAppender is an appender to ingest old data.
29
+ // This _does not_ implement storage.Appender interface.
30
+ // The methods of this appender should not be called concurrently.
31
+ type backfillAppender struct {
32
+ userID string
33
+ ingester * Ingester
34
+ buckets []* tsdbBucket
35
+ appenders map [int ]storage.Appender
36
+ }
37
+
38
+ func (i * Ingester ) newBackfillAppender (userID string ) * backfillAppender {
39
+ return & backfillAppender {
40
+ userID : userID ,
41
+ ingester : i ,
42
+ buckets : i .TSDBState .backfillDBs .getBucketsForUser (userID ),
43
+ appenders : make (map [int ]storage.Appender ),
44
+ }
45
+ }
46
+
47
+ func (a * backfillAppender ) add (la []client.LabelAdapter , s client.Sample ) (err error ) {
48
+ bucket := getBucketForTimestamp (s .TimestampMs , a .buckets )
49
+ if bucket == nil {
50
+ var userBuckets []* tsdbBucket
51
+ bucket , userBuckets , err = a .ingester .getOrCreateBackfillTSDB (a .userID , s .TimestampMs )
52
+ if err != nil {
53
+ return err
54
+ }
55
+ a .buckets = userBuckets
31
56
}
32
57
33
- startAppend := time .Now ()
34
58
db := bucket .db
35
- cachedRef , cachedRefExists := db .refCache .Ref (startAppend , client .FromLabelAdaptersToLabels (la ))
59
+ var app storage.Appender
60
+ if ap , ok := a .appenders [bucket .id ]; ok {
61
+ app = ap
62
+ } else {
63
+ app = db .Appender ()
64
+ a .appenders [bucket .id ] = app
65
+ }
36
66
37
- app := db .Appender ()
67
+ startAppend := time .Now ()
68
+ cachedRef , cachedRefExists := db .refCache .Ref (startAppend , client .FromLabelAdaptersToLabels (la ))
38
69
// If the cached reference exists, we try to use it.
39
70
if cachedRefExists {
40
71
err = app .AddFast (cachedRef , s .TimestampMs , s .Value )
@@ -46,46 +77,66 @@ func (i *Ingester) v2BackfillPush(userID string, la []client.LabelAdapter, s cli
46
77
47
78
// If the cached reference doesn't exist, we (re)try without using the reference.
48
79
if ! cachedRefExists {
49
- var ref uint64
50
-
51
80
// Copy the label set because both TSDB and the cache may retain it.
52
81
copiedLabels := client .FromLabelAdaptersToLabelsWithCopy (la )
53
-
54
- if ref , err = app .Add (copiedLabels , s .TimestampMs , s .Value ); err == nil {
82
+ if ref , err := app .Add (copiedLabels , s .TimestampMs , s .Value ); err == nil {
55
83
db .refCache .SetRef (startAppend , copiedLabels , ref )
56
- cachedRef = ref
57
- cachedRefExists = true
58
84
}
59
85
}
60
86
61
- if err == nil {
62
- err = app .Commit ()
87
+ return err
88
+ }
89
+
90
+ func (a * backfillAppender ) commit () error {
91
+ var merr tsdb_errors.MultiError
92
+ for _ , app := range a .appenders {
93
+ merr .Add (app .Commit ())
63
94
}
95
+ return merr .Err ()
96
+ }
64
97
65
- return err
98
+ func (a * backfillAppender ) rollback () error {
99
+ var merr tsdb_errors.MultiError
100
+ for _ , app := range a .appenders {
101
+ merr .Add (app .Rollback ())
102
+ }
103
+ return merr .Err ()
66
104
}
67
105
68
- func (i * Ingester ) getOrCreateBackfillTSDB (userID string , ts int64 ) (* tsdbBucket , error ) {
69
- userBuckets := i .TSDBState .backfillDBs .getBucketsForUser (userID )
106
+ func getBucketForTimestamp (ts int64 , userBuckets []* tsdbBucket ) * tsdbBucket {
107
+ // As the number of buckets will be small, we are iterating instead of binary search.
108
+ for _ , b := range userBuckets {
109
+ if ts >= b .bucketStart && ts < b .bucketEnd {
110
+ return b
111
+ }
112
+ }
113
+ return nil
114
+ }
115
+
116
+ func (i * Ingester ) getOrCreateBackfillTSDB (userID string , ts int64 ) (* tsdbBucket , []* tsdbBucket , error ) {
117
+ i .TSDBState .backfillDBs .tsdbsMtx .Lock ()
118
+ defer i .TSDBState .backfillDBs .tsdbsMtx .Unlock ()
119
+
120
+ userBuckets := i .TSDBState .backfillDBs .tsdbs [userID ]
70
121
71
122
start , end := getBucketRangesForTimestamp (ts , 1 )
72
123
73
- var bucket * tsdbBucket
74
124
insertIdx := len (userBuckets )
75
125
for idx , b := range userBuckets {
76
126
if ts >= b .bucketStart && ts < b .bucketEnd {
77
- bucket = b
78
- break
127
+ return b , userBuckets , nil
79
128
}
80
129
81
- // Existing: |-----------|
82
- // New: |------------|
130
+ // Existing: |-----------|
131
+ // New: |------------|
132
+ // Changed to: |------| (no overlaps)
83
133
if b .bucketStart < start && start < b .bucketEnd {
84
134
start = b .bucketEnd
85
135
}
86
136
87
- // Existing: |-----------|
88
- // New: |------------|
137
+ // Existing: |-----------|
138
+ // New: |------------|
139
+ // Changed to: |------| (no overlaps)
89
140
if end > b .bucketStart && end < b .bucketEnd {
90
141
end = b .bucketStart
91
142
insertIdx = idx
@@ -98,26 +149,26 @@ func (i *Ingester) getOrCreateBackfillTSDB(userID string, ts int64) (*tsdbBucket
98
149
}
99
150
}
100
151
101
- if bucket == nil {
102
- tsdb , err := i .createNewTSDB (
103
- userID , filepath .Join (userID , getBucketName (start , end )),
104
- (end - start )* 2 , (end - start )* 2 , prometheus .NewRegistry (),
105
- )
106
- if err != nil {
107
- return nil , err
108
- }
109
- bucket = & tsdbBucket {
110
- db : tsdb ,
111
- bucketStart : start ,
112
- bucketEnd : end ,
113
- }
114
- userBuckets = append (userBuckets [:insertIdx ], append ([]* tsdbBucket {bucket }, userBuckets [insertIdx :]... )... )
115
- i .TSDBState .backfillDBs .tsdbsMtx .Lock ()
116
- i .TSDBState .backfillDBs .tsdbs [userID ] = userBuckets
117
- i .TSDBState .backfillDBs .tsdbsMtx .Unlock ()
152
+ tsdb , err := i .createNewTSDB (
153
+ userID , filepath .Join (userID , getBucketName (start , end )),
154
+ (end - start )* 2 , (end - start )* 2 , prometheus .NewRegistry (),
155
+ )
156
+ if err != nil {
157
+ return nil , nil , err
158
+ }
159
+ bucket := & tsdbBucket {
160
+ db : tsdb ,
161
+ bucketStart : start ,
162
+ bucketEnd : end ,
163
+ }
164
+ if len (userBuckets ) > 0 {
165
+ bucket .id = userBuckets [len (userBuckets )- 1 ].id + 1
118
166
}
167
+ userBuckets = append (userBuckets [:insertIdx ], append ([]* tsdbBucket {bucket }, userBuckets [insertIdx :]... )... )
119
168
120
- return bucket , nil
169
+ i .TSDBState .backfillDBs .tsdbs [userID ] = userBuckets
170
+
171
+ return bucket , userBuckets , nil
121
172
}
122
173
123
174
func (i * Ingester ) openExistingBackfillTSDB (ctx context.Context ) error {
@@ -127,6 +178,9 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
127
178
128
179
users , err := ioutil .ReadDir (i .cfg .TSDBConfig .BackfillDir )
129
180
if err != nil {
181
+ if os .IsNotExist (err ) {
182
+ return nil
183
+ }
130
184
return err
131
185
}
132
186
@@ -145,7 +199,7 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
145
199
continue
146
200
}
147
201
148
- for _ , bucketName := range bucketNames {
202
+ for bucketID , bucketName := range bucketNames {
149
203
if bucketName .IsDir () {
150
204
continue
151
205
}
@@ -174,21 +228,26 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
174
228
}
175
229
176
230
wg .Add (1 )
177
- go func (userID , bucketName string ) {
231
+ go func (bucketID int , userID , bucketName string ) {
178
232
defer wg .Done ()
179
233
defer openGate .Done ()
180
234
defer func (ts time.Time ) {
181
235
i .TSDBState .walReplayTime .Observe (time .Since (ts ).Seconds ())
182
236
}(time .Now ())
183
237
184
238
start , end , err := getBucketRangesForBucketName (bucketName )
239
+ if err != nil {
240
+ level .Error (util .Logger ).Log ("msg" , "unable to get bucket range" , "err" , err , "user" , userID , "bucketName" , bucketName )
241
+ return
242
+ }
185
243
db , err := i .createNewTSDB (userID , filepath .Join (userID , bucketName ), (end - start )* 2 , (end - start )* 2 , prometheus .NewRegistry ())
186
244
if err != nil {
187
245
level .Error (util .Logger ).Log ("msg" , "unable to open user backfill TSDB" , "err" , err , "user" , userID )
188
246
return
189
247
}
190
248
191
249
bucket := & tsdbBucket {
250
+ id : bucketID ,
192
251
db : db ,
193
252
bucketStart : start ,
194
253
bucketEnd : end ,
@@ -198,7 +257,7 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
198
257
// Append at the end, we will sort it at the end.
199
258
i .TSDBState .backfillDBs .tsdbs [userID ] = append (i .TSDBState .backfillDBs .tsdbs [userID ], bucket )
200
259
i .TSDBState .backfillDBs .tsdbsMtx .Unlock ()
201
- }(userID , bucketName .Name ())
260
+ }(bucketID , userID , bucketName .Name ())
202
261
}
203
262
204
263
if runErr != nil {
@@ -278,6 +337,7 @@ func (i *Ingester) backfillSelect(ctx context.Context, userID string, from, thro
278
337
select {
279
338
case err := <- errC :
280
339
return nil , err
340
+ default :
281
341
}
282
342
283
343
return result , nil
@@ -286,7 +346,8 @@ func (i *Ingester) backfillSelect(ctx context.Context, userID string, from, thro
286
346
// Assumes 1h bucket range for . TODO(codesome): protect stuff with locks.
287
347
type backfillTSDBs struct {
288
348
tsdbsMtx sync.RWMutex
289
- tsdbs map [string ][]* tsdbBucket
349
+ // TODO(codesome): have more granular locks.
350
+ tsdbs map [string ][]* tsdbBucket
290
351
}
291
352
292
353
func newBackfillTSDBs () * backfillTSDBs {
@@ -302,6 +363,7 @@ func (b *backfillTSDBs) getBucketsForUser(userID string) []*tsdbBucket {
302
363
}
303
364
304
365
type tsdbBucket struct {
366
+ id int // This is any number but should be unique among all buckets of a user.
305
367
db * userTSDB
306
368
bucketStart , bucketEnd int64
307
369
}
0 commit comments