diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 31c7c1e9e2a..1dda52233ec 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -3,6 +3,7 @@ package ingester
 import (
 	"fmt"
 	"net/http"
+	"sync"
 	"time"
 
 	"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -32,6 +33,13 @@ const (
 type TSDBState struct {
 	dbs    map[string]*tsdb.DB // tsdb sharded by userID
 	bucket objstore.Bucket
+
+	// Keeps count of in-flight write requests
+	inflightWriteReqs sync.WaitGroup
+
+	// Used to run the shutdown operations (transferring blocks/WAL to a
+	// joining ingester) only once
+	transferOnce sync.Once
 }
 
 // NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
@@ -84,6 +92,24 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 		return nil, err
 	}
 
+	// Ensure the ingester shutdown procedure hasn't started
+	i.userStatesMtx.RLock()
+
+	if i.stopped {
+		i.userStatesMtx.RUnlock()
+		return nil, fmt.Errorf("ingester stopping")
+	}
+
+	// Keep track of in-flight requests, in order to safely start blocks transfer
+	// (at shutdown) only once all in-flight write requests have completed.
+	// It's important to increase the number of in-flight requests within the lock
+	// (even though sync.WaitGroup is thread-safe), otherwise there's a race condition
+	// with the TSDB transfer, which - after the stopped flag is set to true - waits
+	// for the number of in-flight requests to reach zero.
+	i.TSDBState.inflightWriteReqs.Add(1)
+	i.userStatesMtx.RUnlock()
+	defer i.TSDBState.inflightWriteReqs.Done()
+
 	// Keep track of some stats which are tracked only if the samples will be
 	// successfully committed
 	succeededSamplesCount := 0
@@ -96,10 +122,6 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 		lset := cortex_tsdb.FromLabelAdaptersToLabels(ts.Labels)
 
 		for _, s := range ts.Samples {
-			if i.stopped {
-				return nil, fmt.Errorf("ingester stopping")
-			}
-
 			_, err := app.Add(lset, s.TimestampMs, s.Value)
 			if err == nil {
 				succeededSamplesCount++
@@ -119,7 +141,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 
 			// The error looks an issue on our side, so we should rollback
 			if rollbackErr := app.Rollback(); rollbackErr != nil {
-				level.Warn(util.Logger).Log("failed to rollback on error", "userID", userID, "err", rollbackErr)
+				level.Warn(util.Logger).Log("msg", "failed to rollback on error", "userID", userID, "err", rollbackErr)
 			}
 
 			return nil, err
@@ -397,3 +419,36 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*tsdb.DB, error)
 
 	return db, nil
 }
+
+func (i *Ingester) closeAllTSDB() {
+	i.userStatesMtx.Lock()
+
+	wg := &sync.WaitGroup{}
+	wg.Add(len(i.TSDBState.dbs))
+
+	// Concurrently close all per-user TSDBs
+	for userID, db := range i.TSDBState.dbs {
+		userID := userID
+
+		go func(db *tsdb.DB) {
+			defer wg.Done()
+
+			if err := db.Close(); err != nil {
+				level.Warn(util.Logger).Log("msg", "unable to close TSDB", "err", err, "user", userID)
+				return
+			}
+
+			// Now that the TSDB has been closed, we should remove it from the
+			// set of open ones. This lock acquisition doesn't deadlock with the
+			// outer one, because the outer one is released as soon as all
+			// goroutines are started.
+			i.userStatesMtx.Lock()
+			delete(i.TSDBState.dbs, userID)
+			i.userStatesMtx.Unlock()
+		}(db)
+	}
+
+	// Wait until all Close() calls have completed
+	i.userStatesMtx.Unlock()
+	wg.Wait()
+}
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index ceba2434e66..7595d5d3450 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -1,6 +1,7 @@
 package ingester
 
 import (
+	"errors"
 	"io"
 	"io/ioutil"
 	"math"
@@ -395,94 +396,116 @@ func TestIngesterFlush(t *testing.T) {
 }
 
 func TestV2IngesterTransfer(t *testing.T) {
-	limits, err := validation.NewOverrides(defaultLimitsTestConfig())
-	require.NoError(t, err)
-
-	dir1, err := ioutil.TempDir("", "tsdb")
-	require.NoError(t, err)
-	dir2, err := ioutil.TempDir("", "tsdb")
-	require.NoError(t, err)
-	require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict
-
-	// Start the first ingester, and get it into ACTIVE state.
-	cfg1 := defaultIngesterTestConfig()
-	cfg1.TSDBEnabled = true
-	cfg1.TSDBConfig.Dir = dir1
-	cfg1.TSDBConfig.S3 = s3.Config{
-		Endpoint:        "dummy",
-		BucketName:      "dummy",
-		SecretAccessKey: "dummy",
-		AccessKeyID:     "dummy",
+	scenarios := map[string]struct {
+		failedTransfers int
+	}{
+		"transfer succeeded at first attempt": {
+			failedTransfers: 0,
+		},
+		"transfer failed at first attempt, then succeeded": {
+			failedTransfers: 1,
+		},
 	}
-	cfg1.LifecyclerConfig.ID = "ingester1"
-	cfg1.LifecyclerConfig.Addr = "ingester1"
-	cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
-	cfg1.MaxTransferRetries = 10
-	ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
-	require.NoError(t, err)
-
-	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
-		return ing1.lifecycler.GetState()
-	})
-
-	// Now write a sample to this ingester
-	req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
-	ctx := user.InjectOrgID(context.Background(), userID)
-	_, err = ing1.Push(ctx, req)
-	require.NoError(t, err)
-	// Start a second ingester, but let it go into PENDING
-	cfg2 := defaultIngesterTestConfig()
-	cfg2.TSDBEnabled = true
-	cfg2.TSDBConfig.Dir = dir2
-	cfg2.TSDBConfig.S3 = s3.Config{
-		Endpoint:        "dummy",
-		BucketName:      "dummy",
-		SecretAccessKey: "dummy",
-		AccessKeyID:     "dummy",
+	// We run the same test under different scenarios
+	for name, scenario := range scenarios {
+		t.Run(name, func(t *testing.T) {
+			limits, err := validation.NewOverrides(defaultLimitsTestConfig())
+			require.NoError(t, err)
+
+			dir1, err := ioutil.TempDir("", "tsdb")
+			require.NoError(t, err)
+			dir2, err := ioutil.TempDir("", "tsdb")
+			require.NoError(t, err)
+			require.NoError(t, os.Remove(dir2)) // remove the destination dir so there isn't a move conflict
+
+			// Start the first ingester, and get it into ACTIVE state.
+			cfg1 := defaultIngesterTestConfig()
+			cfg1.TSDBEnabled = true
+			cfg1.TSDBConfig.Dir = dir1
+			cfg1.TSDBConfig.S3 = s3.Config{
+				Endpoint:        "dummy",
+				BucketName:      "dummy",
+				SecretAccessKey: "dummy",
+				AccessKeyID:     "dummy",
+			}
+			cfg1.LifecyclerConfig.ID = "ingester1"
+			cfg1.LifecyclerConfig.Addr = "ingester1"
+			cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
+			cfg1.MaxTransferRetries = 10
+			ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
+			require.NoError(t, err)
+
+			test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
+				return ing1.lifecycler.GetState()
+			})
+
+			// Now write a sample to this ingester
+			req, expectedResponse := mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
+			ctx := user.InjectOrgID(context.Background(), userID)
+			_, err = ing1.Push(ctx, req)
+			require.NoError(t, err)
+
+			// Start a second ingester, but let it go into PENDING
+			cfg2 := defaultIngesterTestConfig()
+			cfg2.TSDBEnabled = true
+			cfg2.TSDBConfig.Dir = dir2
+			cfg2.TSDBConfig.S3 = s3.Config{
+				Endpoint:        "dummy",
+				BucketName:      "dummy",
+				SecretAccessKey: "dummy",
+				AccessKeyID:     "dummy",
+			}
+			cfg2.LifecyclerConfig.RingConfig.KVStore.Mock = cfg1.LifecyclerConfig.RingConfig.KVStore.Mock
+			cfg2.LifecyclerConfig.ID = "ingester2"
+			cfg2.LifecyclerConfig.Addr = "ingester2"
+			cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
+			ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
+			require.NoError(t, err)
+
+			// Let ing1 send blocks/wal to ing2
+			ingesterClientFactoryCount := 0
+
+			ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.HealthAndIngesterClient, error) {
+				if ingesterClientFactoryCount++; ingesterClientFactoryCount <= scenario.failedTransfers {
+					return nil, errors.New("mocked error simulating the case where the leaving ingester is unable to connect to the joining ingester")
+				}
+
+				return ingesterClientAdapater{
+					ingester: ing2,
+				}, nil
+			}
+
+			// Now stop the first ingester, and wait for the second ingester to become ACTIVE.
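+			// Shutdown() triggers TransferOut(): with the mocked client factory above, the
+			// first "failedTransfers" attempts fail and the transfer is retried (up to
+			// MaxTransferRetries) until it succeeds.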
+			ing1.Shutdown()
+			test.Poll(t, 10*time.Second, ring.ACTIVE, func() interface{} {
+				return ing2.lifecycler.GetState()
+			})
+
+			// And check the second ingester has the sample
+			matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
+			require.NoError(t, err)
+
+			request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher})
+			require.NoError(t, err)
+
+			response, err := ing2.Query(ctx, request)
+			require.NoError(t, err)
+			assert.Equal(t, expectedResponse, response)
+
+			// Check we can send the same sample again to the new ingester and get the same result
+			req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
+			_, err = ing2.Push(ctx, req)
+			require.NoError(t, err)
+			response, err = ing2.Query(ctx, request)
+			require.NoError(t, err)
+			assert.Equal(t, expectedResponse, response)
+
+			// Assert the data is in the expected location of dir2
+			files, err := ioutil.ReadDir(dir2)
+			require.NoError(t, err)
+			require.Equal(t, 1, len(files))
+			require.Equal(t, "1", files[0].Name())
+		})
+	}
-	cfg2.LifecyclerConfig.RingConfig.KVStore.Mock = cfg1.LifecyclerConfig.RingConfig.KVStore.Mock
-	cfg2.LifecyclerConfig.ID = "ingester2"
-	cfg2.LifecyclerConfig.Addr = "ingester2"
-	cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
-	ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
-	require.NoError(t, err)
-
-	// Let ing2 send blocks/wal to ing1
-	ing1.cfg.ingesterClientFactory = func(addr string, _ client.Config) (client.HealthAndIngesterClient, error) {
-		return ingesterClientAdapater{
-			ingester: ing2,
-		}, nil
-	}
-
-	// Now stop the first ingester, and wait for the second ingester to become ACTIVE.
-	ing1.Shutdown()
-	test.Poll(t, 10*time.Second, ring.ACTIVE, func() interface{} {
-		return ing2.lifecycler.GetState()
-	})
-
-	// And check the second ingester has the sample
-	matcher, err := labels.NewMatcher(labels.MatchEqual, model.MetricNameLabel, "foo")
-	require.NoError(t, err)
-
-	request, err := client.ToQueryRequest(model.TimeFromUnix(0), model.TimeFromUnix(200), []*labels.Matcher{matcher})
-	require.NoError(t, err)
-
-	response, err := ing2.Query(ctx, request)
-	require.NoError(t, err)
-	assert.Equal(t, expectedResponse, response)
-
-	// Check we can send the same sample again to the new ingester and get the same result
-	req, _ = mockWriteRequest(labels.Labels{{Name: labels.MetricName, Value: "foo"}}, 456, 123000)
-	_, err = ing2.Push(ctx, req)
-	require.NoError(t, err)
-	response, err = ing2.Query(ctx, request)
-	require.NoError(t, err)
-	assert.Equal(t, expectedResponse, response)
-
-	// Assert the data is in the expected location of dir2
-	files, err := ioutil.ReadDir(dir2)
-	require.NoError(t, err)
-	require.Equal(t, 1, len(files))
-	require.Equal(t, "1", files[0].Name())
 }
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index c14f397a42f..7431862559a 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -16,6 +16,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/tsdb"
 	"github.com/thanos-io/thanos/pkg/shipper"
 
 	"github.com/cortexproject/cortex/pkg/chunk/encoding"
@@ -50,14 +51,10 @@ var (
 		Name: "cortex_ingester_sent_bytes_total",
 		Help: "The total number of bytes sent by this ingester whilst leaving",
 	})
-
-	once *sync.Once
-
 	errTransferNoPendingIngesters = errors.New("no pending ingesters")
 )
 
 func init() {
-	once = &sync.Once{}
 	prometheus.MustRegister(sentChunks)
 	prometheus.MustRegister(receivedChunks)
 	prometheus.MustRegister(sentFiles)
@@ -415,6 +412,7 @@ func (i *Ingester) TransferOut(ctx context.Context) error {
 	for backoff.Ongoing() {
 		err = i.transferOut(ctx)
 		if err == nil {
+			level.Info(util.Logger).Log("msg", "transfer successfully completed")
 			return nil
 		}
 
@@ -502,32 +500,64 @@ func (i *Ingester) transferOut(ctx context.Context) error {
 }
 
 func (i *Ingester) v2TransferOut(ctx context.Context) error {
-	if len(i.TSDBState.dbs) == 0 {
-		level.Info(util.Logger).Log("msg", "nothing to transfer")
+	// Skip TSDB transfer if there are no DBs
+	i.userStatesMtx.RLock()
+	skip := len(i.TSDBState.dbs) == 0
+	i.userStatesMtx.RUnlock()
+
+	if skip {
+		level.Info(util.Logger).Log("msg", "the ingester has nothing to transfer")
 		return nil
 	}
 
-	// Close all user databases
-	wg := &sync.WaitGroup{}
-	// Only perform a shutdown once
-	once.Do(func() {
+	// This transfer function may be called multiple times in case of error,
+	// until the max number of retries is reached. For this reason, we run
+	// some initialization only once.
+	i.TSDBState.transferOnce.Do(func() {
+		// In order to transfer the TSDB WAL without closing the TSDB itself - which is a
+		// prerequisite to continue serving read requests while transferring - we need
+		// to make sure no more series will be written to the TSDB. For this reason, we
+		// wait until all in-flight write requests have completed. No new write
+		// requests will be accepted because the "stopped" flag has already been set.
+		level.Info(util.Logger).Log("msg", "waiting for in-flight write requests to complete")
+
+		// Do not use the parent context because we don't want to interrupt while waiting
+		// for in-flight requests to complete if the parent context is cancelled, given
+		// this logic runs only once.
+		waitCtx, waitCancel := context.WithTimeout(context.Background(), 10*time.Second)
+		defer waitCancel()
+
+		if err := util.WaitGroup(waitCtx, &i.TSDBState.inflightWriteReqs); err != nil {
+			level.Warn(util.Logger).Log("msg", "timeout expired while waiting for in-flight write requests to complete, transfer will continue anyway", "err", err)
+		}
+
+		// Before beginning the transfer, we need to make sure no WAL compaction will occur.
+		// If there's an on-going compaction, DisableCompactions() will wait until it has
+		// completed.
+		level.Info(util.Logger).Log("msg", "disabling compaction on all TSDBs")
+
+		i.userStatesMtx.RLock()
+		wg := &sync.WaitGroup{}
 		wg.Add(len(i.TSDBState.dbs))
+
 		for _, db := range i.TSDBState.dbs {
-			go func(closer io.Closer) {
+			go func(db *tsdb.DB) {
 				defer wg.Done()
 
-				if err := closer.Close(); err != nil {
-					level.Warn(util.Logger).Log("msg", "failed to close db", "err", err)
-				}
+				db.DisableCompactions()
 			}(db)
 		}
+
+		i.userStatesMtx.RUnlock()
+		wg.Wait()
 	})
 
+	// Look for a joining ingester to transfer blocks and WAL to
 	targetIngester, err := i.findTargetIngester(ctx)
 	if err != nil {
-		return fmt.Errorf("cannot find ingester to transfer blocks to: %w", err)
+		return errors.Wrap(err, "cannot find ingester to transfer blocks to")
 	}
 
-	level.Info(util.Logger).Log("msg", "sending blocks", "to_ingester", targetIngester.Addr)
+	level.Info(util.Logger).Log("msg", "begin transferring TSDB blocks and WAL to joining ingester", "to_ingester", targetIngester.Addr)
 	c, err := i.cfg.ingesterClientFactory(targetIngester.Addr, i.clientConfig)
 	if err != nil {
 		return err
@@ -537,11 +567,9 @@ func (i *Ingester) v2TransferOut(ctx context.Context) error {
 	ctx = user.InjectOrgID(ctx, "-1")
 	stream, err := c.TransferTSDB(ctx)
 	if err != nil {
-		return errors.Wrap(err, "TransferTSDB")
+		return errors.Wrap(err, "TransferTSDB() has failed")
 	}
 
-	wg.Wait() // wait for all databases to have closed
-
 	// Grab a list of all blocks that need to be shipped
 	blocks, err := unshippedBlocks(i.cfg.TSDBConfig.Dir)
 	if err != nil {
@@ -559,6 +587,11 @@ func (i *Ingester) v2TransferOut(ctx context.Context) error {
 		return errors.Wrap(err, "CloseAndRecv")
 	}
 
+	// The transfer out has been successfully completed. Now we should close
+	// all open TSDBs: the Close() will wait until all on-going read operations
+	// have completed.
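+	// Note: we close the TSDBs only after a successful transfer on purpose. If a
+	// transfer attempt fails, TransferOut() retries it and the still-open TSDBs
+	// keep serving read requests in the meantime.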
+	i.closeAllTSDB()
+
 	return nil
 }
@@ -620,7 +653,7 @@ func unshippedBlocks(dir string) (map[string][]string, error) {
 }
 
 func transferUser(ctx context.Context, stream client.Ingester_TransferTSDBClient, dir, ingesterID, userID string, blocks []string) {
-	level.Info(util.Logger).Log("msg", "xfer user", "user", userID)
+	level.Info(util.Logger).Log("msg", "transferring user blocks", "user", userID)
 	// Transfer all blocks
 	for _, blk := range blocks {
 		err := filepath.Walk(filepath.Join(dir, userID, blk), func(path string, info os.FileInfo, err error) error {
@@ -659,6 +692,7 @@ func transferUser(ctx context.Context, stream client.Ingester_TransferTSDBClient
 	}
 
 	// Transfer WAL
+	level.Info(util.Logger).Log("msg", "transferring user WAL", "user", userID)
 	err := filepath.Walk(filepath.Join(dir, userID, "wal"), func(path string, info os.FileInfo, err error) error {
 		if err != nil {
 			return nil
 		}
@@ -691,10 +725,10 @@ func transferUser(ctx context.Context, stream client.Ingester_TransferTSDBClient
 	})
 
 	if err != nil {
-		level.Warn(util.Logger).Log("msg", "failed to transfer user wal", "err", err)
+		level.Warn(util.Logger).Log("msg", "failed to transfer user WAL", "err", err)
 	}
 
-	level.Info(util.Logger).Log("msg", "xfer user complete", "user", userID)
+	level.Info(util.Logger).Log("msg", "user blocks and WAL transfer completed", "user", userID)
 }
 
 func batchSend(batch int, b []byte, stream client.Ingester_TransferTSDBClient, tsfile *client.TimeSeriesFile) error {
diff --git a/pkg/util/sync.go b/pkg/util/sync.go
new file mode 100644
index 00000000000..3263e477030
--- /dev/null
+++ b/pkg/util/sync.go
@@ -0,0 +1,26 @@
+package util
+
+import (
+	"context"
+	"sync"
+)
+
+// WaitGroup calls Wait() on a sync.WaitGroup and returns once Wait() completes
+// or the context is cancelled or times out, whichever occurs first. It returns the
+// specific context error if the context is cancelled or times out before Wait()
+// completes.
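+//
+// A minimal usage sketch (illustrative only, mirroring how v2TransferOut uses it):
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+//	defer cancel()
+//	if err := util.WaitGroup(ctx, &wg); err != nil {
+//		// the context was cancelled or timed out before wg.Wait() returned
+//	}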
+func WaitGroup(ctx context.Context, wg *sync.WaitGroup) error {
+	c := make(chan struct{})
+
+	go func() {
+		defer close(c)
+		wg.Wait()
+	}()
+
+	select {
+	case <-c:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
diff --git a/pkg/util/sync_test.go b/pkg/util/sync_test.go
new file mode 100644
index 00000000000..83519438d07
--- /dev/null
+++ b/pkg/util/sync_test.go
@@ -0,0 +1,60 @@
+package util
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestWaitGroup(t *testing.T) {
+	t.Parallel()
+
+	tests := map[string]struct {
+		setup    func(wg *sync.WaitGroup) (context.Context, context.CancelFunc)
+		expected error
+	}{
+		"WaitGroup is done": {
+			setup: func(wg *sync.WaitGroup) (context.Context, context.CancelFunc) {
+				return context.WithTimeout(context.Background(), 100*time.Millisecond)
+			},
+			expected: nil,
+		},
+		"WaitGroup is not done and timeout expires": {
+			setup: func(wg *sync.WaitGroup) (context.Context, context.CancelFunc) {
+				wg.Add(1)
+
+				return context.WithTimeout(context.Background(), 100*time.Millisecond)
+			},
+			expected: context.DeadlineExceeded,
+		},
+		"WaitGroup is not done and context is cancelled before timeout expires": {
+			setup: func(wg *sync.WaitGroup) (context.Context, context.CancelFunc) {
+				wg.Add(1)
+
+				ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+
+				go func() {
+					time.Sleep(100 * time.Millisecond)
+					cancel()
+				}()
+
+				return ctx, cancel
+			},
+			expected: context.Canceled,
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			wg := sync.WaitGroup{}
+			ctx, cancel := testData.setup(&wg)
+			defer cancel()
+
+			success := WaitGroup(ctx, &wg)
+			assert.Equal(t, testData.expected, success)
+		})
+	}
+}
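For context on the locking used in v2Push() and v2TransferOut() above, the in-flight write request tracking boils down to the following minimal, self-contained sketch (the type and function names here are illustrative only, not the actual ingester API):

	package main

	import (
		"errors"
		"fmt"
		"sync"
	)

	// tracker is a minimal sketch of the shutdown coordination used above: a RWMutex
	// guards the "stopped" flag and a WaitGroup counts in-flight write requests, so
	// shutdown can wait for them to drain before transferring the TSDB WAL.
	type tracker struct {
		mtx      sync.RWMutex
		stopped  bool
		inflight sync.WaitGroup
	}

	var errStopping = errors.New("ingester stopping")

	// push registers an in-flight request unless shutdown has already started. The
	// Add(1) happens while holding the read lock: otherwise a request could slip in
	// between the "stopped" check and the WaitGroup increment, racing with shutdown.
	func (t *tracker) push(do func()) error {
		t.mtx.RLock()
		if t.stopped {
			t.mtx.RUnlock()
			return errStopping
		}
		t.inflight.Add(1)
		t.mtx.RUnlock()
		defer t.inflight.Done()

		do()
		return nil
	}

	// shutdown flips the stopped flag, then waits for in-flight requests to drain.
	func (t *tracker) shutdown() {
		t.mtx.Lock()
		t.stopped = true
		t.mtx.Unlock()

		t.inflight.Wait()
	}

	func main() {
		t := &tracker{}
		_ = t.push(func() { fmt.Println("write accepted") })
		t.shutdown()
		fmt.Println(t.push(func() {})) // prints "ingester stopping"
	}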