Skip to content

Commit ae42148

Browse files
authored
rpc: fix connection tracking set in Server (#26180)
rpc: fix connection tracking in Server When upgrading to mapset/v2 with generics, the set element type used in rpc.Server had to be changed to *ServerCodec because ServerCodec is not 'comparable'. While the distinction is technically correct, we know all possible ServerCodec types, and all of them are comparable. So just use a map instead.
1 parent 9afc681 commit ae42148

File tree

1 file changed

+36
-14
lines changed

1 file changed

+36
-14
lines changed

rpc/server.go

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ package rpc
1919
import (
2020
"context"
2121
"io"
22+
"sync"
2223
"sync/atomic"
2324

24-
mapset "github.com/deckarep/golang-set/v2"
2525
"github.com/ethereum/go-ethereum/log"
2626
)
2727

@@ -45,13 +45,19 @@ const (
4545
type Server struct {
4646
services serviceRegistry
4747
idgen func() ID
48-
run int32
49-
codecs mapset.Set[*ServerCodec]
48+
49+
mutex sync.Mutex
50+
codecs map[ServerCodec]struct{}
51+
run int32
5052
}
5153

5254
// NewServer creates a new server instance with no registered handlers.
5355
func NewServer() *Server {
54-
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet[*ServerCodec](), run: 1}
56+
server := &Server{
57+
idgen: randomIDGenerator(),
58+
codecs: make(map[ServerCodec]struct{}),
59+
run: 1,
60+
}
5561
// Register the default service providing meta information about the RPC service such
5662
// as the services and methods it offers.
5763
rpcService := &RPCService{server}
@@ -75,20 +81,34 @@ func (s *Server) RegisterName(name string, receiver interface{}) error {
7581
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
7682
defer codec.close()
7783

78-
// Don't serve if server is stopped.
79-
if atomic.LoadInt32(&s.run) == 0 {
84+
if !s.trackCodec(codec) {
8085
return
8186
}
82-
83-
// Add the codec to the set so it can be closed by Stop.
84-
s.codecs.Add(&codec)
85-
defer s.codecs.Remove(&codec)
87+
defer s.untrackCodec(codec)
8688

8789
c := initClient(codec, s.idgen, &s.services)
8890
<-codec.closed()
8991
c.Close()
9092
}
9193

94+
func (s *Server) trackCodec(codec ServerCodec) bool {
95+
s.mutex.Lock()
96+
defer s.mutex.Unlock()
97+
98+
if atomic.LoadInt32(&s.run) == 0 {
99+
return false // Don't serve if server is stopped.
100+
}
101+
s.codecs[codec] = struct{}{}
102+
return true
103+
}
104+
105+
func (s *Server) untrackCodec(codec ServerCodec) {
106+
s.mutex.Lock()
107+
defer s.mutex.Unlock()
108+
109+
delete(s.codecs, codec)
110+
}
111+
92112
// serveSingleRequest reads and processes a single RPC request from the given codec. This
93113
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
94114
// this mode.
@@ -120,12 +140,14 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
120140
// requests to finish, then closes all codecs which will cancel pending requests and
121141
// subscriptions.
122142
func (s *Server) Stop() {
143+
s.mutex.Lock()
144+
defer s.mutex.Unlock()
145+
123146
if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
124147
log.Debug("RPC server shutting down")
125-
s.codecs.Each(func(c *ServerCodec) bool {
126-
(*c).close()
127-
return true
128-
})
148+
for codec := range s.codecs {
149+
codec.close()
150+
}
129151
}
130152
}
131153

0 commit comments

Comments
 (0)