Skip to content

Commit 0976fa6

Browse files
changkungopherbot
authored andcommitted
x/sync/errgroup: add TryGo and SetLimit to control concurrency
This benchmark shows the difference between two implementations. Using explicit waiter with mutex (old, before PS3) or channel (new, since PS4). There is no significant difference at a measure: name old time/op new time/op delta Go-8 247ns ±10% 245ns ±10% ~ (p=0.571 n=5+10) name old alloc/op new alloc/op delta Go-8 48.0B ± 0% 40.0B ± 0% -16.67% (p=0.000 n=5+10) name old allocs/op new allocs/op delta Go-8 2.00 ± 0% 2.00 ± 0% ~ (all equal) Fixes golang/go#27837 Change-Id: I60247f1a2a1cdce2b180f10b409e37de8b82341e Reviewed-on: https://go-review.googlesource.com/c/sync/+/405174 Reviewed-by: Bryan Mills <[email protected]> Reviewed-by: Heschi Kreinick <[email protected]> TryBot-Result: Gopher Robot <[email protected]> Run-TryBot: Changkun Ou <[email protected]> Auto-Submit: Bryan Mills <[email protected]>
1 parent 036812b commit 0976fa6

File tree

2 files changed

+152
-1
lines changed

2 files changed

+152
-1
lines changed

errgroup/errgroup.go

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,12 @@ package errgroup
88

99
import (
1010
"context"
11+
"fmt"
1112
"sync"
1213
)
1314

15+
type token struct{}
16+
1417
// A Group is a collection of goroutines working on subtasks that are part of
1518
// the same overall task.
1619
//
@@ -20,10 +23,19 @@ type Group struct {
2023

2124
wg sync.WaitGroup
2225

26+
sem chan token
27+
2328
errOnce sync.Once
2429
err error
2530
}
2631

32+
func (g *Group) done() {
33+
if g.sem != nil {
34+
<-g.sem
35+
}
36+
g.wg.Done()
37+
}
38+
2739
// WithContext returns a new Group and an associated Context derived from ctx.
2840
//
2941
// The derived Context is canceled the first time a function passed to Go
@@ -45,14 +57,48 @@ func (g *Group) Wait() error {
4557
}
4658

4759
// Go calls the given function in a new goroutine.
60+
// It blocks until the new goroutine can be added without the number of
61+
// active goroutines in the group exceeding the configured limit.
4862
//
4963
// The first call to return a non-nil error cancels the group; its error will be
5064
// returned by Wait.
5165
func (g *Group) Go(f func() error) {
66+
if g.sem != nil {
67+
g.sem <- token{}
68+
}
69+
5270
g.wg.Add(1)
71+
go func() {
72+
defer g.done()
73+
74+
if err := f(); err != nil {
75+
g.errOnce.Do(func() {
76+
g.err = err
77+
if g.cancel != nil {
78+
g.cancel()
79+
}
80+
})
81+
}
82+
}()
83+
}
84+
85+
// TryGo calls the given function in a new goroutine only if the number of
86+
// active goroutines in the group is currently below the configured limit.
87+
//
88+
// The return value reports whether the goroutine was started.
89+
func (g *Group) TryGo(f func() error) bool {
90+
if g.sem != nil {
91+
select {
92+
case g.sem <- token{}:
93+
// Note: this allows barging iff channels in general allow barging.
94+
default:
95+
return false
96+
}
97+
}
5398

99+
g.wg.Add(1)
54100
go func() {
55-
defer g.wg.Done()
101+
defer g.done()
56102

57103
if err := f(); err != nil {
58104
g.errOnce.Do(func() {
@@ -63,4 +109,23 @@ func (g *Group) Go(f func() error) {
63109
})
64110
}
65111
}()
112+
return true
113+
}
114+
115+
// SetLimit limits the number of active goroutines in this group to at most n.
116+
// A negative value indicates no limit.
117+
//
118+
// Any subsequent call to the Go method will block until it can add an active
119+
// goroutine without exceeding the configured limit.
120+
//
121+
// The limit must not be modified while any goroutines in the group are active.
122+
func (g *Group) SetLimit(n int) {
123+
if n < 0 {
124+
g.sem = nil
125+
return
126+
}
127+
if len(g.sem) != 0 {
128+
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", len(g.sem)))
129+
}
130+
g.sem = make(chan token, n)
66131
}

errgroup/errgroup_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"fmt"
1111
"net/http"
1212
"os"
13+
"sync/atomic"
1314
"testing"
15+
"time"
1416

1517
"golang.org/x/sync/errgroup"
1618
)
@@ -174,3 +176,87 @@ func TestWithContext(t *testing.T) {
174176
}
175177
}
176178
}
179+
180+
func TestTryGo(t *testing.T) {
181+
g := &errgroup.Group{}
182+
n := 42
183+
g.SetLimit(42)
184+
ch := make(chan struct{})
185+
fn := func() error {
186+
ch <- struct{}{}
187+
return nil
188+
}
189+
for i := 0; i < n; i++ {
190+
if !g.TryGo(fn) {
191+
t.Fatalf("TryGo should succeed but got fail at %d-th call.", i)
192+
}
193+
}
194+
if g.TryGo(fn) {
195+
t.Fatalf("TryGo is expected to fail but succeeded.")
196+
}
197+
go func() {
198+
for i := 0; i < n; i++ {
199+
<-ch
200+
}
201+
}()
202+
g.Wait()
203+
204+
if !g.TryGo(fn) {
205+
t.Fatalf("TryGo should success but got fail after all goroutines.")
206+
}
207+
go func() { <-ch }()
208+
g.Wait()
209+
210+
// Switch limit.
211+
g.SetLimit(1)
212+
if !g.TryGo(fn) {
213+
t.Fatalf("TryGo should success but got failed.")
214+
}
215+
if g.TryGo(fn) {
216+
t.Fatalf("TryGo should fail but succeeded.")
217+
}
218+
go func() { <-ch }()
219+
g.Wait()
220+
221+
// Block all calls.
222+
g.SetLimit(0)
223+
for i := 0; i < 1<<10; i++ {
224+
if g.TryGo(fn) {
225+
t.Fatalf("TryGo should fail but got succeded.")
226+
}
227+
}
228+
g.Wait()
229+
}
230+
231+
func TestGoLimit(t *testing.T) {
232+
const limit = 10
233+
234+
g := &errgroup.Group{}
235+
g.SetLimit(limit)
236+
var active int32
237+
for i := 0; i <= 1<<10; i++ {
238+
g.Go(func() error {
239+
n := atomic.AddInt32(&active, 1)
240+
if n > limit {
241+
return fmt.Errorf("saw %d active goroutines; want ≤ %d", n, limit)
242+
}
243+
time.Sleep(1 * time.Microsecond) // Give other goroutines a chance to increment active.
244+
atomic.AddInt32(&active, -1)
245+
return nil
246+
})
247+
}
248+
if err := g.Wait(); err != nil {
249+
t.Fatal(err)
250+
}
251+
}
252+
253+
func BenchmarkGo(b *testing.B) {
254+
fn := func() {}
255+
g := &errgroup.Group{}
256+
b.ResetTimer()
257+
b.ReportAllocs()
258+
for i := 0; i < b.N; i++ {
259+
g.Go(func() error { fn(); return nil })
260+
}
261+
g.Wait()
262+
}

0 commit comments

Comments
 (0)