Skip to content

x/net/http2: closing a requestBody in the ServeHTTP method drops conn-level flow control #28634

Closed
@jared2501

Description

@jared2501

Please answer these questions before submitting your issue. Thanks!

What version of Go are you using (go version)?

go1.11 darwin/amd64

Does this issue reproduce with the latest release?

yes

What operating system and processor architecture are you using (go env)?

GOARCH="amd64"
GOBIN=""
GOCACHE="/Users/jnewman/Library/Caches/go-build"
GOEXE=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOOS="darwin"
GOPATH="/Users/jnewman/go"
GOPROXY=""
GORACE=""
GOROOT="/usr/local/go"
GOTMPDIR=""
GOTOOLDIR="/usr/local/go/pkg/tool/darwin_amd64"
GCCGO="gccgo"
CC="clang"
CXX="clang++"
CGO_ENABLED="1"
GOMOD=""
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/_t/hg9f_j4x1q743pqh00kdv6m00000gn/T/go-build803039253=/tmp/go-build -gno-record-gcc-switches -fno-common"

What did you do? / What did you expect to see? / What did you see instead?

I'm using an httputil.ReverseProxy to proxy a client-streaming RPC (using go grpc). If the upstream connection that the ReverseProxy is copying to breaks, then the http2 transport attempts to close the request's body. This causes the http2.pipe that backs the http2.requestBody to have its pipeBuffer set to nil (see here). When the http2 server goes to retrieve the length of the requestBody so that it can return the remaining data to the connection-level flow control, it discovers the length is 0 and does not return the data in the pipe to the conn-level flow control. Therefore, the client gets stuck.

Here are some logs I added:

interceptor write. len=16393
http2 server sendWindowUpdate32. n=16384, avail=32768
smux write. id=7, len=16413
controlBuffer.get. block=true
controlBuffer.get completed. block=true
handle: *transport.incomingWindowUpdate
sendQuota. 16384
controlBuffer.get. block=false
controlBuffer.get completed. block=false
handle: *transport.incomingWindowUpdate
sendQuota. 16384
processing dataItem
down here
controlBuffer.get. block=false
controlBuffer.get completed. block=false
sendQuota. 0
interceptor write. len=16393
all sent. id=7 len=16413
controlBuffer.get. block=true
smux write. id=7, len=38
all sent. id=7 len=38
smux write. id=7, len=16413
http2 server sendWindowUpdate32. n=16384, avail=16384
controlBuffer.get completed. block=true
handle: *transport.incomingWindowUpdate
sendQuota. 16384
controlBuffer.get. block=false
controlBuffer.get completed. block=false
sendQuota. 16384
controlBuffer.get. block=false
controlBuffer.get completed. block=false
sendQuota. 16384
controlBuffer.get. block=true
controlBuffer.get completed. block=true
handle: *transport.incomingWindowUpdate
sendQuota. 16384
processing dataItem
down here
controlBuffer.get. block=false
controlBuffer.get completed. block=false
sendQuota. 0
interceptor write. len=16393
controlBuffer.get. block=true
2018/11/07 09:12:55 http: proxy error: write tcp 10.11.74.1:49006->10.111.142.1:6001: write: connection reset by peer
requestBody.Close invoked BreakWithError
setting p.b = nil, prevLen=1048576
smux write. id=7, len=31
smux write. id=5, len=31
server closeStream! Len=
trying to return closed stuff... len=0!
2018-11-07T09:12:55.613Z	INFO	remote/mux.go:567	Disconnecting from peer. peerHostID=12
2018-11-07T09:12:55.613Z	ERROR	remote/mux.go:247	Error connecting to peer. peerAddr=10.111.142.1:6001, err=error in client flux stream

Here's the diff that adds the logging:

commit 5cb80cec760b692f307ccd41e21f384fb583e0d1
Author: Jared Newman <[email protected]>
Date:   Wed Nov 7 01:08:00 2018 -0800

    wip

diff --git a/lifecycle/lifecycle.go b/lifecycle/lifecycle.go
index 0936ef4..aa4b1b4 100644
--- a/lifecycle/lifecycle.go
+++ b/lifecycle/lifecycle.go
@@ -106,7 +106,7 @@ func NewLifecycle(config *FluxConfig) *Lifecycle {
 		MeshStreamRetryInterval:      5 * time.Second,
 		GroupKeyRotationInterval:     10 * time.Minute,
 		PeerReconnectInterval:        5 * time.Second,
-		BackfillLoopSleepTimeout:     10 * time.Second,
+		BackfillLoopSleepTimeout:     5 * time.Second,
 		ExpireEventLogOpenAfter:      2 * time.Minute,
 		MoveHWMForwardPeriod:         5 * time.Second,
 		GetNextEpochRangeTimeout:     10 * time.Second,
diff --git a/remote/mux.go b/remote/mux.go
index 78c3d24..97743e5 100644
--- a/remote/mux.go
+++ b/remote/mux.go
@@ -25,6 +25,7 @@ import (
 	"golang.org/x/net/http2"
 	"google.golang.org/grpc"
 	"io"
+	"io/ioutil"
 	"math/big"
 	"net"
 	"net/http"
@@ -405,6 +406,7 @@ func (s *Server) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
 		FlushInterval: 100 * time.Millisecond,
 		Director:      func(*http.Request) {},
 		Transport: roundTripperFunc(func(req *http.Request) (*http.Response, error) {
+			req.Body = ioutil.NopCloser(req.Body)
 			return nextHopPeer.RoundTrip(priority, destHostID, environment, req)
 		}),
 	}
diff --git a/vendor/golang.org/x/net/http2/pipe.go b/vendor/golang.org/x/net/http2/pipe.go
index a614009..f45806d 100644
--- a/vendor/golang.org/x/net/http2/pipe.go
+++ b/vendor/golang.org/x/net/http2/pipe.go
@@ -8,6 +8,7 @@ import (
 	"errors"
 	"io"
 	"sync"
+	"fmt"
 )
 
 // pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
@@ -117,6 +118,9 @@ func (p *pipe) closeWithError(dst *error, err error, fn func()) {
 	}
 	p.readFn = fn
 	if dst == &p.breakErr {
+		if p.b != nil{
+			fmt.Printf("setting p.b = nil, prevLen=%d\n", p.b.Len())
+		}
 		p.b = nil
 	}
 	*dst = err
diff --git a/vendor/golang.org/x/net/http2/server.go b/vendor/golang.org/x/net/http2/server.go
index b57b6e2..4ab8542 100644
--- a/vendor/golang.org/x/net/http2/server.go
+++ b/vendor/golang.org/x/net/http2/server.go
@@ -1454,6 +1454,7 @@ func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
 }
 
 func (sc *serverConn) closeStream(st *stream, err error) {
+	fmt.Printf("server closeStream! Len=\n")
 	sc.serveG.check()
 	if st.state == stateIdle || st.state == stateClosed {
 		panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state))
@@ -1478,6 +1479,7 @@ func (sc *serverConn) closeStream(st *stream, err error) {
 		}
 	}
 	if p := st.body; p != nil {
+		fmt.Printf("trying to return closed stuff... len=%d!\n", p.Len())
 		// Return any buffered unread bytes worth of conn-level flow control.
 		// See golang.org/issue/16481
 		sc.sendWindowUpdate(nil, p.Len())
@@ -2219,6 +2221,7 @@ func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
 	var ok bool
 	if st == nil {
 		ok = sc.inflow.add(n)
+		fmt.Printf("http2 server sendWindowUpdate32. n=%d, avail=%d\n", n, sc.inflow.available())
 	} else {
 		ok = st.inflow.add(n)
 	}
@@ -2240,6 +2243,7 @@ type requestBody struct {
 
 func (b *requestBody) Close() error {
 	if b.pipe != nil && !b.closed {
+		fmt.Printf("requestBody.Close invoked BreakWithError\n")
 		b.pipe.BreakWithError(errClosedBody)
 	}
 	b.closed = true
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go
index 2c9fe88..fbfa0eb 100644
--- a/vendor/golang.org/x/net/http2/transport.go
+++ b/vendor/golang.org/x/net/http2/transport.go
@@ -2008,6 +2008,7 @@ func (b transportResponseBody) Close() error {
 		cc.mu.Unlock()
 	}
 
+	fmt.Printf("transportResponseBody.Close invoked BreakWithError\n")
 	cs.bufPipe.BreakWithError(errClosedResponseBody)
 	cc.forgetStreamID(cs.ID)
 	return nil

Metadata

Metadata

Assignees

No one assigned

    Labels

    FrozenDueToAge · NeedsInvestigation (Someone must examine and confirm this is a valid issue and not a duplicate of an existing one.) · help wanted

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions