Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 36 additions & 14 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package rpc
import (
"context"
"io"
"sync"
"sync/atomic"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/log"
)

Expand All @@ -45,13 +45,19 @@ const (
type Server struct {
services serviceRegistry
idgen func() ID
run int32
codecs mapset.Set[*ServerCodec]

mutex sync.Mutex
codecs map[ServerCodec]struct{}
run int32
}

// NewServer creates a new server instance with no registered handlers.
func NewServer() *Server {
server := &Server{idgen: randomIDGenerator(), codecs: mapset.NewSet[*ServerCodec](), run: 1}
server := &Server{
idgen: randomIDGenerator(),
codecs: make(map[ServerCodec]struct{}),
run: 1,
}
// Register the default service providing meta information about the RPC service such
// as the services and methods it offers.
rpcService := &RPCService{server}
Expand All @@ -75,20 +81,34 @@ func (s *Server) RegisterName(name string, receiver interface{}) error {
func (s *Server) ServeCodec(codec ServerCodec, options CodecOption) {
defer codec.close()

// Don't serve if server is stopped.
if atomic.LoadInt32(&s.run) == 0 {
if !s.trackCodec(codec) {
return
}

// Add the codec to the set so it can be closed by Stop.
s.codecs.Add(&codec)
defer s.codecs.Remove(&codec)
defer s.untrackCodec(codec)

c := initClient(codec, s.idgen, &s.services)
<-codec.closed()
c.Close()
}

func (s *Server) trackCodec(codec ServerCodec) bool {
s.mutex.Lock()
defer s.mutex.Unlock()

if atomic.LoadInt32(&s.run) == 0 {
return false // Don't serve if server is stopped.
}
s.codecs[codec] = struct{}{}
return true
}

func (s *Server) untrackCodec(codec ServerCodec) {
s.mutex.Lock()
defer s.mutex.Unlock()

delete(s.codecs, codec)
}

// serveSingleRequest reads and processes a single RPC request from the given codec. This
// is used to serve HTTP connections. Subscriptions and reverse calls are not allowed in
// this mode.
Expand Down Expand Up @@ -120,12 +140,14 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
// requests to finish, then closes all codecs which will cancel pending requests and
// subscriptions.
func (s *Server) Stop() {
s.mutex.Lock()
defer s.mutex.Unlock()

if atomic.CompareAndSwapInt32(&s.run, 1, 0) {
log.Debug("RPC server shutting down")
s.codecs.Each(func(c *ServerCodec) bool {
(*c).close()
return true
})
for codec := range s.codecs {
codec.close()
}
}
}

Expand Down