Skip to content

Commit 9afac71

Browse files
amscanne authored and
gvisor-bot committed
Support synchronous AssertAndFetch for sleep package.
Some synchronization patterns require the ability to simultaneously wake and sleep a goroutine. For the sleep package, this is the case when a waker must be asserted when a subsequent fetch is imminent. Currently, this operation results in significant P churn in the runtime, which ping-pongs execution between multiple system threads and cores and consumes a significant amount of host CPU (and because of the context switches, this can be significantly worse with mitigations for side channel vulnerabilities). The solution is to introduce a dedicated mechanism for a synchronous switch which does not wake another runtime P (see golang/go#32113). This can be used by the `AssertAndFetch` API in the sleep package. The benchmark results for this package are very similar to raw channel operations for all cases, with the exception of operations that do not wait. The primary advantage is more precise control over scheduling. This will be used in a subsequent change. ``` BenchmarkGoAssertNonWaiting BenchmarkGoAssertNonWaiting-8 261364384 4.976 ns/op BenchmarkGoSingleSelect BenchmarkGoSingleSelect-8 20946358 57.77 ns/op BenchmarkGoMultiSelect BenchmarkGoMultiSelect-8 6071697 197.0 ns/op BenchmarkGoWaitOnSingleSelect BenchmarkGoWaitOnSingleSelect-8 4978051 235.4 ns/op BenchmarkGoWaitOnMultiSelect BenchmarkGoWaitOnMultiSelect-8 2309224 520.2 ns/op BenchmarkSleeperAssertNonWaiting BenchmarkSleeperAssertNonWaiting-8 447325033 2.657 ns/op BenchmarkSleeperSingleSelect BenchmarkSleeperSingleSelect-8 21488844 55.19 ns/op BenchmarkSleeperMultiSelect BenchmarkSleeperMultiSelect-8 21851674 54.89 ns/op BenchmarkSleeperWaitOnSingleSelect BenchmarkSleeperWaitOnSingleSelect-8 2860327 416.4 ns/op BenchmarkSleeperWaitOnSingleSelectSync BenchmarkSleeperWaitOnSingleSelectSync-8 2741733 427.1 ns/op BenchmarkSleeperWaitOnMultiSelect BenchmarkSleeperWaitOnMultiSelect-8 2867484 418.1 ns/op BenchmarkSleeperWaitOnMultiSelectSync BenchmarkSleeperWaitOnMultiSelectSync-8 2789158 427.9 ns/op ``` 
PiperOrigin-RevId: 415581417
1 parent 4d29819 commit 9afac71

File tree

9 files changed

+299
-26
lines changed

9 files changed

+299
-26
lines changed

pkg/sleep/sleep_test.go

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ package sleep
1717
import (
1818
"math/rand"
1919
"runtime"
20+
"sync"
21+
"sync/atomic"
2022
"testing"
2123
"time"
2224
)
@@ -332,6 +334,50 @@ func TestDoneFunction(t *testing.T) {
332334
}
333335
}
334336

337+
// TestAssertFetch tests basic assert fetch functionality.
338+
func TestAssertFetch(t *testing.T) {
339+
const sleeperWakers = 100
340+
const wakeRequests = 1000
341+
const seedAsserts = 10
342+
343+
ws := make([]Waker, sleeperWakers)
344+
ss := make([]Sleeper, sleeperWakers)
345+
for i := 0; i < sleeperWakers; i++ {
346+
ss[i].AddWaker(&ws[i])
347+
}
348+
defer func() {
349+
for i := 0; i < sleeperWakers; i++ {
350+
defer ss[i].Done()
351+
}
352+
}()
353+
var (
354+
count int32
355+
wg sync.WaitGroup
356+
)
357+
for i := 0; i < sleeperWakers; i++ {
358+
wg.Add(1)
359+
go func(i int) {
360+
defer wg.Done()
361+
ss[i].Fetch(true /* block */)
362+
w := &ws[(i+1)%sleeperWakers]
363+
for n := 0; n < wakeRequests; n++ {
364+
atomic.AddInt32(&count, 1)
365+
ss[i].AssertAndFetch(w)
366+
}
367+
w.Assert() // Final wake-up.
368+
}(i)
369+
}
370+
371+
// Fire the first assertion.
372+
ws[0].Assert()
373+
wg.Wait()
374+
375+
// Check what we got.
376+
if want := int32(sleeperWakers * wakeRequests); count != want {
377+
t.Errorf("unexpected count: got %d, wanted %d", count, want)
378+
}
379+
}
380+
335381
// TestRace tests that multiple wakers can continuously send wake requests to
336382
// the sleeper.
337383
func TestRace(t *testing.T) {
@@ -511,6 +557,29 @@ func BenchmarkSleeperWaitOnSingleSelect(b *testing.B) {
511557
}
512558
}
513559

560+
// BenchmarkSleeperWaitOnSingleSelectSync is a modification of the similarly
561+
// named benchmark, except it uses the synchronous AssertAndFetch.
562+
func BenchmarkSleeperWaitOnSingleSelectSync(b *testing.B) {
563+
var (
564+
s Sleeper
565+
w Waker
566+
ns Sleeper
567+
nw Waker
568+
)
569+
ns.AddWaker(&nw)
570+
s.AddWaker(&w)
571+
go func() {
572+
ns.Fetch(true)
573+
defer w.Assert()
574+
for i := 0; i < b.N-1; i++ {
575+
ns.AssertAndFetch(&w)
576+
}
577+
}()
578+
for i := 0; i < b.N; i++ {
579+
s.AssertAndFetch(&nw)
580+
}
581+
}
582+
514583
// BenchmarkGoWaitOnSingleSelect measures how long it takes to wait on one
515584
// channel while another goroutine wakes up the sleeper.
516585
func BenchmarkGoWaitOnSingleSelect(b *testing.B) {
@@ -556,6 +625,34 @@ func BenchmarkSleeperWaitOnMultiSelect(b *testing.B) {
556625
}
557626
}
558627

628+
// BenchmarkSleeperWaitOnMultiSelectSync is a modification of the similarly
629+
// named benchmark, except it uses the synchronous AssertAndFetch.
630+
func BenchmarkSleeperWaitOnMultiSelectSync(b *testing.B) {
631+
const count = 4
632+
var (
633+
s Sleeper
634+
ns Sleeper
635+
nw Waker
636+
)
637+
ns.AddWaker(&nw)
638+
w := make([]Waker, count)
639+
for i := range w {
640+
s.AddWaker(&w[i])
641+
}
642+
643+
b.ResetTimer()
644+
go func() {
645+
ns.Fetch(true)
646+
defer w[count-1].Assert()
647+
for i := 0; i < b.N-1; i++ {
648+
ns.AssertAndFetch(&w[count-1])
649+
}
650+
}()
651+
for i := 0; i < b.N; i++ {
652+
s.AssertAndFetch(&nw)
653+
}
654+
}
655+
559656
// BenchmarkGoWaitOnMultiSelect measures how long it takes to wait on 4 channels
560657
// while another goroutine wakes up the sleeper.
561658
func BenchmarkGoWaitOnMultiSelect(b *testing.B) {

pkg/sleep/sleep_unsafe.go

Lines changed: 100 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,14 @@ var (
9898
// returns. These restrictions allow this to be implemented lock-free.
9999
//
100100
// This struct is thread-compatible.
101+
//
102+
// +stateify savable
101103
type Sleeper struct {
104+
_ sync.NoCopy
105+
102106
// sharedList is a "stack" of asserted wakers. They atomically add
103107
// themselves to the front of this list as they become asserted.
104-
sharedList unsafe.Pointer
108+
sharedList unsafe.Pointer `state:".(*Waker)"`
105109

106110
// localList is a list of asserted wakers that is only accessible to the
107111
// waiter, and thus doesn't have to be accessed atomically. When
@@ -116,7 +120,17 @@ type Sleeper struct {
116120

117121
// waitingG holds the G that is sleeping, if any. It is used by wakers
118122
// to determine which G, if any, they should wake.
119-
waitingG uintptr
123+
waitingG uintptr `state:"zero"`
124+
}
125+
126+
// saveSharedList is invoked by stateify.
127+
func (s *Sleeper) saveSharedList() *Waker {
128+
return (*Waker)(atomic.LoadPointer(&s.sharedList))
129+
}
130+
131+
// loadSharedList is invoked by stateify.
132+
func (s *Sleeper) loadSharedList(w *Waker) {
133+
atomic.StorePointer(&s.sharedList, unsafe.Pointer(w))
120134
}
121135

122136
// AddWaker associates the given waker to the sleeper.
@@ -137,7 +151,7 @@ func (s *Sleeper) AddWaker(w *Waker) {
137151
for {
138152
p := (*Sleeper)(atomic.LoadPointer(&w.s))
139153
if p == &assertedSleeper {
140-
s.enqueueAssertedWaker(w)
154+
s.enqueueAssertedWaker(w, true /* wakep */)
141155
return
142156
}
143157

@@ -148,8 +162,11 @@ func (s *Sleeper) AddWaker(w *Waker) {
148162
}
149163

150164
// nextWaker returns the next waker in the notification list, blocking if
151-
// needed.
152-
func (s *Sleeper) nextWaker(block bool) *Waker {
165+
// needed. The parameter wakepOrSleep indicates that if the operation does not
166+
// block, then we will need to explicitly wake a runtime P.
167+
//
168+
// Precondition: wakepOrSleep may be true iff block is true.
169+
func (s *Sleeper) nextWaker(block, wakepOrSleep bool) *Waker {
153170
// Attempt to replenish the local list if it's currently empty.
154171
if s.localList == nil {
155172
for atomic.LoadPointer(&s.sharedList) == nil {
@@ -173,6 +190,10 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
173190
break
174191
}
175192

193+
// Since we are sleeping for sure, we no longer
194+
// need to wakep once we get a value.
195+
wakepOrSleep = false
196+
176197
// Try to commit the sleep and report it to the
177198
// tracer as a select.
178199
//
@@ -203,6 +224,11 @@ func (s *Sleeper) nextWaker(block bool) *Waker {
203224
w := s.localList
204225
s.localList = w.next
205226

227+
// Do we need to wake a P?
228+
if wakepOrSleep {
229+
sync.Wakep()
230+
}
231+
206232
return w
207233
}
208234

@@ -218,17 +244,13 @@ func commitSleep(g uintptr, waitingG unsafe.Pointer) bool {
218244
return sync.RaceUncheckedAtomicCompareAndSwapUintptr((*uintptr)(waitingG), preparingG, g)
219245
}
220246

221-
// Fetch fetches the next wake-up notification. If a notification is
222-
// immediately available, the asserted waker is returned immediately.
223-
// Otherwise, the behavior depends on the value of 'block': if true, the
224-
// current goroutine blocks until a notification arrives and returns the
225-
// asserted waker; if false, nil will be returned.
247+
// fetch is the backing implementation for Fetch and AssertAndFetch.
226248
//
227-
// N.B. This method is *not* thread-safe. Only one goroutine at a time is
228-
// allowed to call this method.
229-
func (s *Sleeper) Fetch(block bool) *Waker {
249+
// Preconditions are the same as nextWaker.
250+
//go:nosplit
251+
func (s *Sleeper) fetch(block, wakepOrSleep bool) *Waker {
230252
for {
231-
w := s.nextWaker(block)
253+
w := s.nextWaker(block, wakepOrSleep)
232254
if w == nil {
233255
return nil
234256
}
@@ -242,6 +264,31 @@ func (s *Sleeper) Fetch(block bool) *Waker {
242264
}
243265
}
244266

267+
// Fetch fetches the next wake-up notification. If a notification is
268+
// immediately available, the asserted waker is returned immediately.
269+
// Otherwise, the behavior depends on the value of 'block': if true, the
270+
// current goroutine blocks until a notification arrives and returns the
271+
// asserted waker; if false, nil will be returned.
272+
//
273+
// N.B. This method is *not* thread-safe. Only one goroutine at a time is
274+
// allowed to call this method.
275+
func (s *Sleeper) Fetch(block bool) *Waker {
276+
return s.fetch(block, false /* wakepOrSleep */)
277+
}
278+
279+
// AssertAndFetch asserts the given waker and fetches the next wake-up notification.
280+
// Note that this will always be blocking, since there is no value in joining a
281+
// non-blocking operation.
282+
//
283+
// N.B. Like Fetch, this method is *not* thread-safe. This will also yield the current
284+
// P to the next goroutine, avoiding the associated scheduling overhead.
285+
//+checkescapes:all
286+
//go:nosplit
287+
func (s *Sleeper) AssertAndFetch(n *Waker) *Waker {
288+
n.assert(false /* wakep */)
289+
return s.fetch(true /* block */, true /* wakepOrSleep*/)
290+
}
291+
245292
// Done is used to indicate that the caller won't use this Sleeper anymore. It
246293
// removes the association with all wakers so that they can be safely reused
247294
// by another sleeper after Done() returns.
@@ -264,7 +311,7 @@ func (s *Sleeper) Done() {
264311
// Dequeue exactly one waker from the list, it may not be
265312
// this one but we know this one is in the process. We must
266313
// leave it in the asserted state but drop it from our lists.
267-
if w := s.nextWaker(true); w != nil {
314+
if w := s.nextWaker(true, false); w != nil {
268315
prev := &s.allWakers
269316
for *prev != w {
270317
prev = &((*prev).allWakersNext)
@@ -278,7 +325,7 @@ func (s *Sleeper) Done() {
278325

279326
// enqueueAssertedWaker enqueues an asserted waker to the "ready" circular list
280327
// of wakers that want to notify the sleeper.
281-
func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
328+
func (s *Sleeper) enqueueAssertedWaker(w *Waker, wakep bool) {
282329
// Add the new waker to the front of the list.
283330
for {
284331
v := (*Waker)(atomic.LoadPointer(&s.sharedList))
@@ -298,7 +345,7 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
298345
case 0, preparingG:
299346
default:
300347
// We managed to get a G. Wake it up.
301-
sync.Goready(g, 0)
348+
sync.Goready(g, 0, wakep)
302349
}
303350
}
304351

@@ -315,6 +362,8 @@ func (s *Sleeper) enqueueAssertedWaker(w *Waker) {
315362
//
316363
// Note, it is not safe to copy a Waker as its fields are modified by value
317364
// (the pointer fields are individually modified with atomic operations).
365+
//
366+
// +stateify savable
318367
type Waker struct {
319368
_ sync.NoCopy
320369

@@ -327,7 +376,7 @@ type Waker struct {
327376
// otherwise -- the waker is not asserted, and is associated with the
328377
// given sleeper. Once it transitions to asserted state, the
329378
// associated sleeper will be woken.
330-
s unsafe.Pointer
379+
s unsafe.Pointer `state:".(wakerState)"`
331380

332381
// next is used to form a linked list of asserted wakers in a sleeper.
333382
next *Waker
@@ -337,9 +386,32 @@ type Waker struct {
337386
allWakersNext *Waker
338387
}
339388

340-
// Assert moves the waker to an asserted state, if it isn't asserted yet. When
341-
// asserted, the waker will cause its matching sleeper to wake up.
342-
func (w *Waker) Assert() {
389+
type wakerState struct {
390+
asserted bool
391+
other *Sleeper
392+
}
393+
394+
// saveS is invoked by stateify.
395+
func (w *Waker) saveS() wakerState {
396+
s := (*Sleeper)(atomic.LoadPointer(&w.s))
397+
if s == &assertedSleeper {
398+
return wakerState{asserted: true}
399+
}
400+
return wakerState{other: s}
401+
}
402+
403+
// loadS is invoked by stateify.
404+
func (w *Waker) loadS(ws wakerState) {
405+
if ws.asserted {
406+
atomic.StorePointer(&w.s, unsafe.Pointer(&assertedSleeper))
407+
} else {
408+
atomic.StorePointer(&w.s, unsafe.Pointer(ws.other))
409+
}
410+
}
411+
412+
// assert is the implementation for Assert.
413+
//go:nosplit
414+
func (w *Waker) assert(wakep bool) {
343415
// Nothing to do if the waker is already asserted. This check allows us
344416
// to complete this case (already asserted) without any interlocked
345417
// operations on x86.
@@ -352,10 +424,16 @@ func (w *Waker) Assert() {
352424
case nil:
353425
case &assertedSleeper:
354426
default:
355-
s.enqueueAssertedWaker(w)
427+
s.enqueueAssertedWaker(w, wakep)
356428
}
357429
}
358430

431+
// Assert moves the waker to an asserted state, if it isn't asserted yet. When
432+
// asserted, the waker will cause its matching sleeper to wake up.
433+
func (w *Waker) Assert() {
434+
w.assert(true /* wakep */)
435+
}
436+
359437
// Clear moves the waker to the non-asserted state and returns whether it was
360438
// asserted before being cleared.
361439
//

pkg/sync/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ go_library(
2222
"race_amd64.s",
2323
"race_arm64.s",
2424
"race_unsafe.go",
25+
"runtime_amd64.go",
26+
"runtime_amd64.s",
27+
"runtime_other.go",
2528
"runtime_unsafe.go",
2629
"rwmutex_unsafe.go",
2730
"seqcount.go",

pkg/sync/runtime_amd64.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2020 The gVisor Authors.
2+
//
3+
// Use of this source code is governed by a BSD-style
4+
// license that can be found in the LICENSE file.
5+
6+
//go:build amd64 && go1.8 && !go1.19 && !goexperiment.staticlockranking
7+
// +build amd64,go1.8,!go1.19,!goexperiment.staticlockranking
8+
9+
package sync
10+
11+
import (
12+
"sync/atomic"
13+
)
14+
15+
const supportsWakeSuppression = true
16+
17+
// addrOfSpinning returns the address of runtime.sched.nmspinning.
18+
func addrOfSpinning() *int32
19+
20+
// nmspinning caches addrOfSpinning.
21+
var nmspinning = addrOfSpinning()
22+
23+
func preGoReadyWakeSuppression() {
24+
atomic.AddInt32(nmspinning, 1)
25+
}
26+
27+
func postGoReadyWakeSuppression() {
28+
atomic.AddInt32(nmspinning, -1)
29+
}

0 commit comments

Comments
 (0)