Skip to content
Merged
2 changes: 1 addition & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
}

if cfg.TSDBEnabled {
return NewV2(cfg, clientConfig, limits, chunkStore, registerer)
return NewV2(cfg, clientConfig, limits, registerer)
}

i := &Ingester{
Expand Down
136 changes: 75 additions & 61 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingester
import (
"fmt"
"net/http"
"path/filepath"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
Expand All @@ -26,14 +25,18 @@ import (
old_ctx "golang.org/x/net/context"
)

const (
errTSDBCreateIncompatibleState = "cannot create a new TSDB while the ingester is not in active state (current state: %s)"
)

// TSDBState holds data structures used by the TSDB storage engine
type TSDBState struct {
dbs map[string]*tsdb.DB // tsdb sharded by userID
bucket objstore.Bucket
}

// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) {
func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "cortex", util.Logger)
if err != nil {
return nil, err
Expand All @@ -44,7 +47,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
clientConfig: clientConfig,
metrics: newIngesterMetrics(registerer),
limits: limits,
chunkStore: chunkStore,
chunkStore: nil,
quit: make(chan struct{}),

TSDBState: TSDBState{
Expand Down Expand Up @@ -77,7 +80,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
return nil, fmt.Errorf("no user id")
}

db, err := i.getOrCreateTSDB(userID)
db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -151,9 +154,9 @@ func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*clie

i.metrics.queries.Inc()

db, err := i.getOrCreateTSDB(userID)
if err != nil {
return nil, fmt.Errorf("failed to find/create user db: %v", err)
db := i.getTSDB(userID)
if db == nil {
return &client.QueryResponse{}, nil
}

q, err := db.Querier(int64(from), int64(through))
Expand Down Expand Up @@ -220,9 +223,9 @@ func (i *Ingester) v2LabelValues(ctx old_ctx.Context, req *client.LabelValuesReq
return nil, err
}

db, err := i.getOrCreateTSDB(userID)
if err != nil {
return nil, fmt.Errorf("failed to find/create user db: %v", err)
db := i.getTSDB(userID)
if db == nil {
return &client.LabelValuesResponse{}, nil
}

through := time.Now()
Expand All @@ -249,9 +252,9 @@ func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesReque
return nil, err
}

db, err := i.getOrCreateTSDB(userID)
if err != nil {
return nil, fmt.Errorf("failed to find/create user db: %v", err)
db := i.getTSDB(userID)
if db == nil {
return &client.LabelNamesResponse{}, nil
}

through := time.Now()
Expand Down Expand Up @@ -279,58 +282,69 @@ func (i *Ingester) getTSDB(userID string) *tsdb.DB {
return db
}

func (i *Ingester) getOrCreateTSDB(userID string) (*tsdb.DB, error) {
func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error) {
db := i.getTSDB(userID)
if db == nil {
i.userStatesMtx.Lock()
defer i.userStatesMtx.Unlock()

// Check again for DB in the event it was created in-between locks
var ok bool
db, ok = i.TSDBState.dbs[userID]
if !ok {

udir := i.userDir(userID)

// Create a new user database
var err error
db, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
NoLockfile: true,
})
if err != nil {
return nil, err
}
if db != nil {
return db, nil
}

// Thanos shipper requires at least 1 external label to be set. For this reason,
// we set the tenant ID as external label and we'll filter it out when reading
// the series from the storage.
l := lbls.Labels{
{
Name: cortex_tsdb.TenantIDExternalLabel,
Value: userID,
},
}
i.userStatesMtx.Lock()
defer i.userStatesMtx.Unlock()

// Create a new shipper for this database
s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() lbls.Labels { return l }, metadata.ReceiveSource)
i.done.Add(1)
go func() {
defer i.done.Done()
runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error {
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(util.Logger).Log("err", err, "uploaded", uploaded)
}
return nil
})
}()

i.TSDBState.dbs[userID] = db
}
// Check again for DB in the event it was created in-between locks
var ok bool
db, ok = i.TSDBState.dbs[userID]
if ok {
return db, nil
}

// We're ready to create the TSDB, however we must be sure that the ingester
// is in the ACTIVE state, otherwise it may conflict with the transfer in/out.
// The TSDB is created when the first series is pushed and this shouldn't happen
// to a non-ACTIVE ingester, however we want to protect from any bug, cause we
// may have data loss or TSDB WAL corruption if the TSDB is created before/during
// a transfer in occurs.
if ingesterState := i.lifecycler.GetState(); !force && ingesterState != ring.ACTIVE {
return nil, fmt.Errorf(errTSDBCreateIncompatibleState, ingesterState)
}

udir := i.cfg.TSDBConfig.BlocksDir(userID)

// Create a new user database
var err error
db, err = tsdb.Open(udir, util.Logger, nil, &tsdb.Options{
RetentionDuration: uint64(i.cfg.TSDBConfig.Retention / time.Millisecond),
BlockRanges: i.cfg.TSDBConfig.BlockRanges.ToMillisecondRanges(),
NoLockfile: true,
})
if err != nil {
return nil, err
}

// Thanos shipper requires at least 1 external label to be set. For this reason,
// we set the tenant ID as external label and we'll filter it out when reading
// the series from the storage.
l := lbls.Labels{
{
Name: cortex_tsdb.TenantIDExternalLabel,
Value: userID,
},
}

// Create a new shipper for this database
s := shipper.New(util.Logger, nil, udir, &Bucket{userID, i.TSDBState.bucket}, func() lbls.Labels { return l }, metadata.ReceiveSource)
i.done.Add(1)
go func() {
defer i.done.Done()
runutil.Repeat(i.cfg.TSDBConfig.ShipInterval, i.quit, func() error {
if uploaded, err := s.Sync(context.Background()); err != nil {
level.Warn(util.Logger).Log("err", err, "uploaded", uploaded)
}
return nil
})
}()

i.TSDBState.dbs[userID] = db

return db, nil
}

func (i *Ingester) userDir(userID string) string { return filepath.Join(i.cfg.TSDBConfig.Dir, userID) }
154 changes: 152 additions & 2 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"fmt"
"io/ioutil"
"math"
"net/http"
Expand Down Expand Up @@ -138,13 +139,21 @@ func TestIngester_v2Push(t *testing.T) {
registry := prometheus.NewRegistry()

// Create a mocked ingester
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), registry)
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 0

i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, registry)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

ctx := user.InjectOrgID(context.Background(), "test")

// Wait until the ingester is ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return i.lifecycler.GetState()
})

// Push timeseries
for idx, req := range testData.reqs {
_, err := i.v2Push(ctx, req)
Expand Down Expand Up @@ -388,6 +397,147 @@ func Test_Ingester_v2Query(t *testing.T) {
})
}
}
func TestIngester_v2Query_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Mock request
userID := "test"
ctx := user.InjectOrgID(context.Background(), userID)
req := &client.QueryRequest{}

res, err := i.v2Query(ctx, req)
require.NoError(t, err)
assert.Equal(t, &client.QueryResponse{}, res)

// Check if the TSDB has been created
_, tsdbCreated := i.TSDBState.dbs[userID]
assert.False(t, tsdbCreated)
}

func TestIngester_v2LabelValues_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Mock request
userID := "test"
ctx := user.InjectOrgID(context.Background(), userID)
req := &client.LabelValuesRequest{}

res, err := i.v2LabelValues(ctx, req)
require.NoError(t, err)
assert.Equal(t, &client.LabelValuesResponse{}, res)

// Check if the TSDB has been created
_, tsdbCreated := i.TSDBState.dbs[userID]
assert.False(t, tsdbCreated)
}

func TestIngester_v2LabelNames_ShouldNotCreateTSDBIfDoesNotExists(t *testing.T) {
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Mock request
userID := "test"
ctx := user.InjectOrgID(context.Background(), userID)
req := &client.LabelNamesRequest{}

res, err := i.v2LabelNames(ctx, req)
require.NoError(t, err)
assert.Equal(t, &client.LabelNamesResponse{}, res)

// Check if the TSDB has been created
_, tsdbCreated := i.TSDBState.dbs[userID]
assert.False(t, tsdbCreated)
}

func TestIngester_v2Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) {
i, cleanup, err := newIngesterMockWithTSDBStorage(defaultIngesterTestConfig(), nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()
require.Equal(t, ring.PENDING, i.lifecycler.GetState())

// Mock request
userID := "test"
ctx := user.InjectOrgID(context.Background(), userID)
req := &client.WriteRequest{}

res, err := i.v2Push(ctx, req)
assert.Equal(t, fmt.Errorf(errTSDBCreateIncompatibleState, "PENDING"), err)
assert.Nil(t, res)

// Check if the TSDB has been created
_, tsdbCreated := i.TSDBState.dbs[userID]
assert.False(t, tsdbCreated)
}

func TestIngester_getOrCreateTSDB_ShouldNotAllowToCreateTSDBIfIngesterStateIsNotActive(t *testing.T) {
tests := map[string]struct {
state ring.IngesterState
expectedErr error
}{
"not allow to create TSDB if in PENDING state": {
state: ring.PENDING,
expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.PENDING),
},
"not allow to create TSDB if in JOINING state": {
state: ring.JOINING,
expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.JOINING),
},
"not allow to create TSDB if in LEAVING state": {
state: ring.LEAVING,
expectedErr: fmt.Errorf(errTSDBCreateIncompatibleState, ring.LEAVING),
},
"allow to create TSDB if in ACTIVE state": {
state: ring.ACTIVE,
expectedErr: nil,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
cfg := defaultIngesterTestConfig()
cfg.LifecyclerConfig.JoinAfter = 60 * time.Second

i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil)
require.NoError(t, err)
defer i.Shutdown()
defer cleanup()

// Switch ingester state to the expected one in the test
if i.lifecycler.GetState() != testData.state {
var stateChain []ring.IngesterState

if testData.state == ring.LEAVING {
stateChain = []ring.IngesterState{ring.ACTIVE, ring.LEAVING}
} else {
stateChain = []ring.IngesterState{testData.state}
}

for _, s := range stateChain {
err = i.lifecycler.ChangeState(context.Background(), s)
require.NoError(t, err)
}
}

db, err := i.getOrCreateTSDB("test", false)
assert.Equal(t, testData.expectedErr, err)

if testData.expectedErr != nil {
assert.Nil(t, db)
} else {
assert.NotNil(t, db)
}
})
}
}

func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) (*client.WriteRequest, *client.QueryResponse) {
samples := []client.Sample{
Expand Down Expand Up @@ -432,7 +582,7 @@ func newIngesterMockWithTSDBStorage(ingesterCfg Config, registerer prometheus.Re
ingesterCfg.TSDBConfig.Backend = "s3"
ingesterCfg.TSDBConfig.S3.Endpoint = "localhost"

ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil, registerer)
ingester, err := NewV2(ingesterCfg, clientCfg, overrides, registerer)
if err != nil {
return nil, nil, err
}
Expand Down
Loading