Skip to content

Commit b9b3122

Browse files
authored
Added jitter to blocks storage period bucket scans (#2693)
Signed-off-by: Marco Pracucci <[email protected]>
1 parent a81c917 commit b9b3122

File tree

5 files changed

+29
-3
lines changed

5 files changed

+29
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@
106106
* [ENHANCEMENT] Add `-cassandra.num-connections` to allow increasing the number of TCP connections to each Cassandra server. #2666
107107
* [ENHANCEMENT] Use separate Cassandra clients and connections for reads and writes. #2666
108108
* [ENHANCEMENT] Add `-cassandra.reconnect-interval` to allow specifying the reconnect interval to a Cassandra server that has been marked `DOWN` by the gocql driver. Also change the default value of the reconnect interval from `60s` to `1s`. #2687
109+
* [ENHANCEMENT] Experimental TSDB: Applied a jitter to the periodic bucket scans in order to better distribute bucket operations over time and increase the probability of hitting the shared cache (if configured). #2693
109110
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
110111
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
111112
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400

pkg/querier/blocks_scanner.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,10 @@ func NewBlocksScanner(cfg BlocksScannerConfig, bucketClient objstore.Bucket, log
8888
prometheus.WrapRegistererWith(prometheus.Labels{"component": "querier"}, reg).MustRegister(d.fetchersMetrics)
8989
}
9090

91-
d.Service = services.NewTimerService(cfg.ScanInterval, d.starting, d.scan, nil)
91+
// Apply a jitter to the sync frequency in order to increase the probability
92+
// of hitting the shared cache (if any).
93+
scanInterval := util.DurationWithJitter(cfg.ScanInterval, 0.2)
94+
d.Service = services.NewTimerService(scanInterval, d.starting, d.scan, nil)
9295

9396
return d
9497
}

pkg/storegateway/gateway.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/cortexproject/cortex/pkg/ring/kv"
2121
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2222
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
23+
"github.com/cortexproject/cortex/pkg/util"
2324
"github.com/cortexproject/cortex/pkg/util/services"
2425
)
2526

@@ -226,12 +227,14 @@ func (g *StoreGateway) running(ctx context.Context) error {
226227
var ringTickerChan <-chan time.Time
227228
var ringLastState ring.ReplicationSet
228229

229-
syncTicker := time.NewTicker(g.storageCfg.BucketStore.SyncInterval)
230+
// Apply a jitter to the sync frequency in order to increase the probability
231+
// of hitting the shared cache (if any).
232+
syncTicker := time.NewTicker(util.DurationWithJitter(g.storageCfg.BucketStore.SyncInterval, 0.2))
230233
defer syncTicker.Stop()
231234

232235
if g.gatewayCfg.ShardingEnabled {
233236
ringLastState, _ = g.ring.GetAll(ring.BlocksSync) // nolint:errcheck
234-
ringTicker := time.NewTicker(g.gatewayCfg.ShardingRing.RingCheckPeriod)
237+
ringTicker := time.NewTicker(util.DurationWithJitter(g.gatewayCfg.ShardingRing.RingCheckPeriod, 0.2))
235238
defer ringTicker.Stop()
236239
ringTickerChan = ringTicker.C
237240
}

pkg/util/time.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package util
22

33
import (
44
"math"
5+
"math/rand"
56
"net/http"
67
"strconv"
78
"time"
@@ -34,3 +35,10 @@ func ParseTime(s string) (int64, error) {
3435
}
3536
return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s)
3637
}
38+
39+
// DurationWithJitter returns input shifted by a pseudo-random jitter.
//
// The jitter is drawn uniformly from [-variance, +variance), where variance is
// input multiplied by variancePerc and truncated to whole nanoseconds. For
// example, DurationWithJitter(time.Minute, 0.2) yields a duration in the range
// [48s, 72s).
//
// When the computed variance is zero or negative (a zero input, a zero
// percentage, or a product that truncates to zero) no jitter can be applied
// and input is returned unchanged.
func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration {
	variance := int64(float64(input) * variancePerc)

	// rand.Int63n panics when its argument is <= 0, so bail out before
	// calling it with an empty or inverted range.
	if variance <= 0 {
		return input
	}

	jitter := rand.Int63n(variance*2) - variance

	return input + time.Duration(jitter)
}

pkg/util/time_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"testing"
55
"time"
66

7+
"github.com/stretchr/testify/assert"
78
"github.com/stretchr/testify/require"
89
)
910

@@ -23,3 +24,13 @@ func TestTimeFromMillis(t *testing.T) {
2324
})
2425
}
2526
}
27+
28+
func TestDurationWithJitter(t *testing.T) {
29+
const numRuns = 1000
30+
31+
for i := 0; i < numRuns; i++ {
32+
actual := DurationWithJitter(time.Minute, 0.5)
33+
assert.GreaterOrEqual(t, int64(actual), int64(30*time.Second))
34+
assert.LessOrEqual(t, int64(actual), int64(90*time.Second))
35+
}
36+
}

0 commit comments

Comments
 (0)