@@ -22,6 +22,7 @@ import (
22
22
"strconv"
23
23
"strings"
24
24
"sync"
25
+ "time"
25
26
26
27
"golang.org/x/net/http2/hpack"
27
28
)
@@ -84,6 +85,11 @@ type Transport struct {
84
85
// to mean no limit.
85
86
MaxHeaderListSize uint32
86
87
88
+ // t1, if non-nil, is the standard library Transport using
89
+ // this transport. Its settings are used (but not its
90
+ // RoundTrip method, etc).
91
+ t1 * http.Transport
92
+
87
93
connPoolOnce sync.Once
88
94
connPoolOrDef ClientConnPool // non-nil version of ConnPool
89
95
}
@@ -99,12 +105,7 @@ func (t *Transport) maxHeaderListSize() uint32 {
99
105
}
100
106
101
107
func (t * Transport ) disableCompression () bool {
102
- if t .DisableCompression {
103
- return true
104
- }
105
- // TODO: also disable if this transport is somehow linked to an http1 Transport
106
- // and it's configured there?
107
- return false
108
+ return t .DisableCompression || (t .t1 != nil && t .t1 .DisableCompression )
108
109
}
109
110
110
111
var errTransportVersion = errors .New ("http2: ConfigureTransport is only supported starting at Go 1.6" )
@@ -160,7 +161,7 @@ type ClientConn struct {
160
161
henc * hpack.Encoder
161
162
freeBuf [][]byte
162
163
163
- wmu sync.Mutex // held while writing; acquire AFTER wmu if holding both
164
+ wmu sync.Mutex // held while writing; acquire AFTER mu if holding both
164
165
werr error // first write error that has occurred
165
166
}
166
167
@@ -178,7 +179,7 @@ type clientStream struct {
178
179
inflow flow // guarded by cc.mu
179
180
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
180
181
readErr error // sticky read error; owned by transportResponseBody.Read
181
- stopReqBody bool // stop writing req body; guarded by cc.mu
182
+ stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
182
183
183
184
peerReset chan struct {} // closed on peer reset
184
185
resetErr error // populated before peerReset is closed
@@ -221,10 +222,13 @@ func (cs *clientStream) checkReset() error {
221
222
}
222
223
}
223
224
224
- func (cs * clientStream ) abortRequestBodyWrite () {
225
+ func (cs * clientStream ) abortRequestBodyWrite (err error ) {
226
+ if err == nil {
227
+ panic ("nil error" )
228
+ }
225
229
cc := cs .cc
226
230
cc .mu .Lock ()
227
- cs .stopReqBody = true
231
+ cs .stopReqBody = err
228
232
cc .cond .Broadcast ()
229
233
cc .mu .Unlock ()
230
234
}
@@ -364,6 +368,12 @@ func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.C
364
368
return cn , nil
365
369
}
366
370
371
+ // disableKeepAlives reports whether connections should be closed as
372
+ // soon as possible.
373
+ func (t * Transport ) disableKeepAlives () bool {
374
+ return t .t1 != nil && t .t1 .DisableKeepAlives
375
+ }
376
+
367
377
func (t * Transport ) NewClientConn (c net.Conn ) (* ClientConn , error ) {
368
378
if VerboseLogs {
369
379
t .vlogf ("http2: Transport creating client conn to %v" , c .RemoteAddr ())
@@ -463,7 +473,7 @@ func (cc *ClientConn) CanTakeNewRequest() bool {
463
473
}
464
474
465
475
func (cc * ClientConn ) canTakeNewRequestLocked () bool {
466
- return cc .goAway == nil &&
476
+ return cc .goAway == nil && ! cc . closed &&
467
477
int64 (len (cc .streams )+ 1 ) < int64 (cc .maxConcurrentStreams ) &&
468
478
cc .nextStreamID < 2147483647
469
479
}
@@ -544,6 +554,17 @@ func commaSeparatedTrailers(req *http.Request) (string, error) {
544
554
return "" , nil
545
555
}
546
556
557
+ func (cc * ClientConn ) responseHeaderTimeout () time.Duration {
558
+ if cc .t .t1 != nil {
559
+ return cc .t .t1 .ResponseHeaderTimeout
560
+ }
561
+ // No way to do this (yet?) with just an http2.Transport. Probably
562
+ // no need. Request.Cancel this is the new way. We only need to support
563
+ // this for compatibility with the old http.Transport fields when
564
+ // we're doing transparent http2.
565
+ return 0
566
+ }
567
+
547
568
func (cc * ClientConn ) RoundTrip (req * http.Request ) (* http.Response , error ) {
548
569
trailers , err := commaSeparatedTrailers (req )
549
570
if err != nil {
@@ -623,17 +644,25 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
623
644
return nil , werr
624
645
}
625
646
647
+ var respHeaderTimer <- chan time.Time
626
648
var bodyCopyErrc chan error // result of body copy
627
649
if hasBody {
628
650
bodyCopyErrc = make (chan error , 1 )
629
651
go func () {
630
652
bodyCopyErrc <- cs .writeRequestBody (body , req .Body )
631
653
}()
654
+ } else {
655
+ if d := cc .responseHeaderTimeout (); d != 0 {
656
+ timer := time .NewTimer (d )
657
+ defer timer .Stop ()
658
+ respHeaderTimer = timer .C
659
+ }
632
660
}
633
661
634
662
readLoopResCh := cs .resc
635
663
requestCanceledCh := requestCancel (req )
636
- requestCanceled := false
664
+ bodyWritten := false
665
+
637
666
for {
638
667
select {
639
668
case re := <- readLoopResCh :
@@ -648,7 +677,7 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
648
677
// doesn't, they'll RST_STREAM us soon enough. This is a
649
678
// heuristic to avoid adding knobs to Transport. Hopefully
650
679
// we can keep it.
651
- cs .abortRequestBodyWrite ()
680
+ cs .abortRequestBodyWrite (errStopReqBodyWrite )
652
681
}
653
682
if re .err != nil {
654
683
cc .forgetStreamID (cs .ID )
@@ -657,37 +686,37 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
657
686
res .Request = req
658
687
res .TLS = cc .tlsState
659
688
return res , nil
689
+ case <- respHeaderTimer :
690
+ cc .forgetStreamID (cs .ID )
691
+ if ! hasBody || bodyWritten {
692
+ cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
693
+ } else {
694
+ cs .abortRequestBodyWrite (errStopReqBodyWriteAndCancel )
695
+ }
696
+ return nil , errTimeout
660
697
case <- requestCanceledCh :
661
698
cc .forgetStreamID (cs .ID )
662
- cs .abortRequestBodyWrite ()
663
- if ! hasBody {
699
+ if ! hasBody || bodyWritten {
664
700
cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
665
- return nil , errRequestCanceled
701
+ } else {
702
+ cs .abortRequestBodyWrite (errStopReqBodyWriteAndCancel )
666
703
}
667
- // If we have a body, wait for the body write to be
668
- // finished before sending the RST_STREAM frame.
669
- requestCanceled = true
670
- requestCanceledCh = nil // to prevent spins
671
- readLoopResCh = nil // ignore responses at this point
704
+ return nil , errRequestCanceled
672
705
case <- cs .peerReset :
673
- if requestCanceled {
674
- // They hung up on us first. No need to write a RST_STREAM.
675
- // But prioritize the request canceled error value, since
676
- // it's likely related. (same spirit as http1 code)
677
- return nil , errRequestCanceled
678
- }
679
706
// processResetStream already removed the
680
707
// stream from the streams map; no need for
681
708
// forgetStreamID.
682
709
return nil , cs .resetErr
683
710
case err := <- bodyCopyErrc :
684
- if requestCanceled {
685
- cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
686
- return nil , errRequestCanceled
687
- }
688
711
if err != nil {
689
712
return nil , err
690
713
}
714
+ bodyWritten = true
715
+ if d := cc .responseHeaderTimeout (); d != 0 {
716
+ timer := time .NewTimer (d )
717
+ defer timer .Stop ()
718
+ respHeaderTimer = timer .C
719
+ }
691
720
}
692
721
}
693
722
}
@@ -723,9 +752,14 @@ func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte)
723
752
return cc .werr
724
753
}
725
754
726
- // errAbortReqBodyWrite is an internal error value.
727
- // It doesn't escape to callers.
728
- var errAbortReqBodyWrite = errors .New ("http2: aborting request body write" )
755
+ // internal error values; they don't escape to callers
756
+ var (
757
+ // abort request body write; don't send cancel
758
+ errStopReqBodyWrite = errors .New ("http2: aborting request body write" )
759
+
760
+ // abort request body write, but send stream reset of cancel.
761
+ errStopReqBodyWriteAndCancel = errors .New ("http2: canceling request" )
762
+ )
729
763
730
764
func (cs * clientStream ) writeRequestBody (body io.Reader , bodyCloser io.Closer ) (err error ) {
731
765
cc := cs .cc
@@ -761,7 +795,13 @@ func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (
761
795
for len (remain ) > 0 && err == nil {
762
796
var allowed int32
763
797
allowed , err = cs .awaitFlowControl (len (remain ))
764
- if err != nil {
798
+ switch {
799
+ case err == errStopReqBodyWrite :
800
+ return err
801
+ case err == errStopReqBodyWriteAndCancel :
802
+ cc .writeStreamReset (cs .ID , ErrCodeCancel , nil )
803
+ return err
804
+ case err != nil :
765
805
return err
766
806
}
767
807
cc .wmu .Lock ()
@@ -821,8 +861,8 @@ func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error)
821
861
if cc .closed {
822
862
return 0 , errClientConnClosed
823
863
}
824
- if cs .stopReqBody {
825
- return 0 , errAbortReqBodyWrite
864
+ if cs .stopReqBody != nil {
865
+ return 0 , cs . stopReqBody
826
866
}
827
867
if err := cs .checkReset (); err != nil {
828
868
return 0 , err
@@ -898,7 +938,7 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
898
938
cc .writeHeader (lowKey , v )
899
939
}
900
940
}
901
- if contentLength >= 0 {
941
+ if shouldSendReqContentLength ( req . Method , contentLength ) {
902
942
cc .writeHeader ("content-length" , strconv .FormatInt (contentLength , 10 ))
903
943
}
904
944
if addGzipHeader {
@@ -910,6 +950,28 @@ func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trail
910
950
return cc .hbuf .Bytes ()
911
951
}
912
952
953
+ // shouldSendReqContentLength reports whether the http2.Transport should send
954
+ // a "content-length" request header. This logic is basically a copy of the net/http
955
+ // transferWriter.shouldSendContentLength.
956
+ // The contentLength is the corrected contentLength (so 0 means actually 0, not unknown).
957
+ // -1 means unknown.
958
+ func shouldSendReqContentLength (method string , contentLength int64 ) bool {
959
+ if contentLength > 0 {
960
+ return true
961
+ }
962
+ if contentLength < 0 {
963
+ return false
964
+ }
965
+ // For zero bodies, whether we send a content-length depends on the method.
966
+ // It also kinda doesn't matter for http2 either way, with END_STREAM.
967
+ switch method {
968
+ case "POST" , "PUT" , "PATCH" :
969
+ return true
970
+ default :
971
+ return false
972
+ }
973
+ }
974
+
913
975
// requires cc.mu be held.
914
976
func (cc * ClientConn ) encodeTrailers (req * http.Request ) []byte {
915
977
cc .hbuf .Reset ()
@@ -1032,6 +1094,8 @@ func (rl *clientConnReadLoop) cleanup() {
1032
1094
1033
1095
func (rl * clientConnReadLoop ) run () error {
1034
1096
cc := rl .cc
1097
+ closeWhenIdle := cc .t .disableKeepAlives ()
1098
+ gotReply := false // ever saw a reply
1035
1099
for {
1036
1100
f , err := cc .fr .ReadFrame ()
1037
1101
if err != nil {
@@ -1046,18 +1110,25 @@ func (rl *clientConnReadLoop) run() error {
1046
1110
if VerboseLogs {
1047
1111
cc .vlogf ("http2: Transport received %s" , summarizeFrame (f ))
1048
1112
}
1113
+ maybeClose := false // whether frame might transition us to idle
1049
1114
1050
1115
switch f := f .(type ) {
1051
1116
case * HeadersFrame :
1052
1117
err = rl .processHeaders (f )
1118
+ maybeClose = true
1119
+ gotReply = true
1053
1120
case * ContinuationFrame :
1054
1121
err = rl .processContinuation (f )
1122
+ maybeClose = true
1055
1123
case * DataFrame :
1056
1124
err = rl .processData (f )
1125
+ maybeClose = true
1057
1126
case * GoAwayFrame :
1058
1127
err = rl .processGoAway (f )
1128
+ maybeClose = true
1059
1129
case * RSTStreamFrame :
1060
1130
err = rl .processResetStream (f )
1131
+ maybeClose = true
1061
1132
case * SettingsFrame :
1062
1133
err = rl .processSettings (f )
1063
1134
case * PushPromiseFrame :
@@ -1072,6 +1143,9 @@ func (rl *clientConnReadLoop) run() error {
1072
1143
if err != nil {
1073
1144
return err
1074
1145
}
1146
+ if closeWhenIdle && gotReply && maybeClose && len (rl .activeRes ) == 0 {
1147
+ cc .closeIfIdle ()
1148
+ }
1075
1149
}
1076
1150
}
1077
1151
0 commit comments