Skip to content

Commit 676550d

Browse files
Paul Marksbradfitz
Paul Marks
authored andcommitted
net: use dialTCP cancelation for DualStack dialing.
The previous Happy Eyeballs implementation would intentionally leak connections, because dialTCP could not be reliably terminated upon losing the race. Now that dialTCP supports cancelation (plan9 excluded), dialParallel can wait for responses from both the primary and fallback racers, strictly before returning control to the caller. In dial_test.go, we no longer need Sleep to avoid leaks. Also, fix a typo in the Benchmark IPv4 address. Updates #11225 Fixes #14279 Change-Id: Ibf3fe5c7ac2f7a438c1ab2cdb57032beb8bc27b5 Reviewed-on: https://go-review.googlesource.com/19390 Reviewed-by: Mikio Hara <[email protected]> Reviewed-by: Brad Fitzpatrick <[email protected]>
1 parent 5583e8a commit 676550d

File tree

2 files changed

+149
-69
lines changed

2 files changed

+149
-69
lines changed

src/net/dial.go

Lines changed: 60 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package net
66

77
import (
88
"errors"
9+
"runtime"
910
"time"
1011
)
1112

@@ -225,8 +226,10 @@ func (d *Dialer) Dial(network, address string) (Conn, error) {
225226
finalDeadline: finalDeadline,
226227
}
227228

229+
// DualStack mode requires that dialTCP support cancelation. This is
230+
// not available on plan9 (golang.org/issue/11225), so we ignore it.
228231
var primaries, fallbacks addrList
229-
if d.DualStack && network == "tcp" {
232+
if d.DualStack && network == "tcp" && runtime.GOOS != "plan9" {
230233
primaries, fallbacks = addrs.partition(isIPv4)
231234
} else {
232235
primaries = addrs
@@ -236,9 +239,9 @@ func (d *Dialer) Dial(network, address string) (Conn, error) {
236239
if len(fallbacks) == 0 {
237240
// dialParallel can accept an empty fallbacks list,
238241
// but this shortcut avoids the goroutine/channel overhead.
239-
c, err = dialSerial(ctx, primaries, nil)
242+
c, err = dialSerial(ctx, primaries, ctx.Cancel)
240243
} else {
241-
c, err = dialParallel(ctx, primaries, fallbacks)
244+
c, err = dialParallel(ctx, primaries, fallbacks, ctx.Cancel)
242245
}
243246

244247
if d.KeepAlive > 0 && err == nil {
@@ -255,10 +258,9 @@ func (d *Dialer) Dial(network, address string) (Conn, error) {
255258
// head start. It returns the first established connection and
256259
// closes the others. Otherwise it returns an error from the first
257260
// primary address.
258-
func dialParallel(ctx *dialContext, primaries, fallbacks addrList) (Conn, error) {
259-
results := make(chan dialResult) // unbuffered, so dialSerialAsync can detect race loss & cleanup
261+
func dialParallel(ctx *dialContext, primaries, fallbacks addrList, userCancel <-chan struct{}) (Conn, error) {
262+
results := make(chan dialResult, 2)
260263
cancel := make(chan struct{})
261-
defer close(cancel)
262264

263265
// Spawn the primary racer.
264266
go dialSerialAsync(ctx, primaries, nil, cancel, results)
@@ -267,28 +269,59 @@ func dialParallel(ctx *dialContext, primaries, fallbacks addrList) (Conn, error)
267269
fallbackTimer := time.NewTimer(ctx.fallbackDelay())
268270
go dialSerialAsync(ctx, fallbacks, fallbackTimer, cancel, results)
269271

270-
var primaryErr error
271-
for nracers := 2; nracers > 0; nracers-- {
272-
res := <-results
273-
// If we're still waiting for a connection, then hasten the delay.
274-
// Otherwise, disable the Timer and let cancel take over.
275-
if fallbackTimer.Stop() && res.error != nil {
276-
fallbackTimer.Reset(0)
277-
}
278-
if res.error == nil {
279-
return res.Conn, nil
272+
// Wait for both racers to succeed or fail.
273+
var primaryResult, fallbackResult dialResult
274+
for !primaryResult.done || !fallbackResult.done {
275+
select {
276+
case <-userCancel:
277+
// Forward an external cancelation request.
278+
if cancel != nil {
279+
close(cancel)
280+
cancel = nil
281+
}
282+
userCancel = nil
283+
case res := <-results:
284+
// Drop the result into its assigned bucket.
285+
if res.primary {
286+
primaryResult = res
287+
} else {
288+
fallbackResult = res
289+
}
290+
// On success, cancel the other racer (if one exists.)
291+
if res.error == nil && cancel != nil {
292+
close(cancel)
293+
cancel = nil
294+
}
295+
// If the fallbackTimer was pending, then either we've canceled the
296+
// fallback because we no longer want it, or we haven't canceled yet
297+
// and therefore want it to wake up immediately.
298+
if fallbackTimer.Stop() && cancel != nil {
299+
fallbackTimer.Reset(0)
300+
}
280301
}
281-
if res.primary {
282-
primaryErr = res.error
302+
}
303+
304+
// Return, in order of preference:
305+
// 1. The primary connection (but close the other if we got both.)
306+
// 2. The fallback connection.
307+
// 3. The primary error.
308+
if primaryResult.error == nil {
309+
if fallbackResult.error == nil {
310+
fallbackResult.Conn.Close()
283311
}
312+
return primaryResult.Conn, nil
313+
} else if fallbackResult.error == nil {
314+
return fallbackResult.Conn, nil
315+
} else {
316+
return nil, primaryResult.error
284317
}
285-
return nil, primaryErr
286318
}
287319

288320
type dialResult struct {
289321
Conn
290322
error
291323
primary bool
324+
done bool
292325
}
293326

294327
// dialSerialAsync runs dialSerial after some delay, and returns the
@@ -300,19 +333,11 @@ func dialSerialAsync(ctx *dialContext, ras addrList, timer *time.Timer, cancel <
300333
select {
301334
case <-timer.C:
302335
case <-cancel:
303-
return
336+
// dialSerial will immediately return errCanceled in this case.
304337
}
305338
}
306339
c, err := dialSerial(ctx, ras, cancel)
307-
select {
308-
case results <- dialResult{c, err, timer == nil}:
309-
// We won the race.
310-
case <-cancel:
311-
// The other goroutine won the race.
312-
if c != nil {
313-
c.Close()
314-
}
315-
}
340+
results <- dialResult{Conn: c, error: err, primary: timer == nil, done: true}
316341
}
317342

318343
// dialSerial connects to a list of addresses in sequence, returning
@@ -336,11 +361,11 @@ func dialSerial(ctx *dialContext, ras addrList, cancel <-chan struct{}) (Conn, e
336361
break
337362
}
338363

339-
// dialTCP does not support cancelation (see golang.org/issue/11225),
340-
// so if cancel fires, we'll continue trying to connect until the next
341-
// timeout, or return a spurious connection for the caller to close.
364+
// If this dial is canceled, the implementation is expected to complete
365+
// quickly, but it's still possible that we could return a spurious Conn,
366+
// which the caller must Close.
342367
dialer := func(d time.Time) (Conn, error) {
343-
return dialSingle(ctx, ra, d)
368+
return dialSingle(ctx, ra, d, cancel)
344369
}
345370
c, err := dial(ctx.network, ra, dialer, partialDeadline)
346371
if err == nil {
@@ -360,15 +385,15 @@ func dialSerial(ctx *dialContext, ras addrList, cancel <-chan struct{}) (Conn, e
360385
// dialSingle attempts to establish and returns a single connection to
361386
// the destination address. This must be called through the OS-specific
362387
// dial function, because some OSes don't implement the deadline feature.
363-
func dialSingle(ctx *dialContext, ra Addr, deadline time.Time) (c Conn, err error) {
388+
func dialSingle(ctx *dialContext, ra Addr, deadline time.Time, cancel <-chan struct{}) (c Conn, err error) {
364389
la := ctx.LocalAddr
365390
if la != nil && la.Network() != ra.Network() {
366391
return nil, &OpError{Op: "dial", Net: ctx.network, Source: la, Addr: ra, Err: errors.New("mismatched local address type " + la.Network())}
367392
}
368393
switch ra := ra.(type) {
369394
case *TCPAddr:
370395
la, _ := la.(*TCPAddr)
371-
c, err = testHookDialTCP(ctx.network, la, ra, deadline, ctx.Cancel)
396+
c, err = testHookDialTCP(ctx.network, la, ra, deadline, cancel)
372397
case *UDPAddr:
373398
la, _ := la.(*UDPAddr)
374399
c, err = dialUDP(ctx.network, la, ra, deadline)

src/net/dial_test.go

Lines changed: 89 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,8 @@ func TestDialerDualStackFDLeak(t *testing.T) {
228228
// expected to hang until the timeout elapses. These addresses are reserved
229229
// for benchmarking by RFC 6890.
230230
const (
231-
slowDst4 = "192.18.0.254"
232-
slowDst6 = "2001:2::254"
233-
slowTimeout = 1 * time.Second
231+
slowDst4 = "198.18.0.254"
232+
slowDst6 = "2001:2::254"
234233
)
235234

236235
// In some environments, the slow IPs may be explicitly unreachable, and fail
@@ -239,7 +238,10 @@ const (
239238
func slowDialTCP(net string, laddr, raddr *TCPAddr, deadline time.Time, cancel <-chan struct{}) (*TCPConn, error) {
240239
c, err := dialTCP(net, laddr, raddr, deadline, cancel)
241240
if ParseIP(slowDst4).Equal(raddr.IP) || ParseIP(slowDst6).Equal(raddr.IP) {
242-
time.Sleep(deadline.Sub(time.Now()))
241+
select {
242+
case <-cancel:
243+
case <-time.After(deadline.Sub(time.Now())):
244+
}
243245
}
244246
return c, err
245247
}
@@ -283,6 +285,9 @@ func TestDialParallel(t *testing.T) {
283285
if !supportsIPv4 || !supportsIPv6 {
284286
t.Skip("both IPv4 and IPv6 are required")
285287
}
288+
if runtime.GOOS == "plan9" {
289+
t.Skip("skipping on plan9; cannot cancel dialTCP, golang.org/issue/11225")
290+
}
286291

287292
closedPortDelay, expectClosedPortDelay := dialClosedPort()
288293
if closedPortDelay > expectClosedPortDelay {
@@ -388,7 +393,6 @@ func TestDialParallel(t *testing.T) {
388393
fallbacks := makeAddrs(tt.fallbacks, dss.port)
389394
d := Dialer{
390395
FallbackDelay: fallbackDelay,
391-
Timeout: slowTimeout,
392396
}
393397
ctx := &dialContext{
394398
Dialer: d,
@@ -397,7 +401,7 @@ func TestDialParallel(t *testing.T) {
397401
finalDeadline: d.deadline(time.Now()),
398402
}
399403
startTime := time.Now()
400-
c, err := dialParallel(ctx, primaries, fallbacks)
404+
c, err := dialParallel(ctx, primaries, fallbacks, nil)
401405
elapsed := time.Now().Sub(startTime)
402406

403407
if c != nil {
@@ -417,9 +421,27 @@ func TestDialParallel(t *testing.T) {
417421
} else if !(elapsed <= expectElapsedMax) {
418422
t.Errorf("#%d: got %v; want <= %v", i, elapsed, expectElapsedMax)
419423
}
424+
425+
// Repeat each case, ensuring that it can be canceled quickly.
426+
cancel := make(chan struct{})
427+
var wg sync.WaitGroup
428+
wg.Add(1)
429+
go func() {
430+
time.Sleep(5 * time.Millisecond)
431+
close(cancel)
432+
wg.Done()
433+
}()
434+
startTime = time.Now()
435+
c, err = dialParallel(ctx, primaries, fallbacks, cancel)
436+
if c != nil {
437+
c.Close()
438+
}
439+
elapsed = time.Now().Sub(startTime)
440+
if elapsed > 100*time.Millisecond {
441+
t.Errorf("#%d (cancel): got %v; want <= 100ms", i, elapsed)
442+
}
443+
wg.Wait()
420444
}
421-
// Wait for any slowDst4/slowDst6 connections to timeout.
422-
time.Sleep(slowTimeout * 3 / 2)
423445
}
424446

425447
func lookupSlowFast(fn func(string) ([]IPAddr, error), host string) ([]IPAddr, error) {
@@ -462,8 +484,6 @@ func TestDialerFallbackDelay(t *testing.T) {
462484
{true, 200 * time.Millisecond, 200 * time.Millisecond},
463485
// The default is 300ms.
464486
{true, 0, 300 * time.Millisecond},
465-
// This case is last, in order to wait for hanging slowDst6 connections.
466-
{false, 0, slowTimeout},
467487
}
468488

469489
handler := func(dss *dualStackServer, ln Listener) {
@@ -487,7 +507,7 @@ func TestDialerFallbackDelay(t *testing.T) {
487507
}
488508

489509
for i, tt := range testCases {
490-
d := &Dialer{DualStack: tt.dualstack, FallbackDelay: tt.delay, Timeout: slowTimeout}
510+
d := &Dialer{DualStack: tt.dualstack, FallbackDelay: tt.delay}
491511

492512
startTime := time.Now()
493513
c, err := d.Dial("tcp", JoinHostPort("slow6loopback4", dss.port))
@@ -508,46 +528,82 @@ func TestDialerFallbackDelay(t *testing.T) {
508528
}
509529
}
510530

511-
func TestDialSerialAsyncSpuriousConnection(t *testing.T) {
531+
func TestDialParallelSpuriousConnection(t *testing.T) {
532+
if !supportsIPv4 || !supportsIPv6 {
533+
t.Skip("both IPv4 and IPv6 are required")
534+
}
512535
if runtime.GOOS == "plan9" {
513-
t.Skip("skipping on plan9; no deadline support, golang.org/issue/11932")
536+
t.Skip("skipping on plan9; cannot cancel dialTCP, golang.org/issue/11225")
537+
}
538+
539+
var wg sync.WaitGroup
540+
wg.Add(2)
541+
handler := func(dss *dualStackServer, ln Listener) {
542+
// Accept one connection per address.
543+
c, err := ln.Accept()
544+
if err != nil {
545+
t.Fatal(err)
546+
}
547+
// The client should close itself, without sending data.
548+
c.SetReadDeadline(time.Now().Add(1 * time.Second))
549+
var b [1]byte
550+
if _, err := c.Read(b[:]); err != io.EOF {
551+
t.Errorf("got %v; want %v", err, io.EOF)
552+
}
553+
c.Close()
554+
wg.Done()
514555
}
515-
ln, err := newLocalListener("tcp")
556+
dss, err := newDualStackServer([]streamListener{
557+
{network: "tcp4", address: "127.0.0.1"},
558+
{network: "tcp6", address: "::1"},
559+
})
516560
if err != nil {
517561
t.Fatal(err)
518562
}
519-
defer ln.Close()
563+
defer dss.teardown()
564+
if err := dss.buildup(handler); err != nil {
565+
t.Fatal(err)
566+
}
567+
568+
const fallbackDelay = 100 * time.Millisecond
569+
570+
origTestHookDialTCP := testHookDialTCP
571+
defer func() { testHookDialTCP = origTestHookDialTCP }()
572+
testHookDialTCP = func(net string, laddr, raddr *TCPAddr, deadline time.Time, cancel <-chan struct{}) (*TCPConn, error) {
573+
// Sleep long enough for Happy Eyeballs to kick in, and inhibit cancelation.
574+
// This forces dialParallel to juggle two successful connections.
575+
time.Sleep(fallbackDelay * 2)
576+
cancel = nil
577+
return dialTCP(net, laddr, raddr, deadline, cancel)
578+
}
520579

521-
d := Dialer{}
580+
d := Dialer{
581+
FallbackDelay: fallbackDelay,
582+
}
522583
ctx := &dialContext{
523584
Dialer: d,
524585
network: "tcp",
525586
address: "?",
526587
finalDeadline: d.deadline(time.Now()),
527588
}
528589

529-
results := make(chan dialResult)
530-
cancel := make(chan struct{})
531-
532-
// Spawn a connection in the background.
533-
go dialSerialAsync(ctx, addrList{ln.Addr()}, nil, cancel, results)
590+
makeAddr := func(ip string) addrList {
591+
addr, err := ResolveTCPAddr("tcp", JoinHostPort(ip, dss.port))
592+
if err != nil {
593+
t.Fatal(err)
594+
}
595+
return addrList{addr}
596+
}
534597

535-
// Receive it at the server.
536-
c, err := ln.Accept()
598+
// dialParallel returns one connection (and closes the other.)
599+
c, err := dialParallel(ctx, makeAddr("127.0.0.1"), makeAddr("::1"), nil)
537600
if err != nil {
538601
t.Fatal(err)
539602
}
540-
defer c.Close()
541-
542-
// Tell dialSerialAsync that someone else won the race.
543-
close(cancel)
603+
c.Close()
544604

545-
// The connection should close itself, without sending data.
546-
c.SetReadDeadline(time.Now().Add(1 * time.Second))
547-
var b [1]byte
548-
if _, err := c.Read(b[:]); err != io.EOF {
549-
t.Errorf("got %v; want %v", err, io.EOF)
550-
}
605+
// The server should've seen both connections.
606+
wg.Wait()
551607
}
552608

553609
func TestDialerPartialDeadline(t *testing.T) {
@@ -676,7 +732,6 @@ func TestDialerDualStack(t *testing.T) {
676732
c.Close()
677733
}
678734
}
679-
time.Sleep(timeout * 3 / 2) // wait for the dial racers to stop
680735
}
681736

682737
func TestDialerKeepAlive(t *testing.T) {

0 commit comments

Comments
 (0)