Skip to content

Commit 852f415

Browse files
committed
Check for unshipped blocks before deleting
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent 1f5c8d0 commit 852f415

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

pkg/ingester/ingester_v2.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package ingester
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"io"
8+
"io/ioutil"
79
"math"
810
"net/http"
911
"os"
@@ -12,6 +14,7 @@ import (
1214
"time"
1315

1416
"github.com/go-kit/kit/log/level"
17+
"github.com/oklog/ulid"
1518
"github.com/pkg/errors"
1619
"github.com/prometheus/client_golang/prometheus"
1720
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -120,6 +123,38 @@ func (u *userTSDB) setLastUpdate(t time.Time) {
120123
u.lastUpdate.Store(t.Unix())
121124
}
122125

126+
func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
127+
b, err := ioutil.ReadFile(filepath.Join(u.Dir(), shipper.MetaFilename))
128+
if err != nil {
129+
return nil, errors.Wrap(err, "read shipper meta file")
130+
}
131+
var shipperMeta shipper.Meta
132+
if err := json.Unmarshal(b, &shipperMeta); err != nil {
133+
return nil, errors.Wrap(err, "unmarshal shipper meta file to json")
134+
}
135+
136+
return shipperMeta.Uploaded, nil
137+
}
138+
139+
func (u *userTSDB) getUnshippedBlocksULID() (unshipped []ulid.ULID, err error) {
140+
shippedBlocks, err := u.getShippedBlocksULID()
141+
if err != nil {
142+
return nil, errors.Wrap(err, "get shipped blocks")
143+
}
144+
145+
Outer:
146+
for _, b := range u.Blocks() {
147+
for _, uid := range shippedBlocks {
148+
if uid == b.Meta().ULID {
149+
continue Outer
150+
}
151+
}
152+
unshipped = append(unshipped, b.Meta().ULID)
153+
}
154+
155+
return unshipped, nil
156+
}
157+
123158
// TSDBState holds data structures used by the TSDB storage engine
124159
type TSDBState struct {
125160
dbs map[string]*userTSDB // tsdb sharded by userID

pkg/ingester/ingester_v2_backfill.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,20 @@ func (i *Ingester) closeOldBackfillTSDBsAndDelete(gracePeriod int64) error {
471471
if err := db.Close(); err != nil {
472472
return errors.Wrap(err, "close backfill TSDB")
473473
}
474-
// TODO(codesome): check if the blocks are shipped.
474+
475+
unshippedBlocks, err := db.getUnshippedBlocksULID()
476+
if err != nil {
477+
return errors.Wrap(err, "get unshipped blocks")
478+
}
479+
if len(unshippedBlocks) > 0 {
480+
// Ship the unshipped blocks.
481+
uploaded, err := db.shipper.Sync(context.Background())
482+
if err != nil {
483+
return errors.Wrap(err, "ship block")
484+
}
485+
level.Debug(util.Logger).Log("msg", "shipper successfully synchronized backfill TSDB blocks with storage", "user", db.userID, "uploaded", uploaded, "bucket_dir", db.Dir())
486+
}
487+
475488
if err := os.RemoveAll(db.Dir()); err != nil {
476489
return errors.Wrap(err, "delete backfill TSDB dir")
477490
}

0 commit comments

Comments
 (0)