5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,11 @@
* [ENHANCEMENT] Experimental TSDB: Added `cortex_querier_blocks_meta_synced`, which reflects current state of synced blocks over all tenants. #2392
* [ENHANCEMENT] Added `cortex_distributor_latest_seen_sample_timestamp_seconds` metric to see how far behind Prometheus servers are in sending data. #2371
* [ENHANCEMENT] FIFO cache to support eviction based on memory usage. The `-<prefix>.fifocache.size` CLI flag has been renamed to `-<prefix>.fifocache.max-size-items` as well as its YAML config option `size` renamed to `max_size_items`. Added `-<prefix>.fifocache.max-size-bytes` CLI flag and YAML config option `max_size_bytes` to specify memory limit of the cache. #2319
* [ENHANCEMENT] Added the following metrics for monitoring delete requests: #2445
- `cortex_purger_delete_requests_received_total`: Number of delete requests received per user.
- `cortex_purger_delete_requests_processed_total`: Number of delete requests processed per user.
- `cortex_purger_delete_requests_chunks_selected_total`: Number of chunks selected while building delete plans per user.
- `cortex_purger_delete_requests_processing_failures_total`: Number of delete requests processing failures per user.
* [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437
* [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454
* [ENHANCEMENT] Optimize index queries for matchers using "a|b|c"-type regex. #2446 #2475
4 changes: 3 additions & 1 deletion pkg/api/api.go
@@ -7,6 +7,8 @@ import (
"regexp"
"strings"

"github.com/prometheus/client_golang/prometheus"

Reviewer comment (Contributor):
Remove new line please, and merge these two import groups.
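
For illustration only, a sketch of the merged import layout the reviewer asks for (showing just the imports visible in this hunk, not the file's full import list; gofmt decides the final ordering):

```go
import (
	// standard library group
	"regexp"
	"strings"

	// single third-party group, as the reviewer requests
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/route"
)
```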

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/route"
@@ -177,7 +179,7 @@ func (a *API) RegisterIngester(i *ingester.Ingester, pushConfig distributor.Conf
// match the Prometheus API but mirror it closely enough to justify their routing under the Prometheus
// component/
func (a *API) RegisterPurger(store *purger.DeleteStore) {
deleteRequestHandler := purger.NewDeleteRequestHandler(store)
deleteRequestHandler := purger.NewDeleteRequestHandler(store, prometheus.DefaultRegisterer)

a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.AddDeleteRequestHandler), true, "PUT", "POST")
a.RegisterRoute(a.cfg.PrometheusHTTPPrefix+"/api/v1/admin/tsdb/delete_series", http.HandlerFunc(deleteRequestHandler.GetAllDeleteRequestsHandler), true, "GET")
64 changes: 58 additions & 6 deletions pkg/chunk/purger/purger.go
@@ -12,6 +12,8 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/user"
@@ -24,6 +26,34 @@ import (

const millisecondPerDay = int64(24 * time.Hour / time.Millisecond)

type purgerMetrics struct {
deleteRequestsProcessedTotal *prometheus.CounterVec
deleteRequestsChunksSelectedTotal *prometheus.CounterVec
deleteRequestsProcessingFailures *prometheus.CounterVec
}

func newPurgerMetrics(r prometheus.Registerer) *purgerMetrics {
m := purgerMetrics{}

m.deleteRequestsProcessedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "purger_delete_requests_processed_total",
Help: "Number of delete requests processed per user",
}, []string{"user"})
m.deleteRequestsChunksSelectedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "purger_delete_requests_chunks_selected_total",
Help: "Number of chunks selected while building delete plans per user",
}, []string{"user"})
m.deleteRequestsProcessingFailures = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "purger_delete_requests_processing_failures_total",
Help: "Number of delete requests processing failures per user",
}, []string{"user"})

return &m
}

type deleteRequestWithLogger struct {
DeleteRequest
logger log.Logger // logger is initialized with userID and requestID to add context to every log generated using this
@@ -58,6 +88,7 @@ type DataPurger struct {
deleteStore *DeleteStore
chunkStore chunk.Store
objectClient chunk.ObjectClient
metrics *purgerMetrics

executePlansChan chan deleteRequestWithLogger
workerJobChan chan workerJob
@@ -74,14 +105,15 @@ type DataPurger struct {
}

// NewDataPurger creates a new DataPurger
func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient) (*DataPurger, error) {
func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store, storageClient chunk.ObjectClient, registerer prometheus.Registerer) (*DataPurger, error) {
util.WarnExperimentalUse("Delete series API")

dataPurger := DataPurger{
cfg: cfg,
deleteStore: deleteStore,
chunkStore: chunkStore,
objectClient: storageClient,
metrics: newPurgerMetrics(registerer),
executePlansChan: make(chan deleteRequestWithLogger, 50),
workerJobChan: make(chan workerJob, 50),
inProcessRequestIDs: map[string]string{},
@@ -140,6 +172,7 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) {
level.Error(job.logger).Log("msg", "error updating delete request status to process", "err", err)
}

dp.metrics.deleteRequestsProcessedTotal.WithLabelValues(job.userID).Inc()
delete(dp.pendingPlansCount, job.deleteRequestID)
dp.pendingPlansCountMtx.Unlock()

@@ -182,6 +215,7 @@ func (dp *DataPurger) worker() {
for job := range dp.workerJobChan {
err := dp.executePlan(job.userID, job.deleteRequestID, job.planNo, job.logger)
if err != nil {
dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(job.userID).Inc()
level.Error(job.logger).Log("msg", "error executing delete plan",
"plan_no", job.planNo, "err", err)
continue
@@ -267,7 +301,9 @@ func (dp *DataPurger) loadInprocessDeleteRequests() error {
dp.inProcessRequestIDs[deleteRequest.UserID] = deleteRequest.RequestID
err := dp.buildDeletePlan(req)
if err != nil {
dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc()
level.Error(req.logger).Log("msg", "error building delete plan", "err", err)
continue
}

level.Info(req.logger).Log("msg", "sending delete request for execution")
@@ -329,6 +365,8 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error {

err := dp.buildDeletePlan(req)
if err != nil {
dp.metrics.deleteRequestsProcessingFailures.WithLabelValues(deleteRequest.UserID).Inc()

// We do not want to remove this delete request from inProcessRequestIDs to make sure
// we do not move multiple deleting requests in deletion process.
// None of the other delete requests from the user would be considered for processing until then.
@@ -355,6 +393,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
level.Info(req.logger).Log("msg", "building delete plan", "num_plans", len(perDayTimeRange))

plans := make([][]byte, len(perDayTimeRange))
includedChunkIDs := map[string]struct{}{}

for i, planRange := range perDayTimeRange {
chunksGroups := []ChunksGroup{}

@@ -364,13 +404,17 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
return err
}

// ToDo: remove duplicate chunks
chunks, err := dp.chunkStore.Get(ctx, req.UserID, planRange.Start, planRange.End, matchers...)
if err != nil {
return err
}

chunksGroups = append(chunksGroups, groupChunks(chunks, req.StartTime, req.EndTime)...)
var cg []ChunksGroup
cg, includedChunkIDs = groupChunks(chunks, req.StartTime, req.EndTime, includedChunkIDs)

if len(cg) != 0 {
chunksGroups = append(chunksGroups, cg...)
}
}

plan := DeletePlan{
@@ -399,6 +443,8 @@ func (dp *DataPurger) buildDeletePlan(req deleteRequestWithLogger) error {
return err
}

dp.metrics.deleteRequestsChunksSelectedTotal.WithLabelValues(req.UserID).Add(float64(len(includedChunkIDs)))

level.Info(req.logger).Log("msg", "built delete plans", "num_plans", len(perDayTimeRange))

return nil
@@ -482,10 +528,15 @@ func numPlans(start, end model.Time) int {

// groups chunks together by unique label sets i.e all the chunks with same labels would be stored in a group
// chunk details are stored in groups for each unique label set to avoid storing them repetitively for each chunk
func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []ChunksGroup {
func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time, includedChunkIDs map[string]struct{}) ([]ChunksGroup, map[string]struct{}) {
Reviewer comment (Contributor):
Is it more like processedChunkIDs?

Also there is no need to return it back from the function – modifications are already visible to the caller. The function comment should, however, mention this fact (that the passed map is modified).
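
A minimal sketch of the reviewer's second point, using illustrative names that are not from the PR: Go maps behave like references, so insertions made inside the callee are visible to the caller without returning the map.

```go
package main

import "fmt"

// markProcessed records an ID in the caller's map. No return value is needed:
// the map header is copied, but both copies refer to the same underlying data.
func markProcessed(processed map[string]struct{}, id string) {
	processed[id] = struct{}{}
}

func main() {
	processed := map[string]struct{}{}
	markProcessed(processed, "chunk-1")
	fmt.Println(len(processed)) // prints 1: the caller sees the insertion
}
```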

metricToChunks := make(map[string]ChunksGroup)

for _, chk := range chunks {
chunkID := chk.ExternalKey()

if _, ok := includedChunkIDs[chunkID]; ok {
continue
}
// chunk.Metric are assumed to be sorted which should give same value from String() for same series.
// If they stop being sorted then in the worst case we would lose the benefit of grouping chunks to avoid storing labels repetitively.
metricString := chk.Metric.String()
@@ -494,7 +545,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C
group = ChunksGroup{Labels: client.FromLabelsToLabelAdapters(chk.Metric)}
}

chunkDetails := ChunkDetails{ID: chk.ExternalKey()}
chunkDetails := ChunkDetails{ID: chunkID}

if deleteFrom > chk.From || deleteThrough < chk.Through {
partiallyDeletedInterval := Interval{StartTimestampMs: int64(chk.From), EndTimestampMs: int64(chk.Through)}
Expand All @@ -510,6 +561,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C
}

group.Chunks = append(group.Chunks, chunkDetails)
includedChunkIDs[chunkID] = struct{}{}
metricToChunks[metricString] = group
}

@@ -519,7 +571,7 @@ func groupChunks(chunks []chunk.Chunk, deleteFrom, deleteThrough model.Time) []C
chunksGroups = append(chunksGroups, group)
}

return chunksGroups
return chunksGroups, includedChunkIDs
}

func isMissingChunkErr(err error) bool {
13 changes: 11 additions & 2 deletions pkg/chunk/purger/purger_test.go
@@ -53,7 +53,7 @@ func setupStoresAndPurger(t *testing.T) (*DeleteStore, chunk.Store, chunk.Object
var cfg Config
flagext.DefaultValues(&cfg)

dataPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient)
dataPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient, nil)
require.NoError(t, err)

return deleteStore, chunkStore, storageClient, dataPurger
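
Passing nil as the registerer works here because promauto.With skips registration when the registerer is nil while still returning usable metrics; a minimal sketch of that behavior, not code from this PR:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// With a nil registerer the counter is created but never registered,
	// so tests avoid collisions on the global default registry.
	processed := promauto.With(nil).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "purger_delete_requests_processed_total",
		Help:      "Number of delete requests processed per user",
	}, []string{"user"})

	processed.WithLabelValues("user-1").Inc()
	fmt.Println("counter is usable even though it is unregistered")
}
```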
@@ -149,6 +149,15 @@ var purgePlanTestCases = []struct {
firstChunkPartialDeletionInterval: &Interval{StartTimestampMs: int64(modelTimeDay.Add(-30 * time.Minute)),
EndTimestampMs: int64(modelTimeDay.Add(-15 * time.Minute))},
},
{
name: "building multi-day chunk and deleting part of it for each day",
chunkStoreDataInterval: model.Interval{Start: modelTimeDay.Add(-30 * time.Minute), End: modelTimeDay.Add(30 * time.Minute)},
deleteRequestInterval: model.Interval{Start: modelTimeDay.Add(-15 * time.Minute), End: modelTimeDay.Add(15 * time.Minute)},
expectedNumberOfPlans: 2,
numChunksToDelete: 1,
firstChunkPartialDeletionInterval: &Interval{StartTimestampMs: int64(modelTimeDay.Add(-15 * time.Minute)),
EndTimestampMs: int64(modelTimeDay.Add(15 * time.Minute))},
},
}

func TestDataPurger_BuildPlan(t *testing.T) {
@@ -327,7 +336,7 @@ func TestDataPurger_Restarts(t *testing.T) {
// create a new purger to check whether it picks up in process delete requests
var cfg Config
flagext.DefaultValues(&cfg)
newPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient)
newPurger, err := NewDataPurger(cfg, deleteStore, chunkStore, storageClient, nil)
require.NoError(t, err)

// load in process delete requests by calling Run
29 changes: 26 additions & 3 deletions pkg/chunk/purger/request_handler.go
@@ -5,22 +5,42 @@ import (
"fmt"
"net/http"

"github.com/cortexproject/cortex/pkg/util"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/util"
)

type deleteRequestHandlerMetrics struct {
deleteRequestsReceivedTotal *prometheus.CounterVec
}

func newDeleteRequestHandlerMetrics(r prometheus.Registerer) *deleteRequestHandlerMetrics {
m := deleteRequestHandlerMetrics{}

m.deleteRequestsReceivedTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "purger_delete_requests_received_total",
Help: "Number of delete requests received per user",
}, []string{"user"})

return &m
}
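
If these counters need to be asserted in tests, one option is the client_golang testutil helpers; a minimal sketch under that assumption, not code from this PR:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()
	received := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "purger_delete_requests_received_total",
		Help:      "Number of delete requests received per user",
	}, []string{"user"})

	received.WithLabelValues("user-1").Inc()

	// ToFloat64 reads the current value of a single metric child.
	fmt.Println(testutil.ToFloat64(received.WithLabelValues("user-1"))) // 1
}
```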

// DeleteRequestHandler provides handlers for delete requests
type DeleteRequestHandler struct {
deleteStore *DeleteStore
metrics *deleteRequestHandlerMetrics
}

// NewDeleteRequestHandler creates a DeleteRequestHandler
func NewDeleteRequestHandler(deleteStore *DeleteStore) *DeleteRequestHandler {
func NewDeleteRequestHandler(deleteStore *DeleteStore, registerer prometheus.Registerer) *DeleteRequestHandler {
deleteMgr := DeleteRequestHandler{
deleteStore: deleteStore,
metrics: newDeleteRequestHandlerMetrics(registerer),
}

return &deleteMgr
Expand Down Expand Up @@ -83,7 +103,10 @@ func (dm *DeleteRequestHandler) AddDeleteRequestHandler(w http.ResponseWriter, r

if err := dm.deleteStore.AddDeleteRequest(ctx, userID, model.Time(startTime), model.Time(endTime), match); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

dm.metrics.deleteRequestsReceivedTotal.WithLabelValues(userID).Inc()
}

// GetAllDeleteRequestsHandler handles get all delete requests
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
@@ -497,7 +497,7 @@ func (t *Cortex) initDataPurger(cfg *Config) (services.Service, error) {
return nil, err
}

t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient)
t.dataPurger, err = purger.NewDataPurger(cfg.DataPurgerConfig, t.deletesStore, t.store, storageClient, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}