Skip to content

Commit 491b9de

Browse files
pstibranygouthamve
authored andcommitted
Fix for flaky test. (#1770)
* Fix for flaky test. Make sure we get most notifications, but not necessarily all. Signed-off-by: Peter Štibraný <[email protected]> * Different fix for flaky test. Timeout for entire test is now longer, but we end early if there is no value for 500ms. We again check that all keys were received exactly once, no room for exceptions. Signed-off-by: Peter Štibraný <[email protected]> * Stop test early if we have observed enough keys. Signed-off-by: Peter Štibraný <[email protected]>
1 parent 2e21f82 commit 491b9de

File tree

1 file changed

+36
-17
lines changed

1 file changed

+36
-17
lines changed

pkg/ring/kv/kv_test.go

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -150,38 +150,57 @@ func TestWatchPrefix(t *testing.T) {
150150

151151
const max = 100
152152
const sleep = time.Millisecond * 10
153+
const totalTestTimeout = 3 * max * sleep
153154

154-
gen := func(p string, ch chan struct{}) {
155-
defer close(ch)
156-
for i := 0; i < max; i++ {
157-
// Start with sleeping, so that watching client see empty KV store at the beginning.
155+
observedKeysCh := make(chan string, max)
156+
157+
ctx, cancel := context.WithCancel(context.Background())
158+
defer cancel()
159+
go func() {
160+
// start watching before we even start generating values. values will be buffered
161+
client.WatchPrefix(ctx, prefix, func(key string, val interface{}) bool {
162+
observedKeysCh <- key
163+
return true
164+
})
165+
}()
166+
167+
gen := func(p string) {
168+
for i := 0; i < max && ctx.Err() == nil; i++ {
169+
// Start with sleeping, so that watching client can see empty KV store at the beginning.
158170
time.Sleep(sleep)
159171

160172
key := fmt.Sprintf("%s%d", p, i)
161173
err := client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
162174
return key, true, nil
163175
})
176+
177+
if ctx.Err() != nil {
178+
break
179+
}
164180
require.NoError(t, err)
165181
}
166182
}
167183

168-
ch1 := make(chan struct{})
169-
ch2 := make(chan struct{})
170-
go gen(prefix, ch1)
171-
go gen(prefix2, ch2) // we don't want to see these keys reported
184+
go gen(prefix)
185+
go gen(prefix2) // we don't want to see these keys reported
172186

173187
observedKeys := map[string]int{}
174-
ctx, cfn := context.WithTimeout(context.Background(), 1.5*max*sleep)
175-
defer cfn()
176188

177-
client.WatchPrefix(ctx, prefix, func(key string, val interface{}) bool {
178-
observedKeys[key] = observedKeys[key] + 1
179-
return true
180-
})
189+
totalDeadline := time.After(totalTestTimeout)
190+
191+
for watching := true; watching; {
192+
select {
193+
case <-totalDeadline:
194+
watching = false
195+
case key := <-observedKeysCh:
196+
observedKeys[key]++
197+
if len(observedKeys) == max {
198+
watching = false
199+
}
200+
}
201+
}
181202

182-
// wait until updaters finish (should be done by now)
183-
<-ch1
184-
<-ch2
203+
cancel() // stop all goroutines
185204

186205
// verify that each key was reported once, and keys outside prefix were not reported
187206
for i := 0; i < max; i++ {

0 commit comments

Comments
 (0)