diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c3596e4d3e..1885d6b9d4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -50,6 +50,7 @@ * [CHANGE] Available command-line flags are printed to stdout, and only when requested via `-help`. Using invalid flag no longer causes printing of all available flags. #2691 * [CHANGE] Experimental Memberlist ring: randomize gossip node names to avoid conflicts when running multiple clients on the same host, or reusing host names (eg. pods in statefulset). Node name randomization can be disabled by using `-memberlist.randomize-node-name=false`. #2715 * [CHANGE] Memberlist KV client is no longer considered experimental. #2725 +* [CHANGE] Change target flag for purger from `data-purger` to `purger` and make delete request cancellation duration configurable. #2760 * [FEATURE] TLS config options added for GRPC clients in Querier (Query-frontend client & Ingester client), Ruler, Store Gateway, as well as HTTP client in Config store client. #2502 * [FEATURE] The flag `-frontend.max-cache-freshness` is now supported within the limits overrides, to specify per-tenant max cache freshness values. The corresponding YAML config parameter has been changed from `results_cache.max_freshness` to `limits_config.max_cache_freshness`. The legacy YAML config parameter (`results_cache.max_freshness`) will continue to be supported till Cortex release `v1.4.0`. #2609 * [FEATURE] Experimental gRPC Store: Added support to 3rd parties index and chunk stores using gRPC client/server plugin mechanism. 
#2220 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index e55c8774bc1..e360b639c22 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3296,4 +3296,10 @@ The `purger_config` configures the purger which takes care of delete requests # Name of the object store to use for storing delete plans # CLI flag: -purger.object-store-type [object_store_type: | default = ""] + +# Allow cancellation of delete request until duration after they are created. +# Data would be deleted only after delete requests have been older than this +# duration. Ideally this should be set to at least 24h. +# CLI flag: -purger.delete-request-cancel-period +[delete_request_cancel_period: | default = 24h] ``` diff --git a/pkg/api/api.go b/pkg/api/api.go index 2b2ce7de646..b78a8a3a099 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -7,6 +7,7 @@ import ( "net/http" "regexp" "strings" + "time" "github.com/opentracing-contrib/go-stdlib/nethttp" "github.com/opentracing/opentracing-go" @@ -180,8 +181,8 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig distributor.Conf // RegisterPurger registers the endpoints associated with the Purger/DeleteStore. 
They do not exactly // match the Prometheus API but mirror it closely enough to justify their routing under the Prometheus // component/ -func (a *API) RegisterPurger(store *purger.DeleteStore) { - deleteRequestHandler := purger.NewDeleteRequestHandler(store, prometheus.DefaultRegisterer) +func (a *API) RegisterPurger(store *purger.DeleteStore, deleteRequestCancelPeriod time.Duration) { + deleteRequestHandler := purger.NewDeleteRequestHandler(store, deleteRequestCancelPeriod, prometheus.DefaultRegisterer) a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.AddDeleteRequestHandler), true, "PUT", "POST") a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.GetAllDeleteRequestsHandler), true, "GET") diff --git a/pkg/chunk/purger/purger.go b/pkg/chunk/purger/purger.go index b3f282e5c61..7a771e0aa34 100644 --- a/pkg/chunk/purger/purger.go +++ b/pkg/chunk/purger/purger.go @@ -26,10 +26,9 @@ import ( ) const ( - millisecondPerDay = int64(24 * time.Hour / time.Millisecond) - deleteRequestCancellationDeadline = 24 * time.Hour - statusSuccess = "success" - statusFail = "fail" + millisecondPerDay = int64(24 * time.Hour / time.Millisecond) + statusSuccess = "success" + statusFail = "fail" ) type purgerMetrics struct { @@ -83,11 +82,12 @@ type deleteRequestWithLogger struct { logger log.Logger // logger is initialized with userID and requestID to add context to every log generated using this } -// Config holds config for DataPurger +// Config holds config for Purger type Config struct { - Enable bool `yaml:"enable"` - NumWorkers int `yaml:"num_workers"` - ObjectStoreType string `yaml:"object_store_type"` + Enable bool `yaml:"enable"` + NumWorkers int `yaml:"num_workers"` + ObjectStoreType string `yaml:"object_store_type"` + DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` } // RegisterFlags registers CLI flags for Config 
@@ -95,6 +95,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.Enable, "purger.enable", false, "Enable purger to allow deletion of series. Be aware that Delete series feature is still experimental") f.IntVar(&cfg.NumWorkers, "purger.num-workers", 2, "Number of workers executing delete plans in parallel") f.StringVar(&cfg.ObjectStoreType, "purger.object-store-type", "", "Name of the object store to use for storing delete plans") + f.DurationVar(&cfg.DeleteRequestCancelPeriod, "purger.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") } type workerJob struct { @@ -104,8 +105,8 @@ type workerJob struct { logger log.Logger } -// DataPurger does the purging of data which is requested to be deleted -type DataPurger struct { +// Purger does the purging of data which is requested to be deleted +type Purger struct { services.Service cfg Config @@ -135,11 +136,11 @@ type DataPurger struct { wg sync.WaitGroup } -// NewDataPurger creates a new DataPurger -func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (*DataPurger, error) { +// NewPurger creates a new Purger +func NewPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (*Purger, error) { util.WarnExperimentalUse("Delete series API") - dataPurger := DataPurger{ + purger := Purger{ cfg: cfg, deleteStore: deleteStore, chunkStore: chunkStore, @@ -153,34 +154,34 @@ func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, pendingPlansCount: map[string]int{}, } - dataPurger.Service = services.NewBasicService(dataPurger.init, dataPurger.loop, dataPurger.stop) - return &dataPurger, nil + purger.Service = 
services.NewBasicService(purger.init, purger.loop, purger.stop) + return &purger, nil } // init starts workers, scheduler and then loads in process delete requests -func (dp *DataPurger) init(ctx context.Context) error { - for i := 0; i < dp.cfg.NumWorkers; i++ { - dp.wg.Add(1) - go dp.worker() +func (p *Purger) init(ctx context.Context) error { + for i := 0; i < p.cfg.NumWorkers; i++ { + p.wg.Add(1) + go p.worker() } - dp.wg.Add(1) - go dp.jobScheduler(ctx) + p.wg.Add(1) + go p.jobScheduler(ctx) - return dp.loadInprocessDeleteRequests() + return p.loadInprocessDeleteRequests() } -func (dp *DataPurger) loop(ctx context.Context) error { +func (p *Purger) loop(ctx context.Context) error { loadRequests := func() { status := statusSuccess - err := dp.pullDeleteRequestsToPlanDeletes() + err := p.pullDeleteRequestsToPlanDeletes() if err != nil { status = statusFail level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err) } - dp.metrics.loadPendingRequestsAttempsTotal.WithLabelValues(status).Inc() + p.metrics.loadPendingRequestsAttempsTotal.WithLabelValues(status).Inc() } // load requests on startup instead of waiting for first ticker @@ -193,7 +194,7 @@ func (dp *DataPurger) loop(ctx context.Context) error { select { case <-loadRequestsTicker.C: loadRequests() - case <-dp.pullNewRequestsChan: + case <-p.pullNewRequestsChan: loadRequests() case <-ctx.Done(): return nil @@ -202,102 +203,102 @@ func (dp *DataPurger) loop(ctx context.Context) error { } // Stop waits until all background tasks stop. 
-func (dp *DataPurger) stop(_ error) error { - dp.wg.Wait() +func (p *Purger) stop(_ error) error { + p.wg.Wait() return nil } -func (dp *DataPurger) workerJobCleanup(job workerJob) { - err := dp.removeDeletePlan(context.Background(), job.userID, job.deleteRequestID, job.planNo) +func (p *Purger) workerJobCleanup(job workerJob) { + err := p.removeDeletePlan(context.Background(), job.userID, job.deleteRequestID, job.planNo) if err != nil { level.Error(job.logger).Log("msg", "error removing delete plan", "plan_no", job.planNo, "err", err) return } - dp.pendingPlansCountMtx.Lock() - dp.pendingPlansCount[job.deleteRequestID]-- + p.pendingPlansCountMtx.Lock() + p.pendingPlansCount[job.deleteRequestID]-- - if dp.pendingPlansCount[job.deleteRequestID] == 0 { + if p.pendingPlansCount[job.deleteRequestID] == 0 { level.Info(job.logger).Log("msg", "finished execution of all plans, cleaning up and updating status of request") - err := dp.deleteStore.UpdateStatus(context.Background(), job.userID, job.deleteRequestID, StatusProcessed) + err := p.deleteStore.UpdateStatus(context.Background(), job.userID, job.deleteRequestID, StatusProcessed) if err != nil { level.Error(job.logger).Log("msg", "error updating delete request status to process", "err", err) } - dp.metrics.deleteRequestsProcessedTotal.WithLabelValues(job.userID).Inc() - delete(dp.pendingPlansCount, job.deleteRequestID) - dp.pendingPlansCountMtx.Unlock() + p.metrics.deleteRequestsProcessedTotal.WithLabelValues(job.userID).Inc() + delete(p.pendingPlansCount, job.deleteRequestID) + p.pendingPlansCountMtx.Unlock() - dp.inProcessRequestIDsMtx.Lock() - delete(dp.inProcessRequests, job.userID) - dp.inProcessRequestIDsMtx.Unlock() + p.inProcessRequestIDsMtx.Lock() + delete(p.inProcessRequests, job.userID) + p.inProcessRequestIDsMtx.Unlock() // request loading of more delete request if // - user has more pending requests and // - we do not have a pending request to load more requests - dp.usersWithPendingRequestsMtx.Lock() - 
defer dp.usersWithPendingRequestsMtx.Unlock() - if _, ok := dp.usersWithPendingRequests[job.userID]; ok { - delete(dp.usersWithPendingRequests, job.userID) + p.usersWithPendingRequestsMtx.Lock() + defer p.usersWithPendingRequestsMtx.Unlock() + if _, ok := p.usersWithPendingRequests[job.userID]; ok { + delete(p.usersWithPendingRequests, job.userID) select { - case dp.pullNewRequestsChan <- struct{}{}: + case p.pullNewRequestsChan <- struct{}{}: // sent default: // already sent } } } else { - dp.pendingPlansCountMtx.Unlock() + p.pendingPlansCountMtx.Unlock() } } // we send all the delete plans to workerJobChan -func (dp *DataPurger) jobScheduler(ctx context.Context) { - defer dp.wg.Done() +func (p *Purger) jobScheduler(ctx context.Context) { + defer p.wg.Done() for { select { - case req := <-dp.executePlansChan: + case req := <-p.executePlansChan: numPlans := numPlans(req.StartTime, req.EndTime) level.Info(req.logger).Log("msg", "sending jobs to workers for purging data", "num_jobs", numPlans) - dp.pendingPlansCountMtx.Lock() - dp.pendingPlansCount[req.RequestID] = numPlans - dp.pendingPlansCountMtx.Unlock() + p.pendingPlansCountMtx.Lock() + p.pendingPlansCount[req.RequestID] = numPlans + p.pendingPlansCountMtx.Unlock() for i := 0; i < numPlans; i++ { - dp.workerJobChan <- workerJob{planNo: i, userID: req.UserID, + p.workerJobChan <- workerJob{planNo: i, userID: req.UserID, deleteRequestID: req.RequestID, logger: req.logger} } case <-ctx.Done(): - close(dp.workerJobChan) + close(p.workerJobChan) return } } } -func (dp *DataPurger) worker() { - defer dp.wg.Done() +func (p *Purger) worker() { + defer p.wg.Done() - for job := range dp.workerJobChan { - err := dp.executePlan(job.userID, job.deleteRequestID, job.planNo, job.logger) + for job := range p.workerJobChan { + err := p.executePlan(job.userID, job.deleteRequestID, job.planNo, job.logger) if err != nil { - dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(job.userID).Inc() + 
p.metrics.deleteRequestsProcessingFailures.WithLabelValues(job.userID).Inc() level.Error(job.logger).Log("msg", "error executing delete plan", "plan_no", job.planNo, "err", err) continue } - dp.workerJobCleanup(job) + p.workerJobCleanup(job) } } -func (dp *DataPurger) executePlan(userID, requestID string, planNo int, logger log.Logger) error { +func (p *Purger) executePlan(userID, requestID string, planNo int, logger log.Logger) error { logger = log.With(logger, "plan_no", planNo) - plan, err := dp.getDeletePlan(context.Background(), userID, requestID, planNo) + plan, err := p.getDeletePlan(context.Background(), userID, requestID, planNo) if err != nil { if err == chunk.ErrStorageObjectNotFound { level.Info(logger).Log("msg", "plan not found, must have been executed already") @@ -328,7 +329,7 @@ func (dp *DataPurger) executePlan(userID, requestID string, planNo int, logger l } } - err = dp.chunkStore.DeleteChunk(ctx, chunkRef.From, chunkRef.Through, chunkRef.UserID, + err = p.chunkStore.DeleteChunk(ctx, chunkRef.From, chunkRef.Through, chunkRef.UserID, chunkDetails.ID, client.FromLabelAdaptersToLabels(plan.ChunksGroup[i].Labels), partiallyDeletedInterval) if err != nil { if isMissingChunkErr(err) { @@ -343,7 +344,7 @@ func (dp *DataPurger) executePlan(userID, requestID string, planNo int, logger l level.Debug(logger).Log("msg", "deleting series", "labels", plan.ChunksGroup[i].Labels) // this is mostly required to clean up series ids from series store - err := dp.chunkStore.DeleteSeriesIDs(ctx, model.Time(plan.PlanInterval.StartTimestampMs), model.Time(plan.PlanInterval.EndTimestampMs), + err := p.chunkStore.DeleteSeriesIDs(ctx, model.Time(plan.PlanInterval.StartTimestampMs), model.Time(plan.PlanInterval.EndTimestampMs), userID, client.FromLabelAdaptersToLabels(plan.ChunksGroup[i].Labels)) if err != nil { return err @@ -356,8 +357,8 @@ func (dp *DataPurger) executePlan(userID, requestID string, planNo int, logger l } // we need to load all in process delete requests 
on startup to finish them first -func (dp *DataPurger) loadInprocessDeleteRequests() error { - requestsWithBuildingPlanStatus, err := dp.deleteStore.GetDeleteRequestsByStatus(context.Background(), StatusBuildingPlan) +func (p *Purger) loadInprocessDeleteRequests() error { + requestsWithBuildingPlanStatus, err := p.deleteStore.GetDeleteRequestsByStatus(context.Background(), StatusBuildingPlan) if err != nil { return err } @@ -367,19 +368,19 @@ func (dp *DataPurger) loadInprocessDeleteRequests() error { level.Info(req.logger).Log("msg", "loaded in process delete requests with status building plan") - dp.inProcessRequests[deleteRequest.UserID] = deleteRequest - err := dp.buildDeletePlan(req) + p.inProcessRequests[deleteRequest.UserID] = deleteRequest + err := p.buildDeletePlan(req) if err != nil { - dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc() + p.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc() level.Error(req.logger).Log("msg", "error building delete plan", "err", err) continue } level.Info(req.logger).Log("msg", "sending delete request for execution") - dp.executePlansChan <- req + p.executePlansChan <- req } - requestsWithDeletingStatus, err := dp.deleteStore.GetDeleteRequestsByStatus(context.Background(), StatusDeleting) + requestsWithDeletingStatus, err := p.deleteStore.GetDeleteRequestsByStatus(context.Background(), StatusDeleting) if err != nil { return err } @@ -388,8 +389,8 @@ func (dp *DataPurger) loadInprocessDeleteRequests() error { req := makeDeleteRequestWithLogger(deleteRequest, util.Logger) level.Info(req.logger).Log("msg", "loaded in process delete requests with status deleting") - dp.inProcessRequests[deleteRequest.UserID] = deleteRequest - dp.executePlansChan <- req + p.inProcessRequests[deleteRequest.UserID] = deleteRequest + p.executePlansChan <- req } return nil @@ -397,22 +398,22 @@ func (dp *DataPurger) loadInprocessDeleteRequests() error { // 
pullDeleteRequestsToPlanDeletes pulls delete requests which do not have their delete plans built yet and sends them for building delete plans // after pulling delete requests for building plans, it updates its status to StatusBuildingPlan status to avoid picking this up again next time -func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { - deleteRequests, err := dp.deleteStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived) +func (p *Purger) pullDeleteRequestsToPlanDeletes() error { + deleteRequests, err := p.deleteStore.GetDeleteRequestsByStatus(context.Background(), StatusReceived) if err != nil { return err } - dp.inProcessRequestIDsMtx.RLock() - pendingDeleteRequestsCount := len(dp.inProcessRequests) - dp.inProcessRequestIDsMtx.RUnlock() + p.inProcessRequestIDsMtx.RLock() + pendingDeleteRequestsCount := len(p.inProcessRequests) + p.inProcessRequestIDsMtx.RUnlock() now := model.Now() oldestPendingRequestCreatedAt := now // requests which are still being processed are also considered pending if pendingDeleteRequestsCount != 0 { - oldestInProcessRequest := dp.getOldestInProcessRequest() + oldestInProcessRequest := p.getOldestInProcessRequest() if oldestInProcessRequest != nil { oldestPendingRequestCreatedAt = oldestInProcessRequest.CreatedAt } @@ -420,7 +421,7 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { for _, deleteRequest := range deleteRequests { // adding an extra minute here to avoid a race between cancellation of request and picking of the request for processing - if deleteRequest.CreatedAt.Add(deleteRequestCancellationDeadline).Add(time.Minute).After(model.Now()) { + if deleteRequest.CreatedAt.Add(p.cfg.DeleteRequestCancelPeriod).Add(time.Minute).After(model.Now()) { continue } @@ -429,14 +430,14 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { oldestPendingRequestCreatedAt = deleteRequest.CreatedAt } - dp.inProcessRequestIDsMtx.RLock() - inprocessDeleteRequest, ok := 
dp.inProcessRequests[deleteRequest.UserID] - dp.inProcessRequestIDsMtx.RUnlock() + p.inProcessRequestIDsMtx.RLock() + inprocessDeleteRequest, ok := p.inProcessRequests[deleteRequest.UserID] + p.inProcessRequestIDsMtx.RUnlock() if ok { - dp.usersWithPendingRequestsMtx.Lock() - dp.usersWithPendingRequests[deleteRequest.UserID] = struct{}{} - dp.usersWithPendingRequestsMtx.Unlock() + p.usersWithPendingRequestsMtx.Lock() + p.usersWithPendingRequests[deleteRequest.UserID] = struct{}{} + p.usersWithPendingRequestsMtx.Unlock() level.Debug(util.Logger).Log("msg", "skipping delete request processing for now since another request from same user is already in process", "inprocess_request_id", inprocessDeleteRequest.RequestID, @@ -444,22 +445,22 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { continue } - err = dp.deleteStore.UpdateStatus(context.Background(), deleteRequest.UserID, deleteRequest.RequestID, StatusBuildingPlan) + err = p.deleteStore.UpdateStatus(context.Background(), deleteRequest.UserID, deleteRequest.RequestID, StatusBuildingPlan) if err != nil { return err } - dp.inProcessRequestIDsMtx.Lock() - dp.inProcessRequests[deleteRequest.UserID] = deleteRequest - dp.inProcessRequestIDsMtx.Unlock() + p.inProcessRequestIDsMtx.Lock() + p.inProcessRequests[deleteRequest.UserID] = deleteRequest + p.inProcessRequestIDsMtx.Unlock() req := makeDeleteRequestWithLogger(deleteRequest, util.Logger) level.Info(req.logger).Log("msg", "building plan for a new delete request") - err := dp.buildDeletePlan(req) + err := p.buildDeletePlan(req) if err != nil { - dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc() + p.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc() // We do not want to remove this delete request from inProcessRequests to make sure // we do not move multiple deleting requests in deletion process. 
@@ -469,11 +470,11 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { } level.Info(req.logger).Log("msg", "sending delete request for execution") - dp.executePlansChan <- req + p.executePlansChan <- req } - dp.metrics.oldestPendingDeleteRequestAgeSeconds.Set(float64(now.Sub(oldestPendingRequestCreatedAt) / time.Second)) - dp.metrics.pendingDeleteRequestsCount.Set(float64(pendingDeleteRequestsCount)) + p.metrics.oldestPendingDeleteRequestAgeSeconds.Set(float64(now.Sub(oldestPendingRequestCreatedAt) / time.Second)) + p.metrics.pendingDeleteRequestsCount.Set(float64(pendingDeleteRequestsCount)) return nil } @@ -482,7 +483,7 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error { // A days plan will include chunk ids and labels of all the chunks which are supposed to be deleted. // Chunks are grouped together by labels to avoid storing labels repetitively. // After building delete plans it updates status of delete request to StatusDeleting and sends it for execution -func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error { +func (p *Purger) buildDeletePlan(req deleteRequestWithLogger) error { ctx := context.Background() ctx = user.InjectOrgID(ctx, req.UserID) @@ -501,7 +502,7 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error { return err } - chunks, err := dp.chunkStore.Get(ctx, req.UserID, planRange.Start, planRange.End, matchers...) + chunks, err := p.chunkStore.Get(ctx, req.UserID, planRange.Start, planRange.End, matchers...) 
if err != nil { return err } @@ -530,28 +531,28 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error { plans[i] = pb } - err := dp.putDeletePlans(ctx, req.UserID, req.RequestID, plans) + err := p.putDeletePlans(ctx, req.UserID, req.RequestID, plans) if err != nil { return err } - err = dp.deleteStore.UpdateStatus(ctx, req.UserID, req.RequestID, StatusDeleting) + err = p.deleteStore.UpdateStatus(ctx, req.UserID, req.RequestID, StatusDeleting) if err != nil { return err } - dp.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(req.UserID).Add(float64(len(includedChunkIDs))) + p.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(req.UserID).Add(float64(len(includedChunkIDs))) level.Info(req.logger).Log("msg", "built delete plans", "num_plans", len(perDayTimeRange)) return nil } -func (dp *DataPurger) putDeletePlans(ctx context.Context, userID, requestID string, plans [][]byte) error { +func (p *Purger) putDeletePlans(ctx context.Context, userID, requestID string, plans [][]byte) error { for i, plan := range plans { objectKey := buildObjectKeyForPlan(userID, requestID, i) - err := dp.objectClient.PutObject(ctx, objectKey, bytes.NewReader(plan)) + err := p.objectClient.PutObject(ctx, objectKey, bytes.NewReader(plan)) if err != nil { return err } @@ -560,10 +561,10 @@ func (dp *DataPurger) putDeletePlans(ctx context.Context, userID, requestID stri return nil } -func (dp *DataPurger) getDeletePlan(ctx context.Context, userID, requestID string, planNo int) (*DeletePlan, error) { +func (p *Purger) getDeletePlan(ctx context.Context, userID, requestID string, planNo int) (*DeletePlan, error) { objectKey := buildObjectKeyForPlan(userID, requestID, planNo) - readCloser, err := dp.objectClient.GetObject(ctx, objectKey) + readCloser, err := p.objectClient.GetObject(ctx, objectKey) if err != nil { return nil, err } @@ -584,17 +585,17 @@ func (dp *DataPurger) getDeletePlan(ctx context.Context, userID, requestID strin return &plan, nil } -func 
(dp *DataPurger) removeDeletePlan(ctx context.Context, userID, requestID string, planNo int) error { +func (p *Purger) removeDeletePlan(ctx context.Context, userID, requestID string, planNo int) error { objectKey := buildObjectKeyForPlan(userID, requestID, planNo) - return dp.objectClient.DeleteObject(ctx, objectKey) + return p.objectClient.DeleteObject(ctx, objectKey) } -func (dp *DataPurger) getOldestInProcessRequest() *DeleteRequest { - dp.inProcessRequestIDsMtx.RLock() - defer dp.inProcessRequestIDsMtx.RUnlock() +func (p *Purger) getOldestInProcessRequest() *DeleteRequest { + p.inProcessRequestIDsMtx.RLock() + defer p.inProcessRequestIDsMtx.RUnlock() var oldestRequest *DeleteRequest - for _, request := range dp.inProcessRequests { + for _, request := range p.inProcessRequests { if oldestRequest == nil || request.CreatedAt.Before(oldestRequest.CreatedAt) { oldestRequest = &request } diff --git a/pkg/chunk/purger/purger_test.go b/pkg/chunk/purger/purger_test.go index 6acb3f998db..15f3ce3d721 100644 --- a/pkg/chunk/purger/purger_test.go +++ b/pkg/chunk/purger/purger_test.go @@ -52,7 +52,7 @@ func setupTestDeleteStore(t *testing.T) *DeleteStore { return deleteStore } -func setupStoresAndPurger(t *testing.T) (*DeleteStore, chunk.Store, chunk.ObjectClient, *DataPurger, *prometheus.Registry) { +func setupStoresAndPurger(t *testing.T) (*DeleteStore, chunk.Store, chunk.ObjectClient, *Purger, *prometheus.Registry) { registry := prometheus.NewRegistry() deleteStore := setupTestDeleteStore(t) @@ -66,10 +66,10 @@ func setupStoresAndPurger(t *testing.T) (*DeleteStore, chunk.Store, chunk.Object var cfg Config flagext.DefaultValues(&cfg) - dataPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient, registry) + purger, err := NewPurger(cfg, deleteStore, chunkStore, storageClient, registry) require.NoError(t, err) - return deleteStore, chunkStore, storageClient, dataPurger, registry + return deleteStore, chunkStore, storageClient, purger, registry } func 
buildChunks(from, through model.Time, batchSize int) ([]chunk.Chunk, error) { @@ -173,13 +173,13 @@ var purgePlanTestCases = []struct { }, } -func TestDataPurger_BuildPlan(t *testing.T) { +func TestPurger_BuildPlan(t *testing.T) { for _, tc := range purgePlanTestCases { for batchSize := 1; batchSize <= 5; batchSize++ { t.Run(fmt.Sprintf("%s/batch-size=%d", tc.name, batchSize), func(t *testing.T) { - deleteStore, chunkStore, storageClient, dataPurger, _ := setupStoresAndPurger(t) + deleteStore, chunkStore, storageClient, purger, _ := setupStoresAndPurger(t) defer func() { - dataPurger.StopAsync() + purger.StopAsync() chunkStore.Stop() }() @@ -198,7 +198,7 @@ func TestDataPurger_BuildPlan(t *testing.T) { deleteRequest := deleteRequests[0] requestWithLogger := makeDeleteRequestWithLogger(deleteRequest, util.Logger) - err = dataPurger.buildDeletePlan(requestWithLogger) + err = purger.buildDeletePlan(requestWithLogger) require.NoError(t, err) planPath := fmt.Sprintf("%s:%s/", userID, deleteRequest.RequestID) @@ -213,7 +213,7 @@ func TestDataPurger_BuildPlan(t *testing.T) { chunkIDs := map[string]struct{}{} for i := range plans { - deletePlan, err := dataPurger.getDeletePlan(context.Background(), userID, deleteRequest.RequestID, i) + deletePlan, err := purger.getDeletePlan(context.Background(), userID, deleteRequest.RequestID, i) require.NoError(t, err) for _, chunksGroup := range deletePlan.ChunksGroup { numChunksInGroup := len(chunksGroup.Chunks) @@ -244,13 +244,13 @@ func TestDataPurger_BuildPlan(t *testing.T) { } require.Equal(t, tc.numChunksToDelete*batchSize, len(chunkIDs)) - require.Equal(t, float64(tc.numChunksToDelete*batchSize), testutil.ToFloat64(dataPurger.metrics.deleteRequestsChunksSelectedTotal)) + require.Equal(t, float64(tc.numChunksToDelete*batchSize), testutil.ToFloat64(purger.metrics.deleteRequestsChunksSelectedTotal)) }) } } } -func TestDataPurger_ExecutePlan(t *testing.T) { +func TestPurger_ExecutePlan(t *testing.T) { fooMetricNameMatcher, err := 
parser.ParseMetricSelector(`foo`) if err != nil { t.Fatal(err) @@ -259,9 +259,9 @@ func TestDataPurger_ExecutePlan(t *testing.T) { for _, tc := range purgePlanTestCases { for batchSize := 1; batchSize <= 5; batchSize++ { t.Run(fmt.Sprintf("%s/batch-size=%d", tc.name, batchSize), func(t *testing.T) { - deleteStore, chunkStore, _, dataPurger, _ := setupStoresAndPurger(t) + deleteStore, chunkStore, _, purger, _ := setupStoresAndPurger(t) defer func() { - dataPurger.StopAsync() + purger.StopAsync() chunkStore.Stop() }() @@ -290,12 +290,12 @@ func TestDataPurger_ExecutePlan(t *testing.T) { deleteRequest := deleteRequests[0] requestWithLogger := makeDeleteRequestWithLogger(deleteRequest, util.Logger) - err = dataPurger.buildDeletePlan(requestWithLogger) + err = purger.buildDeletePlan(requestWithLogger) require.NoError(t, err) // execute all the plans for i := 0; i < tc.expectedNumberOfPlans; i++ { - err := dataPurger.executePlan(userID, deleteRequest.RequestID, i, requestWithLogger.logger) + err := purger.executePlan(userID, deleteRequest.RequestID, i, requestWithLogger.logger) require.NoError(t, err) } @@ -314,13 +314,13 @@ func TestDataPurger_ExecutePlan(t *testing.T) { } } -func TestDataPurger_Restarts(t *testing.T) { +func TestPurger_Restarts(t *testing.T) { fooMetricNameMatcher, err := parser.ParseMetricSelector(`foo`) if err != nil { t.Fatal(err) } - deleteStore, chunkStore, storageClient, dataPurger, _ := setupStoresAndPurger(t) + deleteStore, chunkStore, storageClient, purger, _ := setupStoresAndPurger(t) defer func() { chunkStore.Stop() }() @@ -341,16 +341,16 @@ func TestDataPurger_Restarts(t *testing.T) { deleteRequest := deleteRequests[0] requestWithLogger := makeDeleteRequestWithLogger(deleteRequest, util.Logger) - err = dataPurger.buildDeletePlan(requestWithLogger) + err = purger.buildDeletePlan(requestWithLogger) require.NoError(t, err) // stop the existing purger - require.NoError(t, services.StopAndAwaitTerminated(context.Background(), dataPurger)) + 
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), purger)) // create a new purger to check whether it picks up in process delete requests var cfg Config flagext.DefaultValues(&cfg) - newPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient, prometheus.NewPedanticRegistry()) + newPurger, err := NewPurger(cfg, deleteStore, chunkStore, storageClient, prometheus.NewPedanticRegistry()) require.NoError(t, err) // load in process delete requests by calling Run diff --git a/pkg/chunk/purger/request_handler.go b/pkg/chunk/purger/request_handler.go index 7b019614e96..c8475800aee 100644 --- a/pkg/chunk/purger/request_handler.go +++ b/pkg/chunk/purger/request_handler.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" "github.com/go-kit/kit/log/level" @@ -34,15 +35,17 @@ func newDeleteRequestHandlerMetrics(r prometheus.Registerer) *deleteRequestHandl // DeleteRequestHandler provides handlers for delete requests type DeleteRequestHandler struct { - deleteStore *DeleteStore - metrics *deleteRequestHandlerMetrics + deleteStore *DeleteStore + metrics *deleteRequestHandlerMetrics + deleteRequestCancelPeriod time.Duration } // NewDeleteRequestHandler creates a DeleteRequestHandler -func NewDeleteRequestHandler(deleteStore *DeleteStore, registerer prometheus.Registerer) *DeleteRequestHandler { +func NewDeleteRequestHandler(deleteStore *DeleteStore, deleteRequestCancelPeriod time.Duration, registerer prometheus.Registerer) *DeleteRequestHandler { deleteMgr := DeleteRequestHandler{ - deleteStore: deleteStore, - metrics: newDeleteRequestHandlerMetrics(registerer), + deleteStore: deleteStore, + deleteRequestCancelPeriod: deleteRequestCancelPeriod, + metrics: newDeleteRequestHandlerMetrics(registerer), } return &deleteMgr @@ -163,8 +166,8 @@ func (dm *DeleteRequestHandler) CancelDeleteRequestHandler(w http.ResponseWriter return } - if deleteRequest.CreatedAt.Add(deleteRequestCancellationDeadline).Before(model.Now()) { - 
http.Error(w, fmt.Sprintf("deletion of request past the deadline of %s since its creation is not allowed", deleteRequestCancellationDeadline.String()), http.StatusBadRequest) + if deleteRequest.CreatedAt.Add(dm.deleteRequestCancelPeriod).Before(model.Now()) { + http.Error(w, fmt.Sprintf("deletion of request past the deadline of %s since its creation is not allowed", dm.deleteRequestCancelPeriod.String()), http.StatusBadRequest) return } diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 4c6faa4e105..7279a622315 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -77,27 +77,27 @@ type Config struct { PrintConfig bool `yaml:"-"` HTTPPrefix string `yaml:"http_prefix"` - API api.Config `yaml:"api"` - Server server.Config `yaml:"server"` - Distributor distributor.Config `yaml:"distributor"` - Querier querier.Config `yaml:"querier"` - IngesterClient client.Config `yaml:"ingester_client"` - Ingester ingester.Config `yaml:"ingester"` - Flusher flusher.Config `yaml:"flusher"` - Storage storage.Config `yaml:"storage"` - ChunkStore chunk.StoreConfig `yaml:"chunk_store"` - Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation) - LimitsConfig validation.Limits `yaml:"limits"` - Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"` - Worker frontend.WorkerConfig `yaml:"frontend_worker"` - Frontend frontend.Config `yaml:"frontend"` - QueryRange queryrange.Config `yaml:"query_range"` - TableManager chunk.TableManagerConfig `yaml:"table_manager"` - Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. 
- TSDB tsdb.Config `yaml:"tsdb"` - Compactor compactor.Config `yaml:"compactor"` - StoreGateway storegateway.Config `yaml:"store_gateway"` - DataPurgerConfig purger.Config `yaml:"purger"` + API api.Config `yaml:"api"` + Server server.Config `yaml:"server"` + Distributor distributor.Config `yaml:"distributor"` + Querier querier.Config `yaml:"querier"` + IngesterClient client.Config `yaml:"ingester_client"` + Ingester ingester.Config `yaml:"ingester"` + Flusher flusher.Config `yaml:"flusher"` + Storage storage.Config `yaml:"storage"` + ChunkStore chunk.StoreConfig `yaml:"chunk_store"` + Schema chunk.SchemaConfig `yaml:"schema" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation) + LimitsConfig validation.Limits `yaml:"limits"` + Prealloc client.PreallocConfig `yaml:"prealloc" doc:"hidden"` + Worker frontend.WorkerConfig `yaml:"frontend_worker"` + Frontend frontend.Config `yaml:"frontend"` + QueryRange queryrange.Config `yaml:"query_range"` + TableManager chunk.TableManagerConfig `yaml:"table_manager"` + Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. 
+ TSDB tsdb.Config `yaml:"tsdb"` + Compactor compactor.Config `yaml:"compactor"` + StoreGateway storegateway.Config `yaml:"store_gateway"` + PurgerConfig purger.Config `yaml:"purger"` Ruler ruler.Config `yaml:"ruler"` Configs configs.Config `yaml:"configs"` @@ -135,7 +135,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.TSDB.RegisterFlags(f) c.Compactor.RegisterFlags(f) c.StoreGateway.RegisterFlags(f) - c.DataPurgerConfig.RegisterFlags(f) + c.PurgerConfig.RegisterFlags(f) c.Ruler.RegisterFlags(f) c.Configs.RegisterFlags(f) @@ -207,7 +207,7 @@ type Cortex struct { TableManager *chunk.TableManager Cache cache.Cache RuntimeConfig *runtimeconfig.Manager - DataPurger *purger.DataPurger + Purger *purger.Purger TombstonesLoader *purger.TombstonesLoader Ruler *ruler.Ruler diff --git a/pkg/cortex/modules.go index 4cd1ed61720..f1b97345c7a 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -60,7 +60,7 @@ const ( Compactor string = "compactor" StoreGateway string = "store-gateway" MemberlistKV string = "memberlist-kv" - DataPurger string = "data-purger" + Purger string = "purger" All string = "all" ) @@ -281,7 +281,7 @@ func (t *Cortex) initStore() (serv services.Service, err error) { } func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { - if !t.Cfg.DataPurgerConfig.Enable { + if !t.Cfg.PurgerConfig.Enable { // until we need to explicitly enable delete series support we need to do create TombstonesLoader without DeleteStore which acts as noop t.TombstonesLoader = purger.NewTombstonesLoader(nil, nil) @@ -394,7 +394,7 @@ func (t *Cortex) initTableManager() (services.Service, error) { util.CheckFatal("initializing bucket client", err) var extraTables []chunk.ExtraTables - if t.Cfg.DataPurgerConfig.Enable { + if t.Cfg.PurgerConfig.Enable { deleteStoreTableClient, err := storage.NewTableClient(t.Cfg.Storage.DeleteStoreConfig.Store, t.Cfg.Storage) if err != nil { return nil, err @@ -489,24 +489,24 @@
func (t *Cortex) initMemberlistKV() (services.Service, error) { return t.MemberlistKV, nil } -func (t *Cortex) initDataPurger() (services.Service, error) { - if !t.Cfg.DataPurgerConfig.Enable { +func (t *Cortex) initPurger() (services.Service, error) { + if !t.Cfg.PurgerConfig.Enable { return nil, nil } - storageClient, err := storage.NewObjectClient(t.Cfg.DataPurgerConfig.ObjectStoreType, t.Cfg.Storage) + storageClient, err := storage.NewObjectClient(t.Cfg.PurgerConfig.ObjectStoreType, t.Cfg.Storage) if err != nil { return nil, err } - t.DataPurger, err = purger.NewDataPurger(t.Cfg.DataPurgerConfig, t.DeletesStore, t.Store, storageClient, prometheus.DefaultRegisterer) + t.Purger, err = purger.NewPurger(t.Cfg.PurgerConfig, t.DeletesStore, t.Store, storageClient, prometheus.DefaultRegisterer) if err != nil { return nil, err } - t.API.RegisterPurger(t.DeletesStore) + t.API.RegisterPurger(t.DeletesStore, t.Cfg.PurgerConfig.DeleteRequestCancelPeriod) - return t.DataPurger, nil + return t.Purger, nil } func (t *Cortex) setupModuleManager() error { @@ -534,7 +534,7 @@ func (t *Cortex) setupModuleManager() error { mm.RegisterModule(AlertManager, t.initAlertManager) mm.RegisterModule(Compactor, t.initCompactor) mm.RegisterModule(StoreGateway, t.initStoreGateway) - mm.RegisterModule(DataPurger, t.initDataPurger) + mm.RegisterModule(Purger, t.initPurger) mm.RegisterModule(All, nil) mm.RegisterModule(StoreGateway, t.initStoreGateway) @@ -556,8 +556,8 @@ func (t *Cortex) setupModuleManager() error { AlertManager: {API}, Compactor: {API}, StoreGateway: {API}, - DataPurger: {Store, DeleteRequestsStore, API}, - All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, DataPurger, StoreGateway}, + Purger: {Store, DeleteRequestsStore, API}, + All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Purger, StoreGateway}, } for mod, targets := range deps { if err := mm.AddDependency(mod, targets...); err != nil {