Skip to content

Commit d7f77dc

Browse files
committed
Revert "http2: Send WindowUpdates when remaining bytes are below a threshold"
This reverts commit 2e0b12c. The calculation for when to return flow control doesn't properly take data in server read buffers into account, resulting in flow control credit being returned too quickly without backpressure. Fixes golang/go#56315 For golang/go#28732 Change-Id: I573afd1a37d8a711da47f05f38f4de04183fb941 Reviewed-on: https://go-review.googlesource.com/c/net/+/448055 TryBot-Result: Gopher Robot <[email protected]> Run-TryBot: Damien Neil <[email protected]> Reviewed-by: Roland Shoemaker <[email protected]>
1 parent 702349b commit d7f77dc

File tree

2 files changed

+41
-111
lines changed

2 files changed

+41
-111
lines changed

http2/server.go

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -869,7 +869,9 @@ func (sc *serverConn) serve() {
869869

870870
// Each connection starts with initialWindowSize inflow tokens.
871871
// If a higher value is configured, we add more tokens.
872-
sc.sendWindowUpdate(nil)
872+
if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
873+
sc.sendWindowUpdate(nil, int(diff))
874+
}
873875

874876
if err := sc.readPreface(); err != nil {
875877
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
@@ -1469,8 +1471,7 @@ func (sc *serverConn) processFrame(f Frame) error {
14691471
if sc.inflow.available() < int32(f.Length) {
14701472
return sc.countError("data_flow", streamError(f.Header().StreamID, ErrCodeFlowControl))
14711473
}
1472-
sc.inflow.take(int32(f.Length))
1473-
sc.sendWindowUpdate(nil) // conn-level
1474+
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
14741475
}
14751476
return nil
14761477
}
@@ -1599,7 +1600,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
15991600
if p := st.body; p != nil {
16001601
// Return any buffered unread bytes worth of conn-level flow control.
16011602
// See golang.org/issue/16481
1602-
sc.sendWindowUpdate(nil)
1603+
sc.sendWindowUpdate(nil, p.Len())
16031604

16041605
p.CloseWithError(err)
16051606
}
@@ -1737,7 +1738,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
17371738
// sendWindowUpdate, which also schedules sending the
17381739
// frames.
17391740
sc.inflow.take(int32(f.Length))
1740-
sc.sendWindowUpdate(nil) // conn-level
1741+
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
17411742

17421743
if st != nil && st.resetQueued {
17431744
// Already have a stream error in flight. Don't send another.
@@ -1755,7 +1756,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
17551756
return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
17561757
}
17571758
sc.inflow.take(int32(f.Length))
1758-
sc.sendWindowUpdate(nil) // conn-level
1759+
sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
17591760

17601761
st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
17611762
// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
@@ -1773,7 +1774,7 @@ func (sc *serverConn) processData(f *DataFrame) error {
17731774
if len(data) > 0 {
17741775
wrote, err := st.body.Write(data)
17751776
if err != nil {
1776-
sc.sendWindowUpdate32(nil, int32(f.Length)-int32(wrote))
1777+
sc.sendWindowUpdate(nil, int(f.Length)-wrote)
17771778
return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
17781779
}
17791780
if wrote != len(data) {
@@ -2318,43 +2319,28 @@ func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
23182319

23192320
func (sc *serverConn) noteBodyRead(st *stream, n int) {
23202321
sc.serveG.check()
2321-
sc.sendWindowUpdate(nil) // conn-level
2322+
sc.sendWindowUpdate(nil, n) // conn-level
23222323
if st.state != stateHalfClosedRemote && st.state != stateClosed {
23232324
// Don't send this WINDOW_UPDATE if the stream is closed
23242325
// remotely.
2325-
sc.sendWindowUpdate(st)
2326+
sc.sendWindowUpdate(st, n)
23262327
}
23272328
}
23282329

23292330
// st may be nil for conn-level
2330-
func (sc *serverConn) sendWindowUpdate(st *stream) {
2331+
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
23312332
sc.serveG.check()
2332-
2333-
var n int32
2334-
if st == nil {
2335-
if avail, windowSize := sc.inflow.n, sc.srv.initialConnRecvWindowSize(); avail > windowSize/2 {
2336-
return
2337-
} else {
2338-
n = windowSize - avail
2339-
}
2340-
} else {
2341-
if avail, windowSize := st.inflow.n, sc.srv.initialStreamRecvWindowSize(); avail > windowSize/2 {
2342-
return
2343-
} else {
2344-
n = windowSize - avail
2345-
}
2346-
}
23472333
// "The legal range for the increment to the flow control
23482334
// window is 1 to 2^31-1 (2,147,483,647) octets."
23492335
// A Go Read call on 64-bit machines could in theory read
23502336
// a larger Read than this. Very unlikely, but we handle it here
23512337
// rather than elsewhere for now.
23522338
const maxUint31 = 1<<31 - 1
2353-
for n >= maxUint31 {
2339+
for n > maxUint31 {
23542340
sc.sendWindowUpdate32(st, maxUint31)
23552341
n -= maxUint31
23562342
}
2357-
sc.sendWindowUpdate32(st, n)
2343+
sc.sendWindowUpdate32(st, int32(n))
23582344
}
23592345

23602346
// st may be nil for conn-level

http2/server_test.go

Lines changed: 28 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,9 @@ func TestServer_Request_Post_Body_ContentLength_TooSmall(t *testing.T) {
809809
EndHeaders: true,
810810
})
811811
st.writeData(1, true, []byte("12345"))
812+
// Return flow control bytes back, since the data handler closed
813+
// the stream.
814+
st.wantWindowUpdate(0, 5)
812815
})
813816
}
814817

@@ -1244,41 +1247,6 @@ func TestServer_Handler_Sends_WindowUpdate(t *testing.T) {
12441247

12451248
st.greet()
12461249

1247-
st.writeHeaders(HeadersFrameParam{
1248-
StreamID: 1, // clients send odd numbers
1249-
BlockFragment: st.encodeHeader(":method", "POST"),
1250-
EndStream: false, // data coming
1251-
EndHeaders: true,
1252-
})
1253-
updateSize := 1 << 20 / 2 // the conn & stream size before a WindowUpdate
1254-
st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
1255-
st.writeData(1, false, bytes.Repeat([]byte("b"), 10))
1256-
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
1257-
puppet.do(readBodyHandler(t, strings.Repeat("b", 10)))
1258-
1259-
st.wantWindowUpdate(0, uint32(updateSize))
1260-
st.wantWindowUpdate(1, uint32(updateSize))
1261-
1262-
st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
1263-
st.writeData(1, true, bytes.Repeat([]byte("c"), 15)) // END_STREAM here
1264-
puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
1265-
puppet.do(readBodyHandler(t, strings.Repeat("c", 15)))
1266-
1267-
st.wantWindowUpdate(0, uint32(updateSize+5))
1268-
}
1269-
1270-
func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) {
1271-
puppet := newHandlerPuppet()
1272-
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
1273-
puppet.act(w, r)
1274-
}, func(s *Server) {
1275-
s.MaxUploadBufferPerStream = 6
1276-
})
1277-
defer st.Close()
1278-
defer puppet.done()
1279-
1280-
st.greet()
1281-
12821250
st.writeHeaders(HeadersFrameParam{
12831251
StreamID: 1, // clients send odd numbers
12841252
BlockFragment: st.encodeHeader(":method", "POST"),
@@ -1287,14 +1255,18 @@ func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) {
12871255
})
12881256
st.writeData(1, false, []byte("abcdef"))
12891257
puppet.do(readBodyHandler(t, "abc"))
1290-
puppet.do(readBodyHandler(t, "d"))
1291-
puppet.do(readBodyHandler(t, "ef"))
1258+
st.wantWindowUpdate(0, 3)
1259+
st.wantWindowUpdate(1, 3)
12921260

1293-
st.wantWindowUpdate(1, 6)
1261+
puppet.do(readBodyHandler(t, "def"))
1262+
st.wantWindowUpdate(0, 3)
1263+
st.wantWindowUpdate(1, 3)
12941264

12951265
st.writeData(1, true, []byte("ghijkl")) // END_STREAM here
12961266
puppet.do(readBodyHandler(t, "ghi"))
12971267
puppet.do(readBodyHandler(t, "jkl"))
1268+
st.wantWindowUpdate(0, 3)
1269+
st.wantWindowUpdate(0, 3) // no more stream-level, since END_STREAM
12981270
}
12991271

13001272
// the version of the TestServer_Handler_Sends_WindowUpdate with padding.
@@ -1323,45 +1295,12 @@ func TestServer_Handler_Sends_WindowUpdate_Padding(t *testing.T) {
13231295
st.wantWindowUpdate(1, 5)
13241296

13251297
puppet.do(readBodyHandler(t, "abc"))
1326-
puppet.do(readBodyHandler(t, "def"))
1327-
}
1328-
1329-
// This is a regression test to make sure the correct window increment size is
1330-
// calculated for a stream.
1331-
// See https://go.dev/issue/56315#issuecomment-1287642591.
1332-
func TestServer_Handler_Sends_WindowUpdate_IncrementSize(t *testing.T) {
1333-
maxSizePerConn := initialWindowSize * 2
1334-
maxSizePerStream := maxSizePerConn*2 + 100
1298+
st.wantWindowUpdate(0, 3)
1299+
st.wantWindowUpdate(1, 3)
13351300

1336-
puppet := newHandlerPuppet()
1337-
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
1338-
puppet.act(w, r)
1339-
}, func(s *Server) {
1340-
s.MaxUploadBufferPerConnection = int32(maxSizePerConn)
1341-
s.MaxUploadBufferPerStream = int32(maxSizePerStream)
1342-
})
1343-
defer st.Close()
1344-
defer puppet.done()
1345-
1346-
st.greet()
1347-
1348-
st.writeHeaders(HeadersFrameParam{
1349-
StreamID: 1,
1350-
BlockFragment: st.encodeHeader(":method", "POST"),
1351-
EndStream: false,
1352-
EndHeaders: true,
1353-
})
1354-
1355-
st.writeData(1, false, bytes.Repeat([]byte("a"), maxSizePerConn/2))
1356-
puppet.do(readBodyHandler(t, strings.Repeat("a", maxSizePerConn/2)))
1357-
st.wantWindowUpdate(0, uint32(maxSizePerConn/2))
1358-
1359-
st.writeData(1, false, bytes.Repeat([]byte("b"), maxSizePerConn/2+100))
1360-
puppet.do(readBodyHandler(t, strings.Repeat("b", maxSizePerConn/2+100)))
1361-
st.wantWindowUpdate(0, uint32(maxSizePerConn/2+100))
1362-
st.wantWindowUpdate(1, uint32(maxSizePerConn+100))
1363-
1364-
st.writeData(1, true, nil) // END_STREAM here
1301+
puppet.do(readBodyHandler(t, "def"))
1302+
st.wantWindowUpdate(0, 3)
1303+
st.wantWindowUpdate(1, 3)
13651304
}
13661305

13671306
func TestServer_Send_GoAway_After_Bogus_WindowUpdate(t *testing.T) {
@@ -2357,6 +2296,8 @@ func TestServer_Response_Automatic100Continue(t *testing.T) {
23572296
// gigantic and/or sensitive "foo" payload now.
23582297
st.writeData(1, true, []byte(msg))
23592298

2299+
st.wantWindowUpdate(0, uint32(len(msg)))
2300+
23602301
hf = st.wantHeaders()
23612302
if hf.StreamEnded() {
23622303
t.Fatal("expected data to follow")
@@ -2544,6 +2485,9 @@ func TestServer_NoCrash_HandlerClose_Then_ClientClose(t *testing.T) {
25442485
// it did before.
25452486
st.writeData(1, true, []byte("foo"))
25462487

2488+
// Get our flow control bytes back, since the handler didn't get them.
2489+
st.wantWindowUpdate(0, uint32(len("foo")))
2490+
25472491
// Sent after a peer sends data anyway (admittedly the
25482492
// previous RST_STREAM might've still been in-flight),
25492493
// but they'll get the more friendly 'cancel' code
@@ -3986,6 +3930,7 @@ func TestServer_Rejects_TooSmall(t *testing.T) {
39863930
EndHeaders: true,
39873931
})
39883932
st.writeData(1, true, []byte("12345"))
3933+
st.wantWindowUpdate(0, 5)
39893934
st.wantRSTStream(1, ErrCodeProtocol)
39903935
})
39913936
}
@@ -4312,6 +4257,7 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
43124257
st.writeData(1, false, []byte(content[5:]))
43134258
blockCh <- true
43144259

4260+
increments := len(content)
43154261
for {
43164262
f, err := st.readFrame()
43174263
if err == io.EOF {
@@ -4320,12 +4266,10 @@ func TestServerWindowUpdateOnBodyClose(t *testing.T) {
43204266
if err != nil {
43214267
t.Fatal(err)
43224268
}
4323-
if rs, ok := f.(*RSTStreamFrame); ok && rs.StreamID == 1 {
4324-
break
4325-
}
43264269
if wu, ok := f.(*WindowUpdateFrame); ok && wu.StreamID == 0 {
4327-
if e, a := uint32(3), wu.Increment; e != a {
4328-
t.Errorf("Increment=%d, want %d", a, e)
4270+
increments -= int(wu.Increment)
4271+
if increments == 0 {
4272+
break
43294273
}
43304274
}
43314275
}
@@ -4468,22 +4412,22 @@ func TestServerSendsEarlyHints(t *testing.T) {
44684412

44694413
func TestProtocolErrorAfterGoAway(t *testing.T) {
44704414
st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
4471-
w.WriteHeader(200)
4472-
w.(http.Flusher).Flush()
44734415
io.Copy(io.Discard, r.Body)
44744416
})
44754417
defer st.Close()
44764418

44774419
st.greet()
4420+
content := "some content"
44784421
st.writeHeaders(HeadersFrameParam{
44794422
StreamID: 1,
44804423
BlockFragment: st.encodeHeader(
44814424
":method", "POST",
4482-
"content-length", "1",
4425+
"content-length", strconv.Itoa(len(content)),
44834426
),
44844427
EndStream: false,
44854428
EndHeaders: true,
44864429
})
4430+
st.writeData(1, false, []byte(content[:5]))
44874431

44884432
_, err := st.readFrame()
44894433
if err != nil {

0 commit comments

Comments
 (0)