@@ -3,35 +3,62 @@ package sync
3
3
import "internal/task"
4
4
5
5
type WaitGroup struct {
6
- counter uint
7
- waiters task.Stack
6
+ futex task.Futex
8
7
}
9
8
10
9
func (wg * WaitGroup ) Add (delta int ) {
11
10
if delta > 0 {
12
11
// Check for overflow.
13
- if uint (delta ) > (^ uint (0 ))- wg .counter {
14
- panic ("sync: WaitGroup counter overflowed" )
15
- }
12
+ for {
13
+ counter := wg .futex .Load ()
14
+ if uint32 (delta ) > (^ uint32 (0 ))- counter {
15
+ panic ("sync: WaitGroup counter overflowed" )
16
+ }
16
17
17
- // Add to the counter.
18
- wg .counter += uint (delta )
19
- } else {
20
- // Check for underflow.
21
- if uint (- delta ) > wg .counter {
22
- panic ("sync: negative WaitGroup counter" )
18
+ // Add to the counter.
19
+ if wg .futex .CompareAndSwap (counter , counter + uint32 (delta )) {
20
+ // Successfully added.
21
+ return
22
+ }
23
23
}
24
+ } else {
25
+ for {
26
+ counter := wg .futex .Load ()
27
+
28
+ // Check for underflow.
29
+ if uint32 (- delta ) > counter {
30
+ panic ("sync: negative WaitGroup counter" )
31
+ }
24
32
25
- // Subtract from the counter.
26
- wg .counter -= uint (- delta )
33
+ // Subtract from the counter.
34
+ if ! wg .futex .CompareAndSwap (counter , counter - uint32 (- delta )) {
35
+ // Could not swap, trying again.
36
+ continue
37
+ }
27
38
28
- // If the counter is zero, everything is done and the waiters should be resumed.
29
- // This code assumes that the waiters cannot wake up until after this function returns.
30
- // In the current implementation, this is always correct.
31
- if wg .counter == 0 {
32
- for t := wg .waiters .Pop (); t != nil ; t = wg .waiters .Pop () {
33
- scheduleTask (t )
39
+ // If the counter is zero, everything is done and the waiters should
40
+ // be resumed.
41
+ // When there are multiple thread, there is a chance for the counter
42
+ // to go to zero, WakeAll to be called, and then the counter to be
43
+ // incremented again before a waiting goroutine has a chance to
44
+ // check the new (zero) value. However the last increment is
45
+ // explicitly given in the docs as something that should not be
46
+ // done:
47
+ //
48
+ // > Note that calls with a positive delta that occur when the
49
+ // > counter is zero must happen before a Wait.
50
+ //
51
+ // So we're fine here.
52
+ if counter - uint32 (- delta ) == 0 {
53
+ // TODO: this is not the most efficient implementation possible
54
+ // because we wake up all waiters unconditionally, even if there
55
+ // might be none. Though since the common usage is for this to
56
+ // be called with at least one waiter, it's probably fine.
57
+ wg .futex .WakeAll ()
34
58
}
59
+
60
+ // Successfully swapped (and woken all waiting tasks if needed).
61
+ return
35
62
}
36
63
}
37
64
}
@@ -41,14 +68,15 @@ func (wg *WaitGroup) Done() {
41
68
}
42
69
43
70
func (wg * WaitGroup ) Wait () {
44
- if wg .counter == 0 {
45
- // Everything already finished.
46
- return
47
- }
48
-
49
- // Push the current goroutine onto the waiter stack.
50
- wg .waiters .Push (task .Current ())
71
+ for {
72
+ counter := wg .futex .Load ()
73
+ if counter == 0 {
74
+ return // everything already finished
75
+ }
51
76
52
- // Pause until the waiters are awoken by Add/Done.
53
- task .Pause ()
77
+ if wg .futex .Wait (counter ) {
78
+ // Successfully woken by WakeAll (in wg.Add).
79
+ break
80
+ }
81
+ }
54
82
}
0 commit comments