Skip to content

Commit b80d1f6

Browse files
pstibranygouthamve
authored andcommitted
Implement HashiCorp's recommendations when watching Consul key (#1713)
* Implemented recommendations from Consul docs https://www.consul.io/api/features/blocking.html#implementation-details Fixes issue #1708 Signed-off-by: Peter Štibraný <[email protected]> * Added tests for rate limiting, and index resets. Signed-off-by: Peter Štibraný <[email protected]> * Call cancel function, even though we don't use it. This makes lint happy. Signed-off-by: Peter Štibraný <[email protected]> * Use rate limiter from golang.org/x/time/rate Removed copy of modified jaegers' rate-limiter Signed-off-by: Peter Štibraný <[email protected]> * Rate limit configuration Signed-off-by: Peter Štibraný <[email protected]> * WatchPrefix should use same index checks as WatchKey Signed-off-by: Peter Štibraný <[email protected]> * Added comment for NewInMemoryClientWithConfig Signed-off-by: Peter Štibraný <[email protected]> * Use rate.Inf as rate limit, when it is not set. Signed-off-by: Peter Štibraný <[email protected]> * TestReset doesn't use rate limit now. Signed-off-by: Peter Štibraný <[email protected]> * Remove unused variable. Signed-off-by: Peter Štibraný <[email protected]> * Rename watch-rate -> watch-rate-limit, watch-burst -> watch-burst-size. Signed-off-by: Peter Štibraný <[email protected]> * Use 0 instead of "Zero" Signed-off-by: Peter Štibraný <[email protected]> * Updated changelog with consul client enhancements. Signed-off-by: Peter Štibraný <[email protected]> * Moved changelog entries to the unreleased version. Signed-off-by: Peter Štibraný <[email protected]>
1 parent 14e68bc commit b80d1f6

File tree

4 files changed

+263
-12
lines changed

4 files changed

+263
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
* [CHANGE] Removed `Delta` encoding. Any old chunks with `Delta` encoding cannot be read anymore. If `ingester.chunk-encoding` is set to `Delta` the ingester will fail to start. #1706
44
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
5+
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
6+
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708
57

68
## 0.3.0 / 2019-10-11
79

pkg/ring/kv/consul/client.go

Lines changed: 60 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import (
1010

1111
"github.com/go-kit/kit/log/level"
1212
consul "github.com/hashicorp/consul/api"
13-
cleanhttp "github.com/hashicorp/go-cleanhttp"
13+
"github.com/hashicorp/go-cleanhttp"
1414
"github.com/weaveworks/common/instrument"
15+
"golang.org/x/time/rate"
1516

1617
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
1718
"github.com/cortexproject/cortex/pkg/util"
@@ -39,6 +40,8 @@ type Config struct {
3940
ACLToken string
4041
HTTPClientTimeout time.Duration
4142
ConsistentReads bool
43+
WatchKeyRateLimit float64 // Zero disables rate limit
44+
WatchKeyBurstSize int // Burst when doing rate-limit, defaults to 1
4245
}
4346

4447
type kv interface {
@@ -62,6 +65,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
6265
f.StringVar(&cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.")
6366
f.DurationVar(&cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to Consul")
6467
f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to Consul.")
68+
f.Float64Var(&cfg.WatchKeyRateLimit, prefix+"consul.watch-rate-limit", 0, "Rate limit when watching key or prefix in Consul, in requests per second. 0 disables the rate limit.")
69+
f.IntVar(&cfg.WatchKeyBurstSize, prefix+"consul.watch-burst-size", 1, "Burst size used in rate limit. Values less than 1 are treated as 1.")
6570
}
6671

6772
// NewClient returns a new Client.
@@ -170,8 +175,17 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
170175
var (
171176
backoff = util.NewBackoff(ctx, backoffConfig)
172177
index = uint64(0)
178+
limiter = c.createRateLimiter()
173179
)
180+
174181
for backoff.Ongoing() {
182+
err := limiter.Wait(ctx)
183+
if err != nil {
184+
level.Error(util.Logger).Log("msg", "error while rate-limiting", "key", key, "err", err)
185+
backoff.Wait()
186+
continue
187+
}
188+
175189
queryOptions := &consul.QueryOptions{
176190
AllowStale: !c.cfg.ConsistentReads,
177191
RequireConsistent: c.cfg.ConsistentReads,
@@ -187,12 +201,11 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
187201
}
188202
backoff.Reset()
189203

190-
// Skip if the index is the same as last time, because the key value is
191-
// guaranteed to be the same as last time
192-
if index == meta.LastIndex {
204+
skip := false
205+
index, skip = checkLastIndex(index, meta.LastIndex)
206+
if skip {
193207
continue
194208
}
195-
index = meta.LastIndex
196209

197210
out, err := c.codec.Decode(kvp.Value)
198211
if err != nil {
@@ -212,8 +225,16 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
212225
var (
213226
backoff = util.NewBackoff(ctx, backoffConfig)
214227
index = uint64(0)
228+
limiter = c.createRateLimiter()
215229
)
216230
for backoff.Ongoing() {
231+
err := limiter.Wait(ctx)
232+
if err != nil {
233+
level.Error(util.Logger).Log("msg", "error while rate-limiting", "prefix", prefix, "err", err)
234+
backoff.Wait()
235+
continue
236+
}
237+
217238
queryOptions := &consul.QueryOptions{
218239
AllowStale: !c.cfg.ConsistentReads,
219240
RequireConsistent: c.cfg.ConsistentReads,
@@ -228,13 +249,13 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
228249
continue
229250
}
230251
backoff.Reset()
231-
// Skip if the index is the same as last time, because the key value is
232-
// guaranteed to be the same as last time
233-
if index == meta.LastIndex {
252+
253+
skip := false
254+
index, skip = checkLastIndex(index, meta.LastIndex)
255+
if skip {
234256
continue
235257
}
236258

237-
index = meta.LastIndex
238259
for _, kvp := range kvps {
239260
out, err := c.codec.Decode(kvp.Value)
240261
if err != nil {
@@ -264,3 +285,33 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
264285
}
265286
return c.codec.Decode(kvp.Value)
266287
}
288+
289+
// checkLastIndex compares the index used for the previous blocking query with
// the LastIndex reported by the latest response. It returns the index to use
// for the next query and whether the latest response should be skipped.
//
// The checks follow the recommendations in
// https://www.consul.io/api/features/blocking.html#implementation-details.
func checkLastIndex(index, metaLastIndex uint64) (newIndex uint64, skip bool) {
	switch {
	case metaLastIndex == 0:
		// Never keep querying with index 0: after a blocking request the
		// index in use must be at least 1.
		newIndex, skip = 1, false
	case metaLastIndex < index:
		// The reported index went backwards, i.e. it was reset: start over.
		newIndex, skip = 0, false
	case metaLastIndex == index:
		// Same index as last time, so the value is guaranteed to be the same
		// as last time and the response can be skipped.
		newIndex, skip = metaLastIndex, true
	default:
		// The index advanced: adopt it and process the response.
		newIndex, skip = metaLastIndex, false
	}
	return newIndex, skip
}
306+
307+
func (c *Client) createRateLimiter() *rate.Limiter {
308+
if c.cfg.WatchKeyRateLimit <= 0 {
309+
// burst is ignored when limit = rate.Inf
310+
return rate.NewLimiter(rate.Inf, 0)
311+
}
312+
burst := c.cfg.WatchKeyBurstSize
313+
if burst < 1 {
314+
burst = 1
315+
}
316+
return rate.NewLimiter(rate.Limit(c.cfg.WatchKeyRateLimit), burst)
317+
}

pkg/ring/kv/consul/client_test.go

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package consul
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strconv"
7+
"testing"
8+
"time"
9+
10+
"github.com/go-kit/kit/log/level"
11+
consul "github.com/hashicorp/consul/api"
12+
13+
"github.com/cortexproject/cortex/pkg/util"
14+
)
15+
16+
// stringCodec is a codec whose values are plain Go strings.
type stringCodec struct{}

// Encode returns the bytes of d. It fails when d is nil or is not a string.
func (c stringCodec) Encode(d interface{}) ([]byte, error) {
	switch v := d.(type) {
	case nil:
		return nil, fmt.Errorf("nil")
	case string:
		return []byte(v), nil
	default:
		return nil, fmt.Errorf("not string: %T", d)
	}
}

// Decode converts d to a string; it never returns an error.
func (c stringCodec) Decode(d []byte) (interface{}, error) {
	return string(d), nil
}
33+
34+
func writeValuesToKV(client *Client, key string, start, end int, sleep time.Duration) <-chan struct{} {
35+
ch := make(chan struct{})
36+
go func() {
37+
defer close(ch)
38+
for i := start; i <= end; i++ {
39+
level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i)
40+
_, _ = client.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
41+
time.Sleep(sleep)
42+
}
43+
}()
44+
return ch
45+
}
46+
47+
func TestWatchKey(t *testing.T) {
48+
c := NewInMemoryClientWithConfig(&stringCodec{}, Config{
49+
WatchKeyRateLimit: 5.0,
50+
WatchKeyBurstSize: 1,
51+
})
52+
53+
const key = "test"
54+
const max = 100
55+
56+
// Make sure to start with non-empty value, otherwise WatchKey will bail
57+
_, _ = c.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil)
58+
59+
ch := writeValuesToKV(c, key, 0, max, 10*time.Millisecond)
60+
61+
observed := observeValueForSomeTime(c, key, 1200*time.Millisecond) // little over 1 second
62+
63+
// wait until updater finishes
64+
<-ch
65+
66+
if testing.Verbose() {
67+
t.Log(observed)
68+
}
69+
// Let's see how many updates we have observed. Given the rate limit and our observing time, it should be 6
70+
// We should also have seen one of the later values, as we're observing for longer than a second, so rate limit should allow
71+
// us to see it.
72+
if len(observed) < 5 || len(observed) > 10 {
73+
t.Error("Expected ~6 observed values, got", observed)
74+
}
75+
last := observed[len(observed)-1]
76+
n, _ := strconv.Atoi(last)
77+
if n < max/2 {
78+
t.Error("Expected to see high last observed value, got", observed)
79+
}
80+
}
81+
82+
func TestWatchKeyNoRateLimit(t *testing.T) {
83+
c := NewInMemoryClientWithConfig(&stringCodec{}, Config{
84+
WatchKeyRateLimit: 0,
85+
})
86+
87+
const key = "test"
88+
const max = 100
89+
90+
// Make sure to start with non-empty value, otherwise WatchKey will bail
91+
_, _ = c.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil)
92+
93+
ch := writeValuesToKV(c, key, 0, max, time.Millisecond)
94+
observed := observeValueForSomeTime(c, key, 500*time.Millisecond)
95+
96+
// wait until updater finishes
97+
<-ch
98+
99+
// With no limit, we should see most written values (we can lose some values if watching
100+
// code is busy while multiple new values are written)
101+
if len(observed) < 3*max/4 {
102+
t.Error("Expected at least 3/4 of all values, got", observed)
103+
}
104+
}
105+
106+
func TestReset(t *testing.T) {
107+
c := NewInMemoryClient(&stringCodec{})
108+
109+
const key = "test"
110+
const max = 5
111+
112+
// Make sure to start with non-empty value, otherwise WatchKey will bail
113+
_, _ = c.Put(&consul.KVPair{Key: key, Value: []byte("start")}, nil)
114+
115+
ch := make(chan error)
116+
go func() {
117+
defer close(ch)
118+
for i := 0; i <= max; i++ {
119+
level.Debug(util.Logger).Log("ts", time.Now(), "msg", "writing value", "val", i)
120+
_, _ = c.Put(&consul.KVPair{Key: key, Value: []byte(fmt.Sprintf("%d", i))}, nil)
121+
if i == 1 {
122+
c.kv.(*mockKV).ResetIndex()
123+
}
124+
if i == 2 {
125+
c.kv.(*mockKV).ResetIndexForKey(key)
126+
}
127+
time.Sleep(10 * time.Millisecond)
128+
}
129+
}()
130+
131+
observed := observeValueForSomeTime(c, key, 25*max*time.Millisecond)
132+
133+
// wait until updater finishes
134+
<-ch
135+
136+
// Let's see how many updates we have observed. Given the rate limit and our observing time, we should see all numeric values
137+
if testing.Verbose() {
138+
t.Log(observed)
139+
}
140+
if len(observed) < max {
141+
t.Error("Expected all values, got", observed)
142+
} else if observed[len(observed)-1] != fmt.Sprintf("%d", max) {
143+
t.Error("Expected to see last written value, got", observed)
144+
}
145+
}
146+
147+
func observeValueForSomeTime(client *Client, key string, timeout time.Duration) []string {
148+
observed := []string(nil)
149+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
150+
defer cancel()
151+
client.WatchKey(ctx, key, func(i interface{}) bool {
152+
s, ok := i.(string)
153+
if !ok {
154+
return false
155+
}
156+
level.Debug(util.Logger).Log("ts", time.Now(), "msg", "observed value", "val", s)
157+
observed = append(observed, s)
158+
return true
159+
})
160+
return observed
161+
}

pkg/ring/kv/consul/mock.go

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package consul
22

33
import (
4-
fmt "fmt"
4+
"fmt"
55
"sync"
66
"time"
77

@@ -21,6 +21,11 @@ type mockKV struct {
2121

2222
// NewInMemoryClient makes a new mock consul client.
2323
func NewInMemoryClient(codec codec.Codec) *Client {
24+
return NewInMemoryClientWithConfig(codec, Config{})
25+
}
26+
27+
// NewInMemoryClientWithConfig makes a new mock consul client with supplied Config.
28+
func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config) *Client {
2429
m := mockKV{
2530
kvps: map[string]*consul.KVPair{},
2631
}
@@ -29,6 +34,7 @@ func NewInMemoryClient(codec codec.Codec) *Client {
2934
return &Client{
3035
kv: &m,
3136
codec: codec,
37+
cfg: cfg,
3238
}
3339
}
3440

@@ -67,6 +73,8 @@ func (m *mockKV) Put(p *consul.KVPair, q *consul.WriteOptions) (*consul.WriteMet
6773
}
6874

6975
m.cond.Broadcast()
76+
77+
level.Debug(util.Logger).Log("msg", "Put", "key", p.Key, "value", fmt.Sprintf("%.40q", p.Value), "modify_index", m.current)
7078
return nil, nil
7179
}
7280

@@ -109,9 +117,16 @@ func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consu
109117
return nil, &consul.QueryMeta{LastIndex: m.current}, nil
110118
}
111119

112-
if q.WaitTime > 0 {
120+
if q.WaitIndex >= value.ModifyIndex && q.WaitTime > 0 {
113121
deadline := time.Now().Add(q.WaitTime)
114-
for q.WaitIndex >= value.ModifyIndex && time.Now().Before(deadline) {
122+
if ctxDeadline, ok := q.Context().Deadline(); ok && ctxDeadline.Before(deadline) {
123+
// respect deadline from context, if set.
124+
deadline = ctxDeadline
125+
}
126+
127+
// simply wait until value.ModifyIndex changes. This allows us to test reporting old index values by resetting them.
128+
startModify := value.ModifyIndex
129+
for startModify == value.ModifyIndex && time.Now().Before(deadline) {
115130
m.cond.Wait()
116131
}
117132
if time.Now().After(deadline) {
@@ -144,3 +159,25 @@ func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *c
144159
}
145160
return result, &consul.QueryMeta{LastIndex: m.current}, nil
146161
}
162+
163+
func (m *mockKV) ResetIndex() {
164+
m.mtx.Lock()
165+
defer m.mtx.Unlock()
166+
167+
m.current = 0
168+
m.cond.Broadcast()
169+
170+
level.Debug(util.Logger).Log("msg", "Reset")
171+
}
172+
173+
func (m *mockKV) ResetIndexForKey(key string) {
174+
m.mtx.Lock()
175+
defer m.mtx.Unlock()
176+
177+
if value, ok := m.kvps[key]; ok {
178+
value.ModifyIndex = 0
179+
}
180+
181+
m.cond.Broadcast()
182+
level.Debug(util.Logger).Log("msg", "ResetIndexForKey", "key", key)
183+
}

0 commit comments

Comments
 (0)