-
Notifications
You must be signed in to change notification settings - Fork 4.5k
rpc_util: Reuse memory buffer for receiving message #5862
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
493be5d
ecdb6d2
cb8827d
bae37e3
cbaef54
f9730dd
e746250
711db78
c05feed
9390057
b1c6496
086dd28
d6c1d97
1b11bba
a44c200
6b5000f
c34da60
acec626
3cbb6b5
57b9c67
76caf74
8f33b9b
5155566
15f820e
539eef3
056b3e5
0d07cfc
7f6fdb2
ca9f6de
9be7889
25b60e3
8e8f683
96f5a27
fe1294f
5535416
2678efb
8199eeb
a70df17
86d999f
63a360e
1f4bc35
3dcd833
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -174,6 +174,7 @@ type serverOptions struct { | |||||
maxHeaderListSize *uint32 | ||||||
headerTableSize *uint32 | ||||||
numServerWorkers uint32 | ||||||
recvBufferPool SharedBufferPool | ||||||
} | ||||||
|
||||||
var defaultServerOptions = serverOptions{ | ||||||
|
@@ -182,6 +183,7 @@ var defaultServerOptions = serverOptions{ | |||||
connectionTimeout: 120 * time.Second, | ||||||
writeBufferSize: defaultWriteBufSize, | ||||||
readBufferSize: defaultReadBufSize, | ||||||
recvBufferPool: nopBufferPool{}, | ||||||
} | ||||||
var globalServerOptions []ServerOption | ||||||
|
||||||
|
@@ -552,6 +554,27 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption { | |||||
}) | ||||||
} | ||||||
|
||||||
// RecvBufferPool returns a ServerOption that configures the server | ||||||
// to use the provided shared buffer pool for parsing incoming messages. Depending | ||||||
// on the application's workload, this could result in reduced memory allocation. | ||||||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
// | ||||||
// If you are unsure about how to implement a memory pool but want to utilize one, | ||||||
// begin with grpc.NewSharedBufferPool. | ||||||
// | ||||||
// Note: The shared buffer pool feature will not be active if any of the following | ||||||
// options are used: StatsHandler, EnableTracing, or binary logging. In such | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder why this optimization is not going to work with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unfortunately, the current Line 1096 in 2cd95c7
A possible solution I've thought of is to allow users to control the Line 723 in 2cd95c7
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think there's any way around this, given our current API, unfortunately. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
// cases, the shared buffer pool will be ignored. | ||||||
// | ||||||
// # Experimental | ||||||
// | ||||||
// Notice: This API is EXPERIMENTAL and may be changed or removed in a | ||||||
// later release. | ||||||
func RecvBufferPool(bufferPool SharedBufferPool) ServerOption { | ||||||
return newFuncServerOption(func(o *serverOptions) { | ||||||
o.recvBufferPool = bufferPool | ||||||
}) | ||||||
} | ||||||
|
||||||
// serverWorkerResetThreshold defines how often the stack must be reset. Every | ||||||
// N requests, by spawning a new goroutine in its place, a worker can reset its | ||||||
// stack so that large stacks don't live in memory forever. 2^16 should allow | ||||||
|
@@ -1296,7 +1319,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | |||||
if len(shs) != 0 || len(binlogs) != 0 { | ||||||
payInfo = &payloadInfo{} | ||||||
} | ||||||
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) | ||||||
d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) | ||||||
if err != nil { | ||||||
if e := t.WriteStatus(stream, status.Convert(err)); e != nil { | ||||||
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e) | ||||||
|
@@ -1506,7 +1529,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp | |||||
ctx: ctx, | ||||||
t: t, | ||||||
s: stream, | ||||||
p: &parser{r: stream}, | ||||||
p: &parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, | ||||||
codec: s.getCodec(stream.ContentSubtype()), | ||||||
maxReceiveMessageSize: s.opts.maxReceiveMessageSize, | ||||||
maxSendMessageSize: s.opts.maxSendMessageSize, | ||||||
|
Uh oh!
There was an error while loading. Please reload this page.