Do not close and re-open TSDBs during ingester transfer #1854

Merged
65 changes: 60 additions & 5 deletions pkg/ingester/ingester_v2.go
@@ -3,6 +3,7 @@ package ingester
import (
"fmt"
"net/http"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -32,6 +33,13 @@ const (
type TSDBState struct {
dbs map[string]*tsdb.DB // tsdb sharded by userID
bucket objstore.Bucket

// Keeps count of in-flight requests
inflightWriteReqs sync.WaitGroup

// Used to run shutdown operations only once, while transferring
// blocks/WAL to a joining ingester
transferOnce sync.Once
}

// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
@@ -84,6 +92,24 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
return nil, err
}

// Ensure the ingester shutdown procedure hasn't started
i.userStatesMtx.RLock()

if i.stopped {
i.userStatesMtx.RUnlock()
return nil, fmt.Errorf("ingester stopping")
}

// Keep track of in-flight requests, in order to safely start blocks transfer
// (at shutdown) only once all in-flight write requests have completed.
// It's important to increase the number of in-flight requests within the lock
// (even if sync.WaitGroup is thread-safe), otherwise there's a race condition
// with the TSDB transfer, which - after the stopped flag is set to true - waits
// for the number of in-flight requests to reach zero.
i.TSDBState.inflightWriteReqs.Add(1)
i.userStatesMtx.RUnlock()
defer i.TSDBState.inflightWriteReqs.Done()

// Keep track of some stats which are updated only if the samples are
// successfully committed
succeededSamplesCount := 0
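
Reviewer note: the shutdown-side ordering that the comment above relies on is not part of the hunks shown here. A minimal sketch of it, assuming the transfer path flips the stopped flag under the same mutex before waiting — the method name below is illustrative, not actual Cortex code:

// Hypothetical sketch of the shutdown side referenced by the comment above.
// Because v2Push calls Add(1) while holding the same lock used to check the
// stopped flag, no new request can slip in after Wait() has started.
func (i *Ingester) sketchStopIncomingWrites() {
	i.userStatesMtx.Lock()
	i.stopped = true // new pushes now fail fast with "ingester stopping"
	i.userStatesMtx.Unlock()

	// Every request that passed the stopped check has already called Add(1),
	// so Wait() observes all in-flight writes.
	i.TSDBState.inflightWriteReqs.Wait()
}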
@@ -96,10 +122,6 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
lset := cortex_tsdb.FromLabelAdaptersToLabels(ts.Labels)

for _, s := range ts.Samples {
if i.stopped {
return nil, fmt.Errorf("ingester stopping")
}

_, err := app.Add(lset, s.TimestampMs, s.Value)
if err == nil {
succeededSamplesCount++
@@ -119,7 +141,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien

// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(util.Logger).Log("failed to rollback on error", "userID", userID, "err", rollbackErr)
level.Warn(util.Logger).Log("msg", "failed to rollback on error", "userID", userID, "err", rollbackErr)
}

return nil, err
@@ -397,3 +419,36 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)

return db, nil
}

func (i *Ingester) closeAllTSDB() {
i.userStatesMtx.Lock()

wg := &sync.WaitGroup{}
wg.Add(len(i.TSDBState.dbs))

// Concurrently close all per-user TSDBs
for userID, db := range i.TSDBState.dbs {
userID := userID

go func(db *tsdb.DB) {
defer wg.Done()

if err := db.Close(); err != nil {
level.Warn(util.Logger).Log("msg", "unable to close TSDB", "err", err, "user", userID)
return
}

// Now that the TSDB has been closed, we should remove it from the
// set of open ones. This lock acquisition doesn't deadlock with the
// outer one, because the outer one is released as soon as all
// goroutines have been started.
i.userStatesMtx.Lock()
delete(i.TSDBState.dbs, userID)
i.userStatesMtx.Unlock()
}(db)
}

// Wait until all Close() calls have completed
i.userStatesMtx.Unlock()
wg.Wait()
}
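
For context: the new transferOnce field and closeAllTSDB are consumed by the transfer/shutdown path, which is outside the hunks shown here. A rough sketch of how they presumably fit together across transfer retries — function names other than closeAllTSDB are illustrative:

// Hypothetical sketch of the transfer-out path on the leaving ingester.
func (i *Ingester) sketchTransferOut() error {
	// Run the "stop writes and wait for in-flight requests" step exactly once,
	// even if the transfer is retried after a failure (see sketchStopIncomingWrites
	// above for what that step would look like).
	i.TSDBState.transferOnce.Do(i.sketchStopIncomingWrites)

	// ... ship blocks/WAL to the joining ingester. On failure the lifecycler
	// retries (up to MaxTransferRetries), re-entering this function without
	// re-running the Do() body above.
	return nil
}

closeAllTSDB would then be called a single time at final shutdown, rather than closing and re-opening the per-user TSDBs around every transfer attempt, which is what the PR title refers to.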
197 changes: 110 additions & 87 deletions pkg/ingester/lifecycle_test.go
@@ -1,6 +1,7 @@
package ingester

import (
"errors"
"io"
"io/ioutil"
"math"
@@ -395,94 +396,116 @@ func TestIngesterFlush(t *testing.T) {
}

func TestV2IngesterTransfer(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)

dir1, err := ioutil.TempDir("", "tsdb")
require.NoError(t, err)
dir2, err := ioutil.TempDir("", "tsdb")
require.NoError(t, err)
require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict

// Start the first ingester, and get it into ACTIVE state.
cfg1 := defaultIngesterTestConfig()
cfg1.TSDBEnabled = true
cfg1.TSDBConfig.Dir = dir1
cfg1.TSDBConfig.S3 = s3.Config{
Endpoint: "dummy",
BucketName: "dummy",
SecretAccessKey: "dummy",
AccessKeyID: "dummy",
scenarios := map[string]struct {
failedTransfers int
}{
"transfer succeeded at first attempt": {
failedTransfers: 0,
},
"transfer failed at first attempt, then succeeded": {
failedTransfers: 1,
},
}
cfg1.LifecyclerConfig.ID = "ingester1"
cfg1.LifecyclerConfig.Addr = "ingester1"
cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
cfg1.MaxTransferRetries = 10
ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
require.NoError(t, err)

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return ing1.lifecycler.GetState()
})

// Now write a sample to this ingester
req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
ctx := user.InjectOrgID(context.Background(), userID)
_, err = ing1.Push(ctx, req)
require.NoError(t, err)

// Start a second ingester, but let it go into PENDING
cfg2 := defaultIngesterTestConfig()
cfg2.TSDBEnabled = true
cfg2.TSDBConfig.Dir = dir2
cfg2.TSDBConfig.S3 = s3.Config{
Endpoint: "dummy",
BucketName: "dummy",
SecretAccessKey: "dummy",
AccessKeyID: "dummy",
// Run the same test under the different scenarios
for name, scenario := range scenarios {
t.Run(name, func(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err)

dir1, err := ioutil.TempDir("", "tsdb")
require.NoError(t, err)
dir2, err := ioutil.TempDir("", "tsdb")
require.NoError(t, err)
require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict

// Start the first ingester, and get it into ACTIVE state.
cfg1 := defaultIngesterTestConfig()
cfg1.TSDBEnabled = true
cfg1.TSDBConfig.Dir = dir1
cfg1.TSDBConfig.S3 = s3.Config{
Endpoint: "dummy",
BucketName: "dummy",
SecretAccessKey: "dummy",
AccessKeyID: "dummy",
}
cfg1.LifecyclerConfig.ID = "ingester1"
cfg1.LifecyclerConfig.Addr = "ingester1"
cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
cfg1.MaxTransferRetries = 10
ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
require.NoError(t, err)

test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
return ing1.lifecycler.GetState()
})

// Now write a sample to this ingester
req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
ctx := user.InjectOrgID(context.Background(), userID)
_, err = ing1.Push(ctx, req)
require.NoError(t, err)

// Start a second ingester, but let it go into PENDING
cfg2 := defaultIngesterTestConfig()
cfg2.TSDBEnabled = true
cfg2.TSDBConfig.Dir = dir2
cfg2.TSDBConfig.S3 = s3.Config{
Endpoint: "dummy",
BucketName: "dummy",
SecretAccessKey: "dummy",
AccessKeyID: "dummy",
}
cfg2.LifecyclerConfig.RingConfig.KVStore.Mock = cfg1.LifecyclerConfig.RingConfig.KVStore.Mock
cfg2.LifecyclerConfig.ID = "ingester2"
cfg2.LifecyclerConfig.Addr = "ingester2"
cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
require.NoError(t, err)

// Let ing1 send blocks/wal to ing2
ingesterClientFactoryCount := 0

ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.HealthAndIngesterClient, error) {
if ingesterClientFactoryCount++; ingesterClientFactoryCount <= scenario.failedTransfers {
return nil, errors.New("mocked error simulating the case the leaving ingester is unable to connect to the joining ingester")
}

return ingesterClientAdapater{
ingester: ing2,
}, nil
}

// Now stop the first ingester, and wait for the second ingester to become ACTIVE.
ing1.Shutdown()
test.Poll(t, 10*time.Second, ring.ACTIVE, func() interface{} {
return ing2.lifecycler.GetState()
})

// And check the second ingester has the sample
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
require.NoError(t, err)

request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher})
require.NoError(t, err)

response, err := ing2.Query(ctx, request)
require.NoError(t, err)
assert.Equal(t, expectedResponse, response)

// Check we can send the same sample again to the new ingester and get the same result
req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
_, err = ing2.Push(ctx, req)
require.NoError(t, err)
response, err = ing2.Query(ctx, request)
require.NoError(t, err)
assert.Equal(t, expectedResponse, response)

// Assert the data is in the expected location of dir2
files, err := ioutil.ReadDir(dir2)
require.NoError(t, err)
require.Equal(t, 1, len(files))
require.Equal(t, "1", files[0].Name())
})
}
cfg2.LifecyclerConfig.RingConfig.KVStore.Mock = cfg1.LifecyclerConfig.RingConfig.KVStore.Mock
cfg2.LifecyclerConfig.ID = "ingester2"
cfg2.LifecyclerConfig.Addr = "ingester2"
cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
require.NoError(t, err)

// Let ing2 send blocks/wal to ing1
ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.HealthAndIngesterClient, error) {
return ingesterClientAdapater{
ingester: ing2,
}, nil
}

// Now stop the first ingester, and wait for the second ingester to become ACTIVE.
ing1.Shutdown()
test.Poll(t, 10*time.Second, ring.ACTIVE, func() interface{} {
return ing2.lifecycler.GetState()
})

// And check the second ingester has the sample
matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
require.NoError(t, err)

request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher})
require.NoError(t, err)

response, err := ing2.Query(ctx, request)
require.NoError(t, err)
assert.Equal(t, expectedResponse, response)

// Check we can send the same sample again to the new ingester and get the same result
req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
_, err = ing2.Push(ctx, req)
require.NoError(t, err)
response, err = ing2.Query(ctx, request)
require.NoError(t, err)
assert.Equal(t, expectedResponse, response)

// Assert the data is in the expected location of dir2
files, err := ioutil.ReadDir(dir2)
require.NoError(t, err)
require.Equal(t, 1, len(files))
require.Equal(t, "1", files[0].Name())
}
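
A note on how the new scenarios exercise the retry path: the mocked ingesterClientFactory rejects the first failedTransfers connection attempts, and the test relies on the leaving ingester retrying the transfer (cfg1.MaxTransferRetries is 10, comfortably above the single simulated failure). A rough sketch of the retry behaviour the test depends on — illustrative only, not the actual lifecycler code:

// Hypothetical sketch of the retry loop on the leaving ingester's side.
func sketchTransferWithRetries(maxRetries int, transfer func() error) error {
	var lastErr error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		if lastErr = transfer(); lastErr == nil {
			// With failedTransfers == 1 and MaxTransferRetries == 10, the
			// mocked factory fails once and the transfer succeeds on attempt 2.
			return nil
		}
	}
	return lastErr
}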