Skip to content

Commit adb384a

Browse files
0introbradfitz
authored andcommitted
net: implement asynchonous cancelable I/O on Plan 9
This change is an experimental implementation of asynchronous cancelable I/O operations on Plan 9, which are required to implement deadlines. There are no asynchronous syscalls on Plan 9. I/O operations are performed with blocking pread and pwrite syscalls. Implementing deadlines in Go requires a way to interrupt I/O operations. It is possible to interrupt reads and writes on a TCP connection by forcing the closure of the TCP connection. This approach has been used successfully in CL 31390. However, we can't implement deadlines with this method, since we require to be able to reuse the connection after the timeout. On Plan 9, I/O operations are interrupted when the process receives a note. We can rely on this behavior to implement a more generic approach. When doing an I/O operation (read or write), we start the I/O in its own process, then wait for the result asynchronously. The process is able to handle the "hangup" note. When receiving the "hangup" note, the currently running I/O operation is canceled and the process returns. This way, deadlines can be implemented by sending an "hangup" note to the process running the blocking I/O operation, after the expiration of a timer. Fixes #11932. Fixes #17498. Change-Id: I414f72c7a9a4f9b8f9c09ed3b6c269f899d9b430 Reviewed-on: https://go-review.googlesource.com/31521 Reviewed-by: Brad Fitzpatrick <[email protected]>
1 parent 456f2f5 commit adb384a

File tree

6 files changed

+231
-5
lines changed

6 files changed

+231
-5
lines changed

src/net/fd_io_plan9.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2016 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package net
6+
7+
import (
8+
"os"
9+
"runtime"
10+
"sync"
11+
"syscall"
12+
)
13+
14+
// asyncIO implements asynchronous cancelable I/O.
15+
// An asyncIO represents a single asynchronous Read or Write
16+
// operation. The result is returned on the result channel.
17+
// The undergoing I/O system call can either complete or be
18+
// interrupted by a note.
19+
type asyncIO struct {
20+
res chan result
21+
22+
// mu guards the pid field.
23+
mu sync.Mutex
24+
25+
// pid holds the process id of
26+
// the process running the IO operation.
27+
pid int
28+
}
29+
30+
// result is the return value of a Read or Write operation.
31+
type result struct {
32+
n int
33+
err error
34+
}
35+
36+
// newAsyncIO returns a new asyncIO that performs an I/O
37+
// operation by calling fn, which must do one and only one
38+
// interruptible system call.
39+
func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO {
40+
aio := &asyncIO{
41+
res: make(chan result, 0),
42+
}
43+
aio.mu.Lock()
44+
go func() {
45+
// Lock the current goroutine to its process
46+
// and store the pid in io so that Cancel can
47+
// interrupt it. We ignore the "hangup" signal,
48+
// so the signal does not take down the entire
49+
// Go runtime.
50+
runtime.LockOSThread()
51+
runtime_ignoreHangup()
52+
aio.pid = os.Getpid()
53+
aio.mu.Unlock()
54+
55+
n, err := fn(b)
56+
57+
aio.mu.Lock()
58+
aio.pid = -1
59+
runtime_unignoreHangup()
60+
aio.mu.Unlock()
61+
62+
aio.res <- result{n, err}
63+
}()
64+
return aio
65+
}
66+
67+
var hangupNote os.Signal = syscall.Note("hangup")
68+
69+
// Cancel interrupts the I/O operation, causing
70+
// the Wait function to return.
71+
func (aio *asyncIO) Cancel() {
72+
aio.mu.Lock()
73+
defer aio.mu.Unlock()
74+
if aio.pid == -1 {
75+
return
76+
}
77+
proc, err := os.FindProcess(aio.pid)
78+
if err != nil {
79+
return
80+
}
81+
proc.Signal(hangupNote)
82+
}
83+
84+
// Wait for the I/O operation to complete.
85+
func (aio *asyncIO) Wait() (int, error) {
86+
res := <-aio.res
87+
return res.n, res.err
88+
}
89+
90+
// The following functions, provided by the runtime, are used to
91+
// ignore and unignore the "hangup" signal received by the process.
92+
func runtime_ignoreHangup()
93+
func runtime_unignoreHangup()

src/net/fd_plan9.go

Lines changed: 100 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,17 @@ package net
77
import (
88
"io"
99
"os"
10+
"sync/atomic"
1011
"syscall"
1112
"time"
1213
)
1314

15+
type atomicBool int32
16+
17+
func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
18+
func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
19+
func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
20+
1421
// Network file descriptor.
1522
type netFD struct {
1623
// locking/lifetime of sysfd + serialize access to Read and Write methods
@@ -23,6 +30,14 @@ type netFD struct {
2330
listen, ctl, data *os.File
2431
laddr, raddr Addr
2532
isStream bool
33+
34+
// deadlines
35+
raio *asyncIO
36+
waio *asyncIO
37+
rtimer *time.Timer
38+
wtimer *time.Timer
39+
rtimedout atomicBool // set true when read deadline has been reached
40+
wtimedout atomicBool // set true when write deadline has been reached
2641
}
2742

2843
var (
@@ -84,6 +99,9 @@ func (fd *netFD) destroy() {
8499
}
85100

86101
func (fd *netFD) Read(b []byte) (n int, err error) {
102+
if fd.rtimedout.isSet() {
103+
return 0, errTimeout
104+
}
87105
if !fd.ok() || fd.data == nil {
88106
return 0, syscall.EINVAL
89107
}
@@ -94,10 +112,15 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
94112
if len(b) == 0 {
95113
return 0, nil
96114
}
97-
n, err = fd.data.Read(b)
115+
fd.raio = newAsyncIO(fd.data.Read, b)
116+
n, err = fd.raio.Wait()
117+
fd.raio = nil
98118
if isHangup(err) {
99119
err = io.EOF
100120
}
121+
if isInterrupted(err) {
122+
err = errTimeout
123+
}
101124
if fd.net == "udp" && err == io.EOF {
102125
n = 0
103126
err = nil
@@ -106,14 +129,23 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
106129
}
107130

108131
func (fd *netFD) Write(b []byte) (n int, err error) {
132+
if fd.wtimedout.isSet() {
133+
return 0, errTimeout
134+
}
109135
if !fd.ok() || fd.data == nil {
110136
return 0, syscall.EINVAL
111137
}
112138
if err := fd.writeLock(); err != nil {
113139
return 0, err
114140
}
115141
defer fd.writeUnlock()
116-
return fd.data.Write(b)
142+
fd.waio = newAsyncIO(fd.data.Write, b)
143+
n, err = fd.waio.Wait()
144+
fd.waio = nil
145+
if isInterrupted(err) {
146+
err = errTimeout
147+
}
148+
return
117149
}
118150

119151
func (fd *netFD) closeRead() error {
@@ -185,15 +217,74 @@ func (fd *netFD) file(f *os.File, s string) (*os.File, error) {
185217
}
186218

187219
func (fd *netFD) setDeadline(t time.Time) error {
188-
return syscall.EPLAN9
220+
return setDeadlineImpl(fd, t, 'r'+'w')
189221
}
190222

191223
func (fd *netFD) setReadDeadline(t time.Time) error {
192-
return syscall.EPLAN9
224+
return setDeadlineImpl(fd, t, 'r')
193225
}
194226

195227
func (fd *netFD) setWriteDeadline(t time.Time) error {
196-
return syscall.EPLAN9
228+
return setDeadlineImpl(fd, t, 'w')
229+
}
230+
231+
func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
232+
d := t.Sub(time.Now())
233+
if mode == 'r' || mode == 'r'+'w' {
234+
fd.rtimedout.setFalse()
235+
}
236+
if mode == 'w' || mode == 'r'+'w' {
237+
fd.wtimedout.setFalse()
238+
}
239+
if t.IsZero() || d < 0 {
240+
// Stop timer
241+
if mode == 'r' || mode == 'r'+'w' {
242+
if fd.rtimer != nil {
243+
fd.rtimer.Stop()
244+
}
245+
fd.rtimer = nil
246+
}
247+
if mode == 'w' || mode == 'r'+'w' {
248+
if fd.wtimer != nil {
249+
fd.wtimer.Stop()
250+
}
251+
fd.wtimer = nil
252+
}
253+
} else {
254+
// Interrupt I/O operation once timer has expired
255+
if mode == 'r' || mode == 'r'+'w' {
256+
fd.rtimer = time.AfterFunc(d, func() {
257+
fd.rtimedout.setTrue()
258+
if fd.raio != nil {
259+
fd.raio.Cancel()
260+
}
261+
})
262+
}
263+
if mode == 'w' || mode == 'r'+'w' {
264+
fd.wtimer = time.AfterFunc(d, func() {
265+
fd.wtimedout.setTrue()
266+
if fd.waio != nil {
267+
fd.waio.Cancel()
268+
}
269+
})
270+
}
271+
}
272+
if !t.IsZero() && d < 0 {
273+
// Interrupt current I/O operation
274+
if mode == 'r' || mode == 'r'+'w' {
275+
fd.rtimedout.setTrue()
276+
if fd.raio != nil {
277+
fd.raio.Cancel()
278+
}
279+
}
280+
if mode == 'w' || mode == 'r'+'w' {
281+
fd.wtimedout.setTrue()
282+
if fd.waio != nil {
283+
fd.waio.Cancel()
284+
}
285+
}
286+
}
287+
return nil
197288
}
198289

199290
func setReadBuffer(fd *netFD, bytes int) error {
@@ -207,3 +298,7 @@ func setWriteBuffer(fd *netFD, bytes int) error {
207298
func isHangup(err error) bool {
208299
return err != nil && stringsHasSuffix(err.Error(), "Hangup")
209300
}
301+
302+
func isInterrupted(err error) bool {
303+
return err != nil && stringsHasSuffix(err.Error(), "interrupted")
304+
}

src/net/tcpsock_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,11 @@ func TestTCPConcurrentAccept(t *testing.T) {
467467

468468
func TestTCPReadWriteAllocs(t *testing.T) {
469469
switch runtime.GOOS {
470+
case "plan9":
471+
// The implementation of asynchronous cancelable
472+
// I/O on Plan 9 allocates memory.
473+
// See net/fd_io_plan9.go.
474+
t.Skipf("not supported on %s", runtime.GOOS)
470475
case "nacl":
471476
// NaCl needs to allocate pseudo file descriptor
472477
// stuff. See syscall/fd_nacl.go.

src/runtime/net_plan9.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2016 The Go Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
package runtime
6+
7+
import (
8+
_ "unsafe"
9+
)
10+
11+
//go:linkname runtime_ignoreHangup net.runtime_ignoreHangup
12+
func runtime_ignoreHangup() {
13+
getg().m.ignoreHangup = true
14+
}
15+
16+
//go:linkname runtime_unignoreHangup net.runtime_unignoreHangup
17+
func runtime_unignoreHangup(sig string) {
18+
getg().m.ignoreHangup = false
19+
}
20+
21+
func ignoredNote(note *byte) bool {
22+
if note == nil {
23+
return false
24+
}
25+
if gostringnocopy(note) != "hangup" {
26+
return false
27+
}
28+
return getg().m.ignoreHangup
29+
}

src/runtime/os3_plan9.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ func sighandler(_ureg *ureg, note *byte, gp *g) int {
100100
return _NCONT
101101
}
102102
if flags&_SigNotify != 0 {
103+
if ignoredNote(note) {
104+
return _NCONT
105+
}
103106
if sendNote(note) {
104107
return _NCONT
105108
}

src/runtime/os_plan9.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ type mOS struct {
1313
waitsemacount uint32
1414
notesig *int8
1515
errstr *byte
16+
ignoreHangup bool
1617
}
1718

1819
func closefd(fd int32) int32

0 commit comments

Comments
 (0)