Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
* [ENHANCEMENT] Add `-cassandra.num-connections` to allow increasing the number of TCP connections to each Cassandra server. #2666
* [ENHANCEMENT] Use separate Cassandra clients and connections for reads and writes. #2666
* [ENHANCEMENT] Add `-cassandra.reconnect-interval` to allow specifying the reconnect interval to a Cassandra server that has been marked `DOWN` by the gocql driver. Also change the default value of the reconnect interval from `60s` to `1s`. #2687
* [ENHANCEMENT] Experimental TSDB: Applied a jitter to the period bucket scans in order to better distribute bucket operations over the time and increase the probability of hitting the shared cache (if configured). #2693
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
Expand Down
5 changes: 4 additions & 1 deletion pkg/querier/blocks_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ func NewBlocksScanner(cfg BlocksScannerConfig, bucketClient objstore.Bucket, log
prometheus.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg).MustRegister(d.fetchersMetrics)
}

d.Service = services.NewTimerService(cfg.ScanInterval, d.starting, d.scan, nil)
// Apply a jitter to the sync frequency in order to increase the probability
// of hitting the shared cache (if any).
scanInterval := util.DurationWithJitter(cfg.ScanInterval, 0.2)
d.Service = services.NewTimerService(scanInterval, d.starting, d.scan, nil)

return d
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring/kv"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)

Expand Down Expand Up @@ -226,12 +227,14 @@ func (g *StoreGateway) running(ctx context.Context) error {
var ringTickerChan <-chan time.Time
var ringLastState ring.ReplicationSet

syncTicker := time.NewTicker(g.storageCfg.BucketStore.SyncInterval)
// Apply a jitter to the sync frequency in order to increase the probability
// of hitting the shared cache (if any).
syncTicker := time.NewTicker(util.DurationWithJitter(g.storageCfg.BucketStore.SyncInterval, 0.2))
defer syncTicker.Stop()

if g.gatewayCfg.ShardingEnabled {
ringLastState, _ = g.ring.GetAll(ring.BlocksSync) // nolint:errcheck
ringTicker := time.NewTicker(g.gatewayCfg.ShardingRing.RingCheckPeriod)
ringTicker := time.NewTicker(util.DurationWithJitter(g.gatewayCfg.ShardingRing.RingCheckPeriod, 0.2))
defer ringTicker.Stop()
ringTickerChan = ringTicker.C
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"math"
"math/rand"
"net/http"
"strconv"
"time"
Expand Down Expand Up @@ -34,3 +35,10 @@ func ParseTime(s string) (int64, error) {
}
return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s)
}

func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration {
variance := int64(float64(input) * variancePerc)
jitter := rand.Int63n(variance*2) - variance

return input + time.Duration(jitter)
}
11 changes: 11 additions & 0 deletions pkg/util/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -23,3 +24,13 @@ func TestTimeFromMillis(t *testing.T) {
})
}
}

func TestDurationWithJitter(t *testing.T) {
const numRuns = 1000

for i := 0; i < numRuns; i++ {
actual := DurationWithJitter(time.Minute, 0.5)
assert.GreaterOrEqual(t, int64(actual), int64(30*time.Second))
assert.LessOrEqual(t, int64(actual), int64(90*time.Second))
}
}