Skip to content

Commit cc777fc

Browse files
lcgashlvchenguangzhcoderabbitai[bot]
authored
feat: add ping for sse server (#80)
* feat: add ping for sse server * fix ping message * Update server/sse.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update server/sse.go Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * Update sse.go * Update sse.go * Update sse.go * Update sse.go * fix ping message --------- Co-authored-by: lvchenguang <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent c7390fe commit cc777fc

File tree

1 file changed

+52
-13
lines changed

1 file changed

+52
-13
lines changed

server/sse.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strings"
1111
"sync"
1212
"sync/atomic"
13+
"time"
1314

1415
"github.com/google/uuid"
1516
"github.com/mark3labs/mcp-go/mcp"
@@ -52,15 +53,18 @@ var _ ClientSession = (*sseSession)(nil)
5253
// SSEServer implements a Server-Sent Events (SSE) based MCP server.
5354
// It provides real-time communication capabilities over HTTP using the SSE protocol.
5455
type SSEServer struct {
55-
server *MCPServer
56-
baseURL string
57-
basePath string
58-
messageEndpoint string
59-
useFullURLForMessageEndpoint bool
60-
sseEndpoint string
61-
sessions sync.Map
62-
srv *http.Server
63-
contextFunc SSEContextFunc
56+
server *MCPServer
57+
baseURL string
58+
basePath string
59+
useFullURLForMessageEndpoint bool
60+
messageEndpoint string
61+
sseEndpoint string
62+
sessions sync.Map
63+
srv *http.Server
64+
contextFunc SSEContextFunc
65+
66+
keepAlive bool
67+
keepAliveInterval time.Duration
6468
}
6569

6670
// SSEOption defines a function type for configuring SSEServer
@@ -130,6 +134,19 @@ func WithHTTPServer(srv *http.Server) SSEOption {
130134
}
131135
}
132136

137+
func WithKeepAliveInterval(keepAliveInterval time.Duration) SSEOption {
138+
return func(s *SSEServer) {
139+
s.keepAlive = true
140+
s.keepAliveInterval = keepAliveInterval
141+
}
142+
}
143+
144+
func WithKeepAlive(keepAlive bool) SSEOption {
145+
return func(s *SSEServer) {
146+
s.keepAlive = keepAlive
147+
}
148+
}
149+
133150
// WithContextFunc sets a function that will be called to customise the context
134151
// to the server using the incoming request.
135152
func WithSSEContextFunc(fn SSEContextFunc) SSEOption {
@@ -141,10 +158,12 @@ func WithSSEContextFunc(fn SSEContextFunc) SSEOption {
141158
// NewSSEServer creates a new SSE server instance with the given MCP server and options.
142159
func NewSSEServer(server *MCPServer, opts ...SSEOption) *SSEServer {
143160
s := &SSEServer{
144-
server: server,
145-
sseEndpoint: "/sse",
146-
messageEndpoint: "/messages",
147-
useFullURLForMessageEndpoint: true,
161+
server: server,
162+
sseEndpoint: "/sse",
163+
messageEndpoint: "/message",
164+
useFullURLForMessageEndpoint: true,
165+
keepAlive: false,
166+
keepAliveInterval: 10 * time.Second,
148167
}
149168

150169
// Apply all options
@@ -255,6 +274,26 @@ func (s *SSEServer) handleSSE(w http.ResponseWriter, r *http.Request) {
255274
}
256275
}()
257276

277+
// Start keep alive : ping
278+
if s.keepAlive {
279+
go func() {
280+
ticker := time.NewTicker(s.keepAliveInterval)
281+
defer ticker.Stop()
282+
for {
283+
select {
284+
case <-ticker.C:
285+
//: ping - 2025-03-27 07:44:38.682659+00:00
286+
session.eventQueue <- fmt.Sprintf(":ping - %s\n\n", time.Now().Format(time.RFC3339))
287+
case <-session.done:
288+
return
289+
case <-r.Context().Done():
290+
return
291+
}
292+
}
293+
}()
294+
}
295+
296+
258297
// Send the initial endpoint event
259298
fmt.Fprintf(w, "event: endpoint\ndata: %s\r\n\r\n", s.GetMessageEndpointForClient(sessionID))
260299
flusher.Flush()

0 commit comments

Comments
 (0)