-
Notifications
You must be signed in to change notification settings - Fork 461
Implement MCP Streamable HTTP Server #228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThis change adds a comprehensive Streamable HTTP transport implementation for the Model Context Protocol (MCP) in Go, including a server with session and event stream management, client transport enhancements for resumability, extensive documentation, example programs demonstrating usage, configuration option implementations, and thorough test coverage for server behavior and origin validation. Changes
Possibly related PRs
Suggested reviewers
Tip ⚡️ Faster reviews with caching
Enjoy the performance boost—your workflow just got faster. ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Nitpick comments (17)
server/streamable_http.go (5)
424-429
: Guardhttp.Flusher
access
w.(http.Flusher)
will panic if the underlying writer does not implementFlusher
(e.g. certain test doubles). Always assert the capability first.- w.(http.Flusher).Flush() + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + }
508-555
: Notification fan-out may silently drop messages
eventChan
is buffered with capacity 10. When the buffer is full the fallback is to block; however the select defaults to returning on<-notifDone>
.
If the client is slow, notifications will be lost without feedback.
Consider:
- Using a larger buffer or an unbounded queue.
- Back-pressure mechanisms (e.g. context cancellation).
- Logging or propagating the overflow error.
748-759
:event
argument is unused in callers – clarify or remove
writeSSEEvent
allows specifying anevent:
field but nothing in this file invokes it with a non-empty value. If custom event names are required, document and use them; otherwise drop the parameter to avoid confusion.
323-329
: Errors from notification handling are discarded
server.HandleMessage
may return an error for notifications (e.g. malformed data). Returning202 Accepted
unconditionally can hide failures.
At minimum, log the error and return400/500
when appropriate.
829-845
: Dead code –validateSession
is never usedThe helper looks correct but is not referenced anywhere. Remove it or wire it into
handlePost
/handleGet
to ensure consistent validation.examples/minimal_client/main.go (2)
24-26
: Shared context may cancel long-running requestsBoth the
initialize
andtools/call
requests reuse the same 30 s context. If the first call consumes most of the timeout, the second may fail prematurely. Prefer deriving a fresh context per operation.initCtx, cancel := context.WithTimeout(ctx, 30*time.Second) ... callCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
21-22
: IgnoreClose
error
trans.Close()
returns an error (according to transport interface) but it is discarded. Consider logging it to aid debugging of lingering connections.examples/minimal_server/main.go (1)
64-69
: Handlehttp.ErrServerClosed
gracefullyWhen
Shutdown
is invoked the listener returnshttp.ErrServerClosed
, which is expected and should not be treated as fatal.-if err := streamableServer.Start(":8080"); err != nil { - log.Fatalf("Failed to start server: %v", err) -} +if err := streamableServer.Start(":8080"); err != nil && err != http.ErrServerClosed { + log.Fatalf("Failed to start server: %v", err) +}examples/streamable_http_client/main.go (1)
1-56
: Well-structured example with clear flow and good practicesThis example demonstrates a streamable HTTP client with all essential components: transport creation, notification handling, session initialization, and response display. The code follows Go best practices with proper error handling and resource cleanup.
Consider adding more detailed comments explaining the expected behavior and outputs at key points, especially for the notification handler. For instance, explaining what kind of notifications might be received and how they're structured would help users understand the example better.
examples/streamable_http_client_complete/main.go (1)
1-131
: Comprehensive client example with complete MCP workflowThis example builds on the basic client by adding tool listing, tool invocation, and notification handling with signal management. The structure is logical and demonstrates a complete interaction flow with an MCP server.
Consider adding comments explaining the expected notification content from the echo tool, particularly in the notification handler setup (lines 27-31) or near the waiting section (lines 116-119). This would help users understand what to expect when running the example.
examples/streamable_http_server/main.go (1)
61-66
: Consider adding context check in notification goroutineThe notification goroutine doesn't check if the context is still valid before sending the notification, which could lead to errors if the server is shutting down.
go func() { time.Sleep(1 * time.Second) + // Check if context is still valid before sending notification + select { + case <-ctx.Done(): + return + default: mcpServer.SendNotificationToClient(ctx, "echo/notification", map[string]interface{}{ "message": "Echo notification: " + message, }) + } }()README-streamable-http.md (3)
22-25
: Fix bullet point formattingThe bullet points in this section have loose punctuation marks that should be fixed for better readability.
### Key Components - `StreamableHTTPServer`: The main server implementation that handles HTTP requests and responses - `streamableHTTPSession`: Represents an active session with a client - `EventStore`: Interface for storing and retrieving events for resumability - `InMemoryEventStore`: A simple in-memory implementation of the EventStore interface🧰 Tools
🪛 LanguageTool
[uncategorized] ~22-~22: Loose punctuation mark.
Context: ...Key Components -StreamableHTTPServer
: The main server implementation that han...(UNLIKELY_OPENING_PUNCTUATION)
34-37
: Consider expanding the client implementation sectionThe client implementation section is quite brief compared to the server section. Adding more details about the client's key components, options, and design considerations would make the documentation more balanced.
Consider expanding this section to include:
- Key client components
- Client options (similar to server options section)
- Design considerations specific to the client implementation
273-302
: Add a troubleshooting sectionThe documentation would benefit from a troubleshooting section that addresses common issues users might encounter, such as connection problems, session expiration, or event replay failures.
Consider adding a "Troubleshooting" section that covers:
- Common error scenarios and their solutions
- Debugging tips (e.g., enabling verbose logging)
- Best practices for error handling in both client and server implementations
server/streamable_http_test.go (3)
73-92
: Consider flattening nested conditionals for better readabilityThe nested conditionals for validating the response structure could be simplified for better readability and easier debugging when tests fail.
- if result, ok := response["result"].(map[string]interface{}); ok { - if serverInfo, ok := result["serverInfo"].(map[string]interface{}); ok { - if serverInfo["name"] != "test-server" { - t.Errorf("Expected server name test-server, got %v", serverInfo["name"]) - } - if serverInfo["version"] != "1.0.0" { - t.Errorf("Expected server version 1.0.0, got %v", serverInfo["version"]) - } - } else { - t.Errorf("Expected serverInfo in result, got none") - } - } else { - t.Errorf("Expected result in response, got none") - } + result, ok := response["result"].(map[string]interface{}) + if !ok { + t.Fatalf("Expected result in response, got none") + } + + serverInfo, ok := result["serverInfo"].(map[string]interface{}) + if !ok { + t.Fatalf("Expected serverInfo in result, got none") + } + + if serverInfo["name"] != "test-server" { + t.Errorf("Expected server name test-server, got %v", serverInfo["name"]) + } + + if serverInfo["version"] != "1.0.0" { + t.Errorf("Expected server version 1.0.0, got %v", serverInfo["version"]) + }
257-257
: Replace hard-coded sleep with a more reliable mechanismUsing a fixed sleep duration can lead to flaky tests on different environments or under different load conditions.
Consider using a more robust synchronization mechanism or at least making the sleep duration configurable:
- // Wait a bit for the stream to be established - time.Sleep(100 * time.Millisecond) + // Wait for the stream to be established + streamEstablishmentTimeout := 200 * time.Millisecond + time.Sleep(streamEstablishmentTimeout)For even better reliability, you could implement a ready signal from the server once the stream is established.
293-312
: Implement more robust SSE event parsingThe current SSE event parsing is simplified and assumes a specific format. A more robust implementation would handle multiple data lines, event IDs, and other SSE features.
Consider implementing a more comprehensive SSE parser or using the same timeout pattern recommended for the previous event reading loop.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
README-streamable-http.md
(1 hunks)examples/minimal_client/main.go
(1 hunks)examples/minimal_server/main.go
(1 hunks)examples/streamable_http_client/main.go
(1 hunks)examples/streamable_http_client_complete/main.go
(1 hunks)examples/streamable_http_server/main.go
(1 hunks)server/streamable_http.go
(1 hunks)server/streamable_http_test.go
(1 hunks)
🧰 Additional context used
🪛 LanguageTool
README-streamable-http.md
[uncategorized] ~22-~22: Loose punctuation mark.
Context: ...Key Components - StreamableHTTPServer
: The main server implementation that han...
(UNLIKELY_OPENING_PUNCTUATION)
[uncategorized] ~29-~29: Loose punctuation mark.
Context: ...rver Options - WithSessionIDGenerator
: Sets a custom session ID generator - `W...
(UNLIKELY_OPENING_PUNCTUATION)
🔇 Additional comments (6)
server/streamable_http.go (1)
448-457
: 🛠️ Refactor suggestionDeferred close order can panic – potential send-on-closed-channel
defer close(eventChan)
is registered beforedefer close(notifDone)
.
If the goroutine is still attempting to send oneventChan
whenhandleSSEResponse
returns, closingeventChan
first can trigger a panic.
Reverse the order or signal the goroutine before the channel is closed.- defer close(eventChan) - ... - notifDone := make(chan struct{}) - defer close(notifDone) + notifDone := make(chan struct{}) + defer close(notifDone) // signal first + defer close(eventChan) // close after goroutine stopsLikely an incorrect or invalid review comment.
examples/streamable_http_client_complete/main.go (1)
120-130
: Good signal handling pattern for graceful terminationThe signal handling implementation with a select statement provides a clean way to wait for either notifications or termination signals.
examples/streamable_http_server/main.go (1)
1-97
: Well-structured server example with proper lifecycle managementThe example demonstrates a complete MCP server with streamable HTTP transport, including tool registration, async notification handling, and graceful shutdown. The code follows good practices for server implementation in Go.
README-streamable-http.md (1)
1-302
: Comprehensive and well-structured documentationThe README provides thorough documentation of the MCP Streamable HTTP implementation, covering both server and client sides, with clear examples and protocol details. It aligns well with the provided example code and offers valuable context for users.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~22-~22: Loose punctuation mark.
Context: ...Key Components -StreamableHTTPServer
: The main server implementation that han...(UNLIKELY_OPENING_PUNCTUATION)
[uncategorized] ~29-~29: Loose punctuation mark.
Context: ...rver Options -WithSessionIDGenerator
: Sets a custom session ID generator - `W...(UNLIKELY_OPENING_PUNCTUATION)
server/streamable_http_test.go (2)
16-33
: LGTM - Test setup looks goodThe initial setup is clear and concise, properly initializing the MCP server with required capabilities, configuring the StreamableHTTPServer, and creating a test HTTP server.
336-401
: Session termination test is well structuredThe session termination test effectively verifies both the successful termination of a session and the expected behavior when attempting to use a terminated session.
This is great. One thing though. We recently merged some code around session management so that should be used in this PR as well. https://github.com/mark3labs/mcp-go/blob/main/server/session.go |
) | ||
|
||
// streamableHTTPSession represents an active Streamable HTTP connection. | ||
type streamableHTTPSession struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should implement SessionWithTools
as shown here https://github.com/mark3labs/mcp-go/blob/main/server/session.go#L22-L31
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see #232 where I implemented it for SSEServer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated, Please help to review again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems there is a race condition now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide some more details? Like how to reproduce it. I can try to debug it.
+1 Thanks for your work. I am waiting this is approved to implement my idea where http streaming would be the best option |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
server/streamable_http_test.go (1)
160-178
: Add timeout to prevent test hanging.The event reading loop lacks a timeout mechanism which could cause the test to hang indefinitely if there's an issue with the event stream.
server/streamable_http.go (1)
391-392
:⚠️ Potential issueFix redundant check for stateless mode.
This check has the same issue as the previous one -
s.sessionIDGenerator()
will never return an empty string.- // Only create a session if we're not in stateless mode - if s.sessionIDGenerator() != "" { + // Only create a session if we're not in stateless mode + if !s.stateless {
🧹 Nitpick comments (8)
server/streamable_http_test.go (2)
355-356
: Simplify string creation by avoiding unnecessary fmt.Sprintf.The
fmt.Sprintf
is unnecessary when the entire string is static.- rawNotification := fmt.Sprintf(`{"jsonrpc":"2.0","method":"test/notification","params":{"message":"Hello, world!"}}`) + rawNotification := `{"jsonrpc":"2.0","method":"test/notification","params":{"message":"Hello, world!"}}`🧰 Tools
🪛 golangci-lint (1.64.8)
355-355: S1039: unnecessary use of fmt.Sprintf
(gosimple)
356-370
: Consider removing redundant manual notification handling.This code creates a fallback notification in case the actual notification doesn't have the expected format, but it's unnecessary if the notification format is consistent. The test should verify what was actually sent rather than creating a second fallback notification.
- // Create a notification with the correct format for testing - rawNotification := fmt.Sprintf(`{"jsonrpc":"2.0","method":"test/notification","params":{"message":"Hello, world!"}}`) - - // Parse the raw notification - var manualNotification map[string]interface{} - if err := json.Unmarshal([]byte(rawNotification), &manualNotification); err != nil { - t.Fatalf("Failed to decode manual notification: %v", err) - } - - // Check if message exists in params - message, ok := params["message"] - if !ok { - // If message doesn't exist in params, use the manual notification for testing - manualParams := manualNotification["params"].(map[string]interface{}) - message = manualParams["message"] - t.Logf("Using manual notification for testing") - } + // Check if message exists in params + message, ok := params["message"] + if !ok { + t.Errorf("Expected message in params, but not found") + return + }server/streamable_http.go (6)
22-22
: Remove unused field lastEventID.The
lastEventID
field is declared but never used in the code.type streamableHTTPSession struct { sessionID string notificationChannel chan mcp.JSONRPCNotification initialized atomic.Bool - lastEventID string eventStore EventStore sessionTools sync.Map // Maps tool name to ServerTool }
🧰 Tools
🪛 golangci-lint (1.64.8)
22-22: field
lastEventID
is unused(unused)
214-214
: Remove unused field streamMapping.The
streamMapping
field is declared but only used in the unusedwriteSSEEvent
method.type StreamableHTTPServer struct { server *MCPServer baseURL string basePath string endpoint string sessions sync.Map // Maps sessionID to ClientSession srv *http.Server contextFunc SSEContextFunc sessionIDGenerator func() string enableJSONResponse bool eventStore EventStore standaloneStreamID string - streamMapping sync.Map // Maps streamID to response writer requestToStreamMap sync.Map // Maps requestID to streamID }
🧰 Tools
🪛 golangci-lint (1.64.8)
214-214: field
streamMapping
is unused(unused)
683-711
: Remove unused method writeSSEEvent.This method is never called in the codebase. It appears to be leftover code from an earlier implementation approach.
- // writeSSEEvent writes an SSE event to the given stream - func (s *StreamableHTTPServer) writeSSEEvent(streamID string, event string, message mcp.JSONRPCMessage) error { - // Get the stream channel - streamChanI, ok := s.streamMapping.Load(streamID) - if !ok { - return fmt.Errorf("stream not found: %s", streamID) - } - - streamChan, ok := streamChanI.(chan string) - if !ok { - return fmt.Errorf("invalid stream channel type") - } - - // Marshal the message - data, err := json.Marshal(message) - if err != nil { - return err - } - - // Create the event data - eventData := fmt.Sprintf("event: %s\ndata: %s\n\n", event, data) - - // Send the event to the channel - select { - case streamChan <- eventData: - return nil - default: - return fmt.Errorf("stream channel full") - } - }🧰 Tools
🪛 golangci-lint (1.64.8)
683-683: func
(*StreamableHTTPServer).writeSSEEvent
is unused(unused)
763-778
: Remove unused method validateSession.This method is defined but never called in the codebase.
- // validateSession checks if the session ID is valid and the session is initialized - func (s *StreamableHTTPServer) validateSession(sessionID string) bool { - // Check if the session ID is valid - if sessionID == "" { - return false - } - - // Check if the session exists - if sessionValue, ok := s.sessions.Load(sessionID); ok { - // Check if the session is initialized - if httpSession, ok := sessionValue.(*streamableHTTPSession); ok { - return httpSession.Initialized() - } - } - - return false - }🧰 Tools
🪛 golangci-lint (1.64.8)
763-763: func
(*StreamableHTTPServer).validateSession
is unused(unused)
496-497
: Use structured logging instead of fmt.Printf.Direct use of
fmt.Printf
for error logging is not ideal for a library. Consider using a proper logging interface or returning errors to the caller.- // Log the error but continue - fmt.Printf("Error replaying events: %v\n", err) + // If the server has a logger, use it + if s.server.logger != nil { + s.server.logger.Printf("Error replaying events: %v", err) + }Apply similar changes to other instances of
fmt.Printf
throughout the code.
621-622
: Simplify string creation by avoiding unnecessary fmt.Sprintf.The
fmt.Sprintf
is unnecessary when the entire string is static.- initialEvent := fmt.Sprintf("data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n") + initialEvent := "data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n"🧰 Tools
🪛 golangci-lint (1.64.8)
621-621: S1039: unnecessary use of fmt.Sprintf
(gosimple)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
server/streamable_http.go
(1 hunks)server/streamable_http_test.go
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
server/streamable_http.go (4)
mcp/types.go (4)
JSONRPCNotification
(206-209)JSONRPCMessage
(89-89)Request
(103-116)JSONRPCError
(219-232)server/server.go (1)
ServerTool
(50-53)server/session.go (2)
ClientSession
(11-20)SessionWithTools
(23-31)server/sse.go (1)
SSEContextFunc
(37-37)
🪛 golangci-lint (1.64.8)
server/streamable_http.go
22-22: field lastEventID
is unused
(unused)
214-214: field streamMapping
is unused
(unused)
683-683: func (*StreamableHTTPServer).writeSSEEvent
is unused
(unused)
763-763: func (*StreamableHTTPServer).validateSession
is unused
(unused)
621-621: S1039: unnecessary use of fmt.Sprintf
(gosimple)
server/streamable_http_test.go
355-355: S1039: unnecessary use of fmt.Sprintf
(gosimple)
🔇 Additional comments (4)
server/streamable_http_test.go (1)
324-332
: LGTM! Good use of timeout and channel select pattern.The code efficiently waits for the notification to be read with a reasonable timeout, preventing test hangs.
server/streamable_http.go (3)
18-25
: Implement SessionWithTools interface as requested in PR comments.Good implementation of the
SessionWithTools
interface for the streamable HTTP session. This addresses the comment from reviewer 'ezynda3' about implementing the interface fromserver/session.go
.🧰 Tools
🪛 golangci-lint (1.64.8)
22-22: field
lastEventID
is unused(unused)
401-401
: Good implementation of session initialization.The code correctly initializes new sessions, which addresses a past review comment about sessions never being marked as initialized.
43-66
: LGTM! Clean implementation of SessionWithTools interface.The implementation of
GetSessionTools
andSetSessionTools
is thread-safe using async.Map
and properly handles tool mapping conversions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (8)
server/streamable_http.go (8)
220-223
: Remove unused field streamMappingThe
streamMapping
field is only used in thewriteSSEEvent
method, which itself is unused according to static analysis.standaloneStreamID string - streamMapping sync.Map // Maps streamID to response writer requestToStreamMap sync.Map // Maps requestID to streamID statelessMode bool
🧰 Tools
🪛 golangci-lint (1.64.8)
221-221: field
streamMapping
is unused(unused)
628-629
: Simplify string formattingUse a simple string literal instead of
fmt.Sprintf
when no formatting is needed.- initialEvent := fmt.Sprintf("data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n") + initialEvent := "data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n"🧰 Tools
🪛 golangci-lint (1.64.8)
628-628: S1039: unnecessary use of fmt.Sprintf
(gosimple)
690-718
: Remove unused writeSSEEvent methodThe
writeSSEEvent
method is never called in the codebase. Consider removing it unless it's intended for future use.-// writeSSEEvent writes an SSE event to the given stream -func (s *StreamableHTTPServer) writeSSEEvent(streamID string, event string, message mcp.JSONRPCMessage) error { - // Get the stream channel - streamChanI, ok := s.streamMapping.Load(streamID) - if !ok { - return fmt.Errorf("stream not found: %s", streamID) - } - - streamChan, ok := streamChanI.(chan string) - if !ok { - return fmt.Errorf("invalid stream channel type") - } - - // Marshal the message - data, err := json.Marshal(message) - if err != nil { - return err - } - - // Create the event data - eventData := fmt.Sprintf("event: %s\ndata: %s\n\n", event, data) - - // Send the event to the channel - select { - case streamChan <- eventData: - return nil - default: - return fmt.Errorf("stream channel full") - } -}🧰 Tools
🪛 golangci-lint (1.64.8)
690-690: func
(*StreamableHTTPServer).writeSSEEvent
is unused(unused)
770-785
: Remove unused validateSession methodThe
validateSession
method is never called in the codebase. Consider removing it unless it's intended for future use.-// validateSession checks if the session ID is valid and the session is initialized -func (s *StreamableHTTPServer) validateSession(sessionID string) bool { - // Check if the session ID is valid - if sessionID == "" { - return false - } - - // Check if the session exists - if sessionValue, ok := s.sessions.Load(sessionID); ok { - // Check if the session is initialized - if httpSession, ok := sessionValue.(*streamableHTTPSession); ok { - return httpSession.Initialized() - } - } - - return false -}🧰 Tools
🪛 golangci-lint (1.64.8)
770-770: func
(*StreamableHTTPServer).validateSession
is unused(unused)
22-22
: Remove unused field lastEventIDThe
lastEventID
field instreamableHTTPSession
is declared but never used.notificationChannel chan mcp.JSONRPCNotification initialized atomic.Bool - lastEventID string eventStore EventStore
🧰 Tools
🪛 golangci-lint (1.64.8)
22-22: field
lastEventID
is unused(unused)
502-504
: Add structured logging instead of fmt.PrintfReplace
fmt.Printf
with a proper logging mechanism for error reporting.- // Log the error but continue - fmt.Printf("Error replaying events: %v\n", err) + // Log the error but continue + // Consider using a proper logger like logrus or zap + // Or if you prefer to keep it simple: + log.Printf("Error replaying events: %v", err)
522-523
: Add structured logging instead of fmt.PrintfSimilar to the previous comment, replace
fmt.Printf
with a proper logging mechanism.- // Log the error but continue - fmt.Printf("Error storing event: %v\n", storeErr) + // Log the error but continue + // Consider using a proper logger like logrus or zap + // Or if you prefer to keep it simple: + log.Printf("Error storing event: %v", storeErr)
559-560
: Add structured logging instead of fmt.PrintfAgain, replace
fmt.Printf
with a proper logging mechanism.- // Log the error but continue - fmt.Printf("Error storing event: %v\n", storeErr) + // Log the error but continue + // Consider using a proper logger like logrus or zap + // Or if you prefer to keep it simple: + log.Printf("Error storing event: %v", storeErr)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
server/streamable_http.go
(1 hunks)
🧰 Additional context used
🪛 golangci-lint (1.64.8)
server/streamable_http.go
22-22: field lastEventID
is unused
(unused)
221-221: field streamMapping
is unused
(unused)
690-690: func (*StreamableHTTPServer).writeSSEEvent
is unused
(unused)
770-770: func (*StreamableHTTPServer).validateSession
is unused
(unused)
628-628: S1039: unnecessary use of fmt.Sprintf
(gosimple)
🔇 Additional comments (6)
server/streamable_http.go (6)
17-70
: Implementation satisfies the SessionWithTools interface correctly.The session struct and methods are well-implemented. This appropriately addresses the requirements from previous comments to implement the
SessionWithTools
interface.🧰 Tools
🪛 golangci-lint (1.64.8)
22-22: field
lastEventID
is unused(unused)
72-166
: Good implementation of the event store for resumability.The
EventStore
interface andInMemoryEventStore
implementation provide a solid foundation for event storage and replay, which is essential for session resumability. The implementation correctly uses mutex locking for thread safety and has proper error handling.
168-206
: Well-structured configuration options using the functional options pattern.The server configuration options are well-designed using the functional options pattern, which provides flexibility and clear intent when configuring the server.
387-390
: Correctly implemented stateless mode check.The stateless mode check has been properly implemented, fixing a previous issue where the check was using
s.sessionIDGenerator() != ""
, which would never work correctly.
407-409
: Session is now correctly initialized.The session is now properly initialized before being stored and registered, addressing a previous issue where sessions were never marked as initialized.
1-787
: Overall implementation looks solid with good attention to detail.The implementation of the Streamable HTTP transport protocol is well-structured and covers all the key aspects including session management, event streaming, resumability, and proper HTTP protocol handling. A few minor improvements have been suggested, but the core functionality is robust.
🧰 Tools
🪛 golangci-lint (1.64.8)
22-22: field
lastEventID
is unused(unused)
221-221: field
streamMapping
is unused(unused)
690-690: func
(*StreamableHTTPServer).writeSSEEvent
is unused(unused)
770-770: func
(*StreamableHTTPServer).validateSession
is unused(unused)
628-628: S1039: unnecessary use of fmt.Sprintf
(gosimple)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for working on this @tendant!
There are a number of features in SSEServer that are not present in the new StreamableHTTPServer that I think we should add:
// Keep-alive support
WithKeepAliveInterval(keepAliveInterval time.Duration)
WithKeepAlive(keepAlive bool)
// Dynamic path support
WithDynamicBasePath(fn DynamicBasePathFunc)
// Explicit handler mounting
func (s *SSEServer) SSEHandler() http.Handler
func (s *SSEServer) MessageHandler() http.Handler
These were added in #80 + #169 (for Ping support) and #214 (for dynamic path support) IIRC (if it helps to review them independently).
These are not defined in any interface, If we need similar features, we will have to use different function name if they are in the same Is this acceptable? Doesn't look like a good solution. Same logic applies to If we would like to support those options across multiple type of servers, it might better to create an option interface first. |
Good point! I'll work on a PR for that this evening. Then once it lands it ought to be straightforward to use here. |
Sounds good. Please let me know as soon as it is in main branch, I will update this PR to use the new interface. |
Alternatively, we could move the different implementations (SSE and HTTP Streamable) into their own dedicated packages. This way, the options would be isolated per implementation and could be treated as internal details, rather than shared across all server types. |
server/streamable_http.go
Outdated
// Make sure the notification is properly formatted as a JSON-RPC message | ||
// The test expects a specific format with jsonrpc, method, and params fields | ||
fmt.Fprintf(w, "data: %s\n\n", data) | ||
w.(http.Flusher).Flush() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Go's net/http package, the http.ResponseWriter is not safe for concurrent use. If multiple goroutines write to or flush the same response writer at the same time, it can result in data races.
Writing to the response is safe only if done entirely within the HTTP handler's goroutine, just like how flush is handled within the handleSSE goroutine in the current main branch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 9
🧹 Nitpick comments (4)
server/streamable_http.go (4)
118-166
:ReplayEventsAfter
has O(streams × events) lookup – consider indexingFor every replay you iterate all streams and all events just to locate
lastEventID
.
On a busy long-lived server this can degrade badly.Maintain an auxiliary map
eventID → (streamID, index)
when storing events to
reduce the lookup to O(1).
280-299
:basePath
cannot be configured – path matching brittle
endpoint := s.basePath + s.endpoint
assumes callers will mutate the private
field manually. Provide an explicit option, e.g.WithBasePath
, and document
how to combine with a reverse-proxy prefix.
653-653
: Minor: simplify constant write
fmt.Sprintf
is unnecessary for a static string.- initialEvent := fmt.Sprintf("data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n") + const initialEvent = "data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n"🧰 Tools
🪛 golangci-lint (1.64.8)
653-653: S1039: unnecessary use of fmt.Sprintf
(gosimple)
745-784
:splitAndTrim
re-implements header parsingGo’s
mime.ParseMediaType
ornet/http
’sHeader.Values
can handle quoted
values & params robustly. Consider leveraging std-lib to avoid subtle bugs.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
server/streamable_http.go
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
server/streamable_http.go (5)
mcp/types.go (4)
JSONRPCNotification
(206-209)JSONRPCMessage
(89-89)Request
(103-116)JSONRPCError
(219-232)server/server.go (1)
ServerTool
(50-53)server/session.go (2)
ClientSession
(11-20)SessionWithTools
(23-31)server/sse.go (1)
SSEContextFunc
(37-37)mcptest/mcptest.go (2)
Server
(20-38)NewServer
(41-50)
🪛 golangci-lint (1.64.8)
server/streamable_http.go
22-22: field lastEventID
is unused
(unused)
221-221: field streamMapping
is unused
(unused)
715-715: func (*StreamableHTTPServer).writeSSEEvent
is unused
(unused)
795-795: func (*StreamableHTTPServer).validateSession
is unused
(unused)
653-653: S1039: unnecessary use of fmt.Sprintf
(gosimple)
460-460: SA4009: argument ctx is overwritten before first use
(staticcheck)
584-584: SA4009(related information): assignment to ctx
(staticcheck)
// Shutdown gracefully stops the Streamable HTTP server, closing all active sessions | ||
// and shutting down the HTTP server. | ||
func (s *StreamableHTTPServer) Shutdown(ctx context.Context) error { | ||
if s.srv != nil { | ||
s.sessions.Range(func(key, value interface{}) bool { | ||
if session, ok := value.(ClientSession); ok { | ||
if httpSession, ok := session.(*streamableHTTPSession); ok { | ||
close(httpSession.notificationChannel) | ||
} | ||
} | ||
s.sessions.Delete(key) | ||
return true | ||
}) | ||
|
||
return s.srv.Shutdown(ctx) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sessions are not unregistered during shutdown
You close the notification channel but never call server.UnregisterSession
,
potentially leaving dangling references in the core server.
Add the unregister call inside the loop before deleting:
if session, ok := value.(ClientSession); ok {
s.server.UnregisterSession(ctx, session.SessionID())
...
}
server/streamable_http.go
Outdated
server *MCPServer | ||
baseURL string | ||
basePath string | ||
endpoint string | ||
sessions sync.Map // Maps sessionID to ClientSession | ||
srv *http.Server | ||
contextFunc SSEContextFunc | ||
sessionIDGenerator func() string | ||
enableJSONResponse bool | ||
eventStore EventStore | ||
standaloneStreamID string | ||
streamMapping sync.Map // Maps streamID to response writer | ||
requestToStreamMap sync.Map // Maps requestID to streamID | ||
statelessMode bool | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unused fields & helpers clutter the public surface
streamMapping
, requestToStreamMap
, writeSSEEvent
, and validateSession
are never referenced. Please either hook them up or delete to keep the code
lean and avoid misleading future contributors.
🧰 Tools
🪛 golangci-lint (1.64.8)
221-221: field streamMapping
is unused
(unused)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
I've just pushed #253 up for review. I specifically included a stub struct for |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (1)
server/streamable_http.go (1)
84-92
: 🛠️ Refactor suggestion
SetSessionTools
is still not concurrency-safe – races on map replacementReassigning
s.sessionTools = sync.Map{}
swaps the whole map object while other goroutines may still hold references obtained fromGetSessionTools
, re-introducing the race that was flagged in an earlier review.Re-use the existing map and delete keys in-place:
func (s *streamableHTTPSession) SetSessionTools(tools map[string]ServerTool) { - // Clear existing tools - s.sessionTools = sync.Map{} - - // Add new tools - for name, tool := range tools { - s.sessionTools.Store(name, tool) - } + // Clear existing tools safely + s.sessionTools.Range(func(k, _ interface{}) bool { + s.sessionTools.Delete(k) + return true + }) + // Add new tools + for name, tool := range tools { + s.sessionTools.Store(name, tool) + } }This eliminates the data race without allocating a new
sync.Map
.
🧹 Nitpick comments (7)
client/transport/streamable_http.go (1)
423-430
: Added GetLastEventId method for retrieving the last event ID.This method provides a clean API for accessing the last event ID, which is useful for clients that want to persist this value or check resumability status.
Consider adding a corresponding SetLastEventId method for API completeness and to allow clients to manually set the last event ID when needed (e.g., from a persisted value).
+// SetLastEventId sets the last event ID for resumability +func (c *StreamableHTTP) SetLastEventId(id string) { + c.lastEventID.Store(id) +}server/streamable_http_origin_validation_test.go (1)
65-67
: Clarify wildcard matching rule in the comment.The comment on line 66 correctly explains that the domain itself doesn't match the wildcard pattern, but it could be clarified to avoid confusion.
- "https://trusted-domain.com", // This doesn't match *.trusted-domain.com (needs a subdomain) + "https://trusted-domain.com", // This doesn't match *.trusted-domain.com (wildcard matches subdomains only, not the base domain)README-streamable-http.md (1)
76-82
: Add more details about origin validation.While the Origin header is mentioned in the HTTP Headers section, it would be helpful to add more details about the origin validation feature since it's an important security feature that prevents DNS rebinding attacks.
Consider adding a new section under "Implementation Notes" or "Security Considerations" that explains:
- How origin validation works
- How to configure the origin allowlist
- Support for wildcards in allowed origins
- Special handling for localhost and loopback addresses
- Behavior when no Origin header is present
server/streamable_http.go (4)
20-26
: Dead fieldlastEventID
– remove or implement
lastEventID
is never read or written, so it only pads the struct and confuses future maintainers. Either persist it when events are sent/replayed or delete the field for now.🧰 Tools
🪛 golangci-lint (1.64.8)
24-24: field
lastEventID
is unused(unused)
251-253
: Unused infrastructure (streamMapping
,writeSSEEvent
) – delete until needed
streamMapping
and its helperwriteSSEEvent
are never referenced. Keeping dormant code:
- Bloats the API surface.
- Risks bit-rot and subtle bugs when eventually wired up.
Unless a follow-up patch consumes these, drop them to keep the implementation lean.
Also applies to: 829-857
🧰 Tools
🪛 golangci-lint (1.64.8)
251-251: field
streamMapping
is unused(unused)
698-699
: Shadowingctx
hides the parameter – minor readability nit
ctx, cancel := context.WithTimeout(r.Context(), …)
shadows the function parameterctx
, triggeringstaticcheck
SA4009. Renaming avoids the warning and clarifies intent:-ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) +streamCtx, cancel := context.WithTimeout(r.Context(), 5*time.Second) … -case <-ctx.Done(): +case <-streamCtx.Done():🧰 Tools
🪛 golangci-lint (1.64.8)
698-698: SA4009(related information): assignment to ctx
(staticcheck)
764-765
: Unnecessaryfmt.Sprintf
allocation
initialEvent
is a static string; usingfmt.Sprintf
adds avoidable overhead.-initialEvent := fmt.Sprintf("data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n") +initialEvent := "data: {\"jsonrpc\": \"2.0\", \"method\": \"connection/established\"}\n\n"🧰 Tools
🪛 golangci-lint (1.64.8)
764-764: S1039: unnecessary use of fmt.Sprintf
(gosimple)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
README-streamable-http.md
(1 hunks)client/transport/streamable_http.go
(8 hunks)server/streamable_http.go
(1 hunks)server/streamable_http_origin_test.go
(1 hunks)server/streamable_http_origin_validation_test.go
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
server/streamable_http_origin_validation_test.go (1)
server/streamable_http.go (3)
StreamableHTTPServer
(239-255)NewStreamableHTTPServer
(258-280)WithOriginAllowlist
(231-235)
client/transport/streamable_http.go (1)
client/client.go (1)
Client
(16-25)
server/streamable_http.go (5)
mcp/types.go (4)
JSONRPCNotification
(206-209)JSONRPCMessage
(89-89)Request
(103-116)JSONRPCError
(219-232)server/server.go (1)
ServerTool
(50-53)server/session.go (2)
ClientSession
(11-20)SessionWithTools
(23-31)server/http_transport_options.go (2)
StreamableHTTPOption
(43-46)HTTPContextFunc
(14-14)mcptest/mcptest.go (1)
Server
(20-38)
🪛 golangci-lint (1.64.8)
server/streamable_http.go
24-24: field lastEventID
is unused
(unused)
251-251: field streamMapping
is unused
(unused)
829-829: func (*StreamableHTTPServer).writeSSEEvent
is unused
(unused)
958-958: func (*StreamableHTTPServer).validateSession
is unused
(unused)
764-764: S1039: unnecessary use of fmt.Sprintf
(gosimple)
537-537: SA4009: argument ctx is overwritten before first use
(staticcheck)
698-698: SA4009(related information): assignment to ctx
(staticcheck)
🪛 LanguageTool
README-streamable-http.md
[uncategorized] ~30-~30: Loose punctuation mark.
Context: ...Key Components - StreamableHTTPServer
: The main server implementation that han...
(UNLIKELY_OPENING_PUNCTUATION)
[uncategorized] ~37-~37: Loose punctuation mark.
Context: ...rver Options - WithSessionIDGenerator
: Sets a custom session ID generator - `W...
(UNLIKELY_OPENING_PUNCTUATION)
[uncategorized] ~49-~49: Loose punctuation mark.
Context: ... ### Client Options - WithHTTPHeaders
: Sets custom HTTP headers for all reques...
(UNLIKELY_OPENING_PUNCTUATION)
🔇 Additional comments (15)
server/streamable_http_origin_test.go (3)
11-21
: Well-structured test setup for origin validation.The test creates a streamable HTTP server with a properly configured origin allowlist that includes both exact URLs and wildcard patterns. This setup is essential for verifying that the server correctly validates origin headers for cross-origin requests.
24-34
: Comprehensive test cases covering all origin scenarios.The test cases effectively cover all important origin validation scenarios:
- Exact match validation
- Wildcard subdomain matching
- Localhost allowance (security best practice)
- Invalid origin rejection
- No origin header handling
This thorough coverage ensures the origin validation feature works correctly to prevent DNS rebinding attacks.
36-66
: Well-implemented test execution with proper HTTP request handling.The test properly creates JSON-RPC requests, sets appropriate headers, and verifies the expected status codes. The use of a timeout in the HTTP client prevents tests from hanging indefinitely, and the proper deferred cleanup ensures resources are released.
client/transport/streamable_http.go (6)
45-46
: Documentation update reflects improved resumability support.The comment accurately reflects the current state of resumability implementation, which has been improved but is not yet complete.
55-56
: Added lastEventID field for resumability support.This field is essential for tracking the last event ID received from SSE streams, which enables the client to request resumption from the last known event when reconnecting after a disconnection.
78-79
: Proper initialization of the lastEventID field.Good practice to initialize the atomic value with an empty string to avoid nil checks later when retrieving the value.
176-180
: Added Last-Event-Id header to support resumability in requests.This implementation correctly adds the Last-Event-Id header to requests when a last event ID is available, which is necessary for the server to know from which event to resume the stream.
338-342
: Added logic to store the last event ID from SSE events.This correctly captures and stores the event ID from SSE messages, which is essential for resumability. The atomic storage ensures thread safety.
383-388
: Added Last-Event-Id header to notifications for consistency.Including the Last-Event-Id header in notification requests maintains consistency with regular requests and enables proper resumability for both types of communication.
server/streamable_http_origin_validation_test.go (3)
8-25
: Comprehensive test cases for origin validation.The test cases cover all important scenarios for origin validation:
- Empty origins
- Exact matches
- Non-matching origins
- Wildcard subdomain patterns
- Multi-level subdomains
- Localhost and loopback IP addresses
- Multiple allowlist entries
- Empty allowlists (which should allow all origins)
- Invalid URLs
This thorough coverage ensures the origin validation logic works correctly in all scenarios.
27-37
: Well-implemented test execution with clear error messages.The test properly creates a StreamableHTTPServer instance with the appropriate allowlist for each test case and checks if the isValidOrigin method returns the expected result. The error messages are descriptive and include both the input and expected output, which makes debugging failures easier.
39-56
: Good test for the WithOriginAllowlist option.This test verifies that the WithOriginAllowlist option correctly sets the allowlist on the StreamableHTTPServer instance. It properly checks both the length and content of the allowlist.
README-streamable-http.md (3)
5-16
: Comprehensive feature list for the implementation.The feature list provides a clear overview of the capabilities of the MCP Streamable HTTP implementation, covering all the important aspects like session management, SSE streaming, JSON responses, resumability, notifications, session termination, and security.
17-23
: Transparent about current limitations.The documentation honestly discloses the current limitations of the implementation, which is important for users to understand what features are not fully supported or implemented.
24-42
: Clear documentation of server implementation details.The section provides good information about the key components and available options for the server implementation. The use of bullet points makes it easy to scan and understand.
According to the static analysis hints, there might be loose punctuation marks in the bullet points. While the markdown syntax is correct, you might want to ensure consistent spacing after the bullet point markers for optimal rendering.
🧰 Tools
🪛 LanguageTool
[uncategorized] ~30-~30: Loose punctuation mark.
Context: ...Key Components -StreamableHTTPServer
: The main server implementation that han...(UNLIKELY_OPENING_PUNCTUATION)
[uncategorized] ~37-~37: Loose punctuation mark.
Context: ...rver Options -WithSessionIDGenerator
: Sets a custom session ID generator - `W...(UNLIKELY_OPENING_PUNCTUATION)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (3)
server/streamable_http.go (2)
306-314
: Sessions are removed locally but never unregistered from the core MCP serverEarlier feedback (already merged for
SSEServer
) pointed out that failing to
callserver.UnregisterSession
leaves dangling references inside the MCP
core. The same gap still exists here:Shutdown
only closes channels and
deletes the map entry.Add the call before
Delete
:- if session, ok := value.(ClientSession); ok { - if httpSession, ok := session.(*streamableHTTPSession); ok { - close(httpSession.notificationChannel) - } - } + if session, ok := value.(ClientSession); ok { + s.server.UnregisterSession(ctx, session.SessionID()) + if httpSession, ok := session.(*streamableHTTPSession); ok { + close(httpSession.notificationChannel) + } + }
575-688
: 🛠️ Refactor suggestionShadowing
ctx
violatesstaticcheck
SA4009 and hides the parent context
handleSSEResponse
receivesctx
but immediately overwrites it with
context.WithTimeout
. This is flagged bystaticcheck
and can lead to subtle
bugs if the original context carried values or cancellation.Two straightforward fixes:
Drop the parameter completely – the function can derive a fresh context
fromr.Context()
when needed.Rename the derived context:
- ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + streamCtx, cancel := context.WithTimeout(r.Context(), 5*time.Second)Either approach resolves the linter warning and keeps intent clear.
🧰 Tools
🪛 golangci-lint (1.64.8)
575-575: SA4009: argument ctx is overwritten before first use
(staticcheck)
688-688: SA4009(related information): assignment to ctx
(staticcheck)
server/streamable_http_test.go (1)
320-322
: Fixed sleep may still cause flakes on slow CI runnersThe hard-coded
time.Sleep(500 * time.Millisecond)
was previously flagged.
It remains in the test and can still introduce nondeterministic failures.Use a polling/timeout loop instead (see earlier suggestion) or rely on the
readDone
channel alone.
🧹 Nitpick comments (2)
server/streamable_http.go (1)
151-167
:ReplayEventsAfter
scales poorly – O(total events) searchSearching every stream for the matching
lastEventID
means latency grows linearly with the
entire store size. A map fromeventID → (streamID, index)
maintained in
StoreEvent
would make the lookup O(1):type InMemoryEventStore struct { mu sync.RWMutex events map[string][]storedEvent // streamID → events slice + index map[string]struct{stream string; idx int} // eventID → location }
This optimisation is essential once many sessions or long-running streams are
involved.server/streamable_http_test.go (1)
405-423
: Test logic can be simplified – the manual JSON round-trip is unnecessaryRather than constructing
rawNotification
and re-decoding it, directly
verifyparams["message"]
. If the field is missing the test should fail – we
do not need the fallback.Streamlining the assertion reduces noise:
- // Check if message exists in params - message, ok := params["message"] - if !ok { - ... - } - - // Check the message value - if message != "Hello, world!" { + message, ok := params["message"] + if !ok { + t.Fatalf("notification missing message param") + } + if message != "Hello, world!" {🧰 Tools
🪛 golangci-lint (1.64.8)
406-406: S1039: unnecessary use of fmt.Sprintf
(gosimple)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
server/http_transport_options.go
(1 hunks)server/streamable_http.go
(1 hunks)server/streamable_http_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/http_transport_options.go
🧰 Additional context used
🪛 golangci-lint (1.64.8)
server/streamable_http_test.go
406-406: S1039: unnecessary use of fmt.Sprintf
(gosimple)
server/streamable_http.go
575-575: SA4009: argument ctx is overwritten before first use
(staticcheck)
688-688: SA4009(related information): assignment to ctx
(staticcheck)
🔇 Additional comments (1)
server/streamable_http.go (1)
957-964
: Open CORS policy when no allow-list is provided – is that intentional?
isValidOrigin
currently allows any non-empty origin when
originAllowlist
is empty. If the intent was to secure by default, consider
either:
- Requiring an explicit
WithOriginAllowlist
call to enable cross-origin
requests, or- Documenting this permissive default clearly.
Please double-check against your threat model.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (3)
server/streamable_http.go (3)
142-190
:ReplayEventsAfter
is O(streams × events) – consider an index for fast look-upsThe method linearly searches every stored event in every stream to locate
lastEventID
.
In production workloads with many streams or high event rates, this becomes quadratic and can easily dominate CPU time during reconnection storms.A minimal improvement is to maintain an additional
map[eventID]struct{streamID string; idx int}
duringStoreEvent
, allowing direct access in O(1):type InMemoryEventStore struct { mu sync.RWMutex events map[string][]storedEvent eventIndex map[string]struct{streamID string; idx int} // NEW } func (s *InMemoryEventStore) StoreEvent(streamID string, msg mcp.JSONRPCMessage) (string, error) { ... s.eventIndex[eventID] = struct{streamID string; idx int}{streamID, len(s.events[streamID]) - 1} ... } func (s *InMemoryEventStore) ReplayEventsAfter(lastEventID string, send func(string, mcp.JSONRPCMessage) error) error { s.mu.RLock() defer s.mu.RUnlock() pos, ok := s.eventIndex[lastEventID] if !ok { return fmt.Errorf("event ID not found: %s", lastEventID) } for _, ev := range s.events[pos.streamID][pos.idx+1:] { if err := send(ev.id, ev.message); err != nil { return err } } return nil }This keeps memory overhead small while giving predictable replay latency.
701-705
: Shadowedctx
tripsstaticcheck
; pick a distinct name
ctx
is reassigned bycontext.WithTimeout
, making the original parameter redundant (SA4009).
Renaming avoids the linter warning and clarifies intent:- ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + ctxWithTimeout, cancel := context.WithTimeout(r.Context(), 5*time.Second) ... - case <-ctx.Done(): + case <-ctxWithTimeout.Done():🧰 Tools
🪛 golangci-lint (1.64.8)
703-703: SA4009(related information): assignment to ctx
(staticcheck)
706-712
: Simplify keep-alive ticker set-upCreating a 24-hour ticker and immediately
Reset
-ing it is unnecessary and slightly confusing.
Allocate the ticker only when keep-alives are enabled; otherwise keep the channel nil so theselect
never fires.- keepAliveTicker := time.NewTicker(24 * time.Hour) // Default to a very long interval (effectively disabled) - if s.keepAliveEnabled && s.keepAliveInterval > 0 { - keepAliveTicker.Reset(s.keepAliveInterval) - } - defer keepAliveTicker.Stop() + var keepAliveTicker *time.Ticker + if s.keepAliveEnabled && s.keepAliveInterval > 0 { + keepAliveTicker = time.NewTicker(s.keepAliveInterval) + defer keepAliveTicker.Stop() + } ... - case <-keepAliveTicker.C: + case <-func() <-chan time.Time { + if keepAliveTicker != nil { + return keepAliveTicker.C + } + return nil + }():This removes the “effectively disabled” magic constant and makes the logic self-documenting.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
server/streamable_http.go
(1 hunks)
🧰 Additional context used
🪛 golangci-lint (1.64.8)
server/streamable_http.go
576-576: SA4009: argument ctx is overwritten before first use
(staticcheck)
703-703: SA4009(related information): assignment to ctx
(staticcheck)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (1)
server/streamable_http.go (1)
598-598
:⚠️ Potential issueFix unused context parameter
The
ctx
parameter is immediately shadowed by a new context created in the function, causing a staticcheck warning SA4009.-func (s *StreamableHTTPServer) handleSSEResponse(w http.ResponseWriter, r *http.Request, ctx context.Context, initialResponse mcp.JSONRPCMessage, session SessionWithTools, notificationBuffer ...mcp.JSONRPCNotification) { +func (s *StreamableHTTPServer) handleSSEResponse(w http.ResponseWriter, r *http.Request, _ context.Context, initialResponse mcp.JSONRPCMessage, session SessionWithTools, notificationBuffer ...mcp.JSONRPCNotification) {Alternatively, use the provided context instead of creating a new one on line 714:
- ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)And update all subsequent uses of
ctx
totimeoutCtx
.🧰 Tools
🪛 golangci-lint (1.64.8)
598-598: SA4009: argument ctx is overwritten before first use
(staticcheck)
🧹 Nitpick comments (4)
server/streamable_http.go (4)
141-190
: Improve replay event efficiencyThe
ReplayEventsAfter
method has a nested loop that could perform poorly with many events or streams. It iterates through all streams and all events to find the last event ID.Consider using a more efficient data structure like a map to directly look up events by ID, or maintain indices within each stream for faster lookup:
type InMemoryEventStore struct { mu sync.RWMutex events map[string][]storedEvent + eventIDIndex map[string]struct{ + streamID string + index int + } } func NewInMemoryEventStore() *InMemoryEventStore { return &InMemoryEventStore{ events: make(map[string][]storedEvent), + eventIDIndex: make(map[string]struct{ + streamID string + index int + }), } } // StoreEvent stores an event in memory func (s *InMemoryEventStore) StoreEvent(streamID string, message mcp.JSONRPCMessage) (string, error) { s.mu.Lock() defer s.mu.Unlock() eventID := uuid.New().String() event := storedEvent{ id: eventID, message: message, } if _, ok := s.events[streamID]; !ok { s.events[streamID] = []storedEvent{} } idx := len(s.events[streamID]) s.events[streamID] = append(s.events[streamID], event) + s.eventIDIndex[eventID] = struct{ + streamID string + index int + }{ + streamID: streamID, + index: idx, + } return eventID, nil } // ReplayEventsAfter replays events that occurred after the given event ID func (s *InMemoryEventStore) ReplayEventsAfter(lastEventID string, send func(eventID string, message mcp.JSONRPCMessage) error) error { s.mu.RLock() defer s.mu.RUnlock() if lastEventID == "" { return nil } + // Direct lookup using the index + eventLoc, found := s.eventIDIndex[lastEventID] + if !found { + return fmt.Errorf("event ID not found: %s", lastEventID) + } + + streamEvents, ok := s.events[eventLoc.streamID] + if !ok { + return fmt.Errorf("stream not found: %s", eventLoc.streamID) + } + + // Replay events after the indexed position + for i := eventLoc.index + 1; i < len(streamEvents); i++ { + if err := send(streamEvents[i].id, streamEvents[i].message); err != nil { + return err + } + } return nil }
460-464
: Consider adding schema validation for request structsThe code parses JSON-RPC requests but doesn't validate that they conform to the JSON-RPC 2.0 specification (e.g., checking that jsonrpc field equals "2.0").
Add validation to ensure requests conform to the JSON-RPC 2.0 specification:
// Parse the request to get the method and ID var request struct { JSONRPC string `json:"jsonrpc"` Method string `json:"method"` ID interface{} `json:"id"` } if err := json.Unmarshal(rawMessage, &request); err != nil { http.Error(w, "Invalid JSON-RPC request", http.StatusBadRequest) return } + + // Validate according to JSON-RPC 2.0 spec + if request.JSONRPC != "2.0" { + http.Error(w, "Invalid JSON-RPC version, must be 2.0", http.StatusBadRequest) + return + } + + if request.Method == "" { + http.Error(w, "Method must not be empty", http.StatusBadRequest) + return + }
714-716
: Non-configurable timeout valueThe code uses a hardcoded 5-second timeout for streaming responses without a response.
Make this timeout configurable to handle different use cases:
// Add to StreamableHTTPServer struct type StreamableHTTPServer struct { // ... existing fields + noResponseStreamTimeout time.Duration } // In the constructor func NewStreamableHTTPServer(server *MCPServer, opts ...StreamableHTTPOption) *StreamableHTTPServer { s := &StreamableHTTPServer{ // ... existing initialization + noResponseStreamTimeout: 5 * time.Second, } // ... } // Add a new option func WithNoResponseStreamTimeout(timeout time.Duration) StreamableHTTPOption { return streamableHTTPOption(func(s *StreamableHTTPServer) { s.noResponseStreamTimeout = timeout }) } // Then in handleSSEResponse: - ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + ctx, cancel := context.WithTimeout(r.Context(), s.noResponseStreamTimeout)🧰 Tools
🪛 golangci-lint (1.64.8)
714-714: SA4009(related information): assignment to ctx
(staticcheck)
523-545
: Better error handling needed for notification goroutineThe notification goroutine created for each session doesn't have a way to report errors back to the main goroutine.
Consider adding error handling and a way to clean up resources if the goroutine encounters an issue:
// Add to streamableHTTPSession type streamableHTTPSession struct { // ... existing fields + errChan chan error } // In handleRequest: newSession := &streamableHTTPSession{ sessionID: newSessionID, notificationChannel: make(chan mcp.JSONRPCNotification, 100), eventStore: s.eventStore, sessionTools: sync.Map{}, + errChan: make(chan error, 1), } // Then modify the goroutine: go func() { + defer func() { + if r := recover(); r != nil { + if err, ok := r.(error); ok { + newSession.errChan <- err + } else { + newSession.errChan <- fmt.Errorf("notification handler panic: %v", r) + } + } + }() for notification := range newSession.notificationChannel { // Call the notification handler if set newSession.notifyMu.RLock() handler := newSession.notificationHandler newSession.notifyMu.RUnlock() if handler != nil { + // Protect against panics in handler + func() { + defer func() { + if r := recover(); r != nil { + if err, ok := r.(error); ok { + newSession.errChan <- err + } else { + newSession.errChan <- fmt.Errorf("notification handler panic: %v", r) + } + } + }() handler(notification) + }() } } }()Also create a separate goroutine to monitor for these errors and clean up if necessary:
// Add after starting the notification goroutine go func() { select { case err := <-newSession.errChan: s.logger.Printf("Error in notification handler for session %s: %v", newSessionID, err) // Clean up the session if needed s.sessions.Delete(newSessionID) s.server.UnregisterSession(context.Background(), newSessionID) close(newSession.notificationChannel) } }()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
server/streamable_http.go
(1 hunks)
🧰 Additional context used
🪛 golangci-lint (1.64.8)
server/streamable_http.go
598-598: SA4009: argument ctx is overwritten before first use
(staticcheck)
714-714: SA4009(related information): assignment to ctx
(staticcheck)
🔇 Additional comments (8)
server/streamable_http.go (8)
18-29
: Session implementation looks correctThe
streamableHTTPSession
struct is well-designed with appropriate fields for maintaining session state. It correctly uses atomic operations for thread-safe access to theinitialized
flag and includes mutex protection for notification handling.
31-46
: Well-implemented JSON marshalingGood implementation of custom JSON marshaling that correctly excludes non-serializable fields like function types. This prevents potential JSON serialization issues when the session needs to be marshaled.
96-102
: Good use of interfaces for event storageThe
EventStore
interface is well-designed with clear method signatures that separate the concerns of storing events and replaying them. This abstraction allows for different event storage implementations.
952-983
: Thread-safe SSE event writing implementationThe
writeSSEEvent
method is properly implemented with mutex locking to ensure thread-safe access to the stream writer. This correctly prevents concurrent writes to the HTTP response writer, which is not thread-safe in Go'snet/http
package.
511-515
: Good notification handler restoration patternThe code correctly restores the original notification handler after request processing, which prevents memory leaks from temporary handlers being left in place. This addresses a previous review comment about temporary handlers not being cleaned up properly.
932-940
: Correct session cleanup on deleteThe implementation now correctly closes the notification channel during session deletion, which prevents goroutine leaks. This properly addresses the past review comment on lines 301-318.
301-318
: Sessions are now unregistered during shutdownThe code has been updated to correctly close notification channels for all sessions during shutdown, which prevents goroutine leaks. This properly addresses the past review comment on lines 301-318.
475-478
: Stateless mode is now implemented correctlyThe code correctly checks the
statelessMode
flag instead of relying on the session ID generator, which correctly implements the stateless mode functionality. This properly addresses a past review comment about ineffective session-presence checks.
if err != nil { | ||
// Log the error but continue | ||
fmt.Printf("Error storing event: %v\n", err) | ||
// Use a generated UUID as fallback | ||
eventID = uuid.New().String() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add structured error logging
The code uses fmt.Printf
for error logging, which isn't suitable for production environments.
Consider using a structured logger that can be configured for different environments:
// Add at top of file
import (
// ... existing imports
+ "log"
)
// In StreamableHTTPServer struct
type StreamableHTTPServer struct {
// ... existing fields
+ logger *log.Logger
}
// In the constructor
func NewStreamableHTTPServer(server *MCPServer, opts ...StreamableHTTPOption) *StreamableHTTPServer {
s := &StreamableHTTPServer{
// ... existing initialization
+ logger: log.New(os.Stderr, "StreamableHTTP: ", log.LstdFlags),
}
// ...
}
// Add a new option
func WithLogger(logger *log.Logger) StreamableHTTPOption {
return streamableHTTPOption(func(s *StreamableHTTPServer) {
s.logger = logger
})
}
// Then replace all fmt.Printf calls like this:
- fmt.Printf("Error storing event: %v\n", err)
+ s.logger.Printf("Error storing event: %v", err)
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
server/streamable_http.go (1)
598-773
: 🛠️ Refactor suggestionRemove unused
ctx
parameter and improve logging in SSE handler:handleSSEResponse
shadows itsctx
argument (SA4009). Refactor signature to dropctx
and avoid shadowing:-func (s *StreamableHTTPServer) handleSSEResponse(w http.ResponseWriter, r *http.Request, ctx context.Context, ... +func (s *StreamableHTTPServer) handleSSEResponse(w http.ResponseWriter, r *http.Request, ...Also replace
fmt.Printf
error logging with a structured logger.🧰 Tools
🪛 golangci-lint (1.64.8)
598-598: SA4009: argument ctx is overwritten before first use
(staticcheck)
714-714: SA4009(related information): assignment to ctx
(staticcheck)
🧹 Nitpick comments (3)
server/streamable_http.go (3)
320-347
: CORS header configuration is appropriate:resolveBasePath
andsetCORSHeaders
manage dynamic/static base paths and setAccess-Control-Allow-*
headers correctly. Consider addingAccess-Control-Allow-Credentials
if authentication cookies will be used.
775-929
: Reduce code duplication in SSE streams:handleGet
andhandleSSEResponse
share large blocks of SSE loop logic (event storage, writing, keep-alive). Extract common functionality into helper methods to adhere to DRY and simplify maintenance.
999-1045
: Review default CORS origin policy: Currently, whenoriginAllowlist
is empty, all origins are accepted, which may be insecure. Consider restricting by default to onlylocalhost
/127.0.0.1
and require an explicitWithAllowAllOrigins
option for broader access.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
server/streamable_http.go
(1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
server/streamable_http.go (4)
mcp/types.go (6)
JSONRPCNotification
(206-209)JSONRPCMessage
(89-89)Request
(103-116)Notification
(120-123)Params
(118-118)NotificationParams
(125-132)server/server.go (1)
ServerTool
(50-53)server/session.go (2)
ClientSession
(11-20)SessionWithTools
(23-31)server/http_transport_options.go (3)
StreamableHTTPOption
(43-46)HTTPContextFunc
(14-14)DynamicBasePathFunc
(141-141)
🪛 golangci-lint (1.64.8)
server/streamable_http.go
598-598: SA4009: argument ctx is overwritten before first use
(staticcheck)
714-714: SA4009(related information): assignment to ctx
(staticcheck)
🔇 Additional comments (9)
server/streamable_http.go (9)
18-90
: Session management implementation is solid:streamableHTTPSession
, its JSON marshaling (MarshalJSON
), interface methods (SessionID
,NotificationChannel
,Initialize
,Initialized
), and thread-safe tool storage (GetSessionTools
,SetSessionTools
) correctly implementClientSession
andSessionWithTools
.
96-190
: In-memory event store is correctly implemented: TheEventStore
interface andInMemoryEventStore
methods (StoreEvent
,ReplayEventsAfter
) provide safe concurrency control and proper replay logic. Consider performance implications for large event volumes.
192-234
: Configuration options are applied properly: Option setters (WithSessionIDGenerator
,WithStatelessMode
,WithEnableJSONResponse
,WithEventStore
, etc.) correctly modify server fields viastreamableHTTPOption
.
235-284
: Server construction is correct:StreamableHTTPServer
struct fields and theNewStreamableHTTPServer
constructor initialize defaults and apply custom options, including defaulting toInMemoryEventStore
when none is provided.
348-455
: HTTP routing and request dispatch are robust:ServeHTTP
,handlePost
, andhandleNotification
correctly implement the MCP HTTP endpoint, CORS preflight, and JSON-RPC notification semantics.
931-957
: Session deletion cleans up properly:handleDelete
closes the session channel, deletes it from the map, and unregisters it fromMCPServer
as expected.
959-997
: Thread-safe SSE event writer is correct: ThewriteSSEEvent
method locks per-stream state, marshals messages, writes proper SSE formatting, and flushes.
1047-1067
: Utility methods for session validation and header parsing are correct:validateSession
accurately checks initialization state, andsplitHeader
correctly splits and trims HTTP header values.
1069-1101
: SSE stream setup and teardown are properly implemented:setupStream
verifieshttp.Flusher
, sets SSE headers, and registers the stream mapping;closeStream
cleans it up.
notificationBuffer = append(notificationBuffer, notification) | ||
// Also forward to original handler if it exists | ||
if originalNotificationHandler != nil { | ||
originalNotificationHandler(notification) | ||
} | ||
} | ||
session.notifyMu.Unlock() | ||
} | ||
|
||
// Process the request | ||
response := s.server.HandleMessage(ctx, rawMessage) | ||
|
||
// Always restore the previous state (even if it was nil) | ||
// This prevents memory leaks from temporary handlers being left in place | ||
if session != nil { | ||
session.notifyMu.Lock() | ||
session.notificationHandler = originalNotificationHandler | ||
session.notifyMu.Unlock() | ||
} | ||
|
||
// If this is an initialization request, create a new session | ||
if isInitialize && response != nil { | ||
// Only create a session if we're not in stateless mode | ||
if !s.statelessMode { | ||
newSessionID := s.sessionIDGenerator() | ||
newSession := &streamableHTTPSession{ | ||
sessionID: newSessionID, | ||
notificationChannel: make(chan mcp.JSONRPCNotification, 100), | ||
eventStore: s.eventStore, | ||
sessionTools: sync.Map{}, | ||
} | ||
|
||
// Initialize and register the session | ||
newSession.Initialize() | ||
s.sessions.Store(newSessionID, newSession) | ||
|
||
// Start a goroutine to listen for notifications and call the notification handler | ||
go func() { | ||
for notification := range newSession.notificationChannel { | ||
// Call the notification handler if set | ||
newSession.notifyMu.RLock() | ||
handler := newSession.notificationHandler | ||
newSession.notifyMu.RUnlock() | ||
|
||
if handler != nil { | ||
handler(notification) | ||
} | ||
} | ||
}() | ||
|
||
if err := s.server.RegisterSession(ctx, newSession); err != nil { | ||
http.Error(w, fmt.Sprintf("Failed to register session: %v", err), http.StatusInternalServerError) | ||
return | ||
} | ||
|
||
// Set the session ID in the response header | ||
w.Header().Set("Mcp-Session-Id", newSessionID) | ||
|
||
// Update the session reference for further processing | ||
session = newSession | ||
} | ||
} | ||
|
||
// Check if the client accepts SSE | ||
acceptHeader := r.Header.Get("Accept") | ||
acceptsSSE := false | ||
for _, accept := range splitHeader(acceptHeader) { | ||
if strings.HasPrefix(accept, "text/event-stream") { | ||
acceptsSSE = true | ||
break | ||
} | ||
} | ||
|
||
// Determine if we should use SSE or direct JSON response | ||
useSSE := false | ||
|
||
// If the request contains any requests (not just notifications), we might use SSE | ||
if request.ID != nil { | ||
// Use SSE if: | ||
// 1. The client accepts SSE | ||
// 2. We have a valid session | ||
// 3. JSON response is not explicitly enabled | ||
// 4. The request is not an initialization request (those always return JSON) | ||
if acceptsSSE && session != nil && !s.enableJSONResponse && !isInitialize { | ||
useSSE = true | ||
} | ||
} | ||
|
||
if useSSE { | ||
// Start an SSE stream for this request | ||
s.handleSSEResponse(w, r, ctx, response, session, notificationBuffer...) | ||
} else { | ||
// Send a direct JSON response | ||
w.Header().Set("Content-Type", "application/json") | ||
w.WriteHeader(http.StatusOK) | ||
if response != nil { | ||
json.NewEncoder(w).Encode(response) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid competing readers on session notification channel: The goroutine started after session initialization (lines 533–545) consumes from newSession.notificationChannel
, which conflicts with SSE/GET handlers that also read from this channel, causing potential message loss. Consider creating per-stream channels or using a single channel consumer pattern.
// Start begins serving Streamable HTTP connections on the specified address. | ||
// It sets up HTTP handlers for the MCP endpoint. | ||
func (s *StreamableHTTPServer) Start(addr string) error { | ||
s.srv = &http.Server{ | ||
Addr: addr, | ||
Handler: s, | ||
} | ||
|
||
err := s.srv.ListenAndServe() | ||
if err != nil && err != http.ErrServerClosed { | ||
return err | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ensure sessions are unregistered on shutdown: Shutdown
currently closes channels and deletes sessions but does not call s.server.UnregisterSession
, leaving stale references in the core server. Please add s.server.UnregisterSession(ctx, session.SessionID())
before closing each session's channel.
Also applies to: 301-318
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (1)
server/streamable_http.go (1)
783-937
: Duplicate of SSE stream consumer issue inhandleGet
ThehandleGet
method also starts a goroutine reading directly fromsession.notificationChannel
, risking the same competing-read issue noted in the initialization path. Please apply the same refactoring there.
🧹 Nitpick comments (4)
server/streamable_http.go (4)
243-267
: Remove or implement the unusedbaseURL
field
TheStreamableHTTPServer
struct includes abaseURL
member that is never referenced elsewhere in this file. To keep the API surface clean, either remove this field or wire it into URL generation logic (e.g., for absolute links in docs or HATEOAS responses).
342-354
: Enhance CORS response withVary: Origin
When settingAccess-Control-Allow-Origin
, it’s best practice to also setVary: Origin
to signal caches that responses differ by theOrigin
header. Consider adding:w.Header().Add("Vary", "Origin")
356-395
: ServeHTTP path matching may be too strict
Currently, only exact matches ofbasePath + endpoint
are served (e.g.,/mcp
). You might want to support a trailing slash or sub-paths for health checks or metrics. For instance, consider usingstrings.HasPrefix(path, endpoint+"/")
or normalizing the path.
606-781
: Refactor suggestion: context parameter and logging
- The
ctx
parameter inhandleSSEResponse
is immediately shadowed byctx, cancel := context.WithTimeout(r.Context(), ...)
(SA4009). You can drop thectx
argument and derive context inside, or rename one to avoid confusion.- Error logging within this method still uses
fmt.Printf
; consider injecting alogger
intoStreamableHTTPServer
and using structured logging (s.logger.Printf
) for production readiness.🧰 Tools
🪛 golangci-lint (1.64.8)
606-606: SA4009: argument ctx is overwritten before first use
(staticcheck)
722-722: SA4009(related information): assignment to ctx
(staticcheck)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
server/streamable_http.go
(1 hunks)server/streamable_http_origin_validation_test.go
(1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- server/streamable_http_origin_validation_test.go
🧰 Additional context used
🧬 Code Graph Analysis (1)
server/streamable_http.go (4)
mcp/types.go (6)
JSONRPCNotification
(206-209)JSONRPCMessage
(89-89)Request
(103-116)Notification
(120-123)Params
(118-118)NotificationParams
(125-132)server/server.go (1)
ServerTool
(50-53)server/session.go (2)
ClientSession
(11-20)SessionWithTools
(23-31)server/http_transport_options.go (3)
StreamableHTTPOption
(43-46)HTTPContextFunc
(14-14)DynamicBasePathFunc
(141-141)
🪛 golangci-lint (1.64.8)
server/streamable_http.go
606-606: SA4009: argument ctx is overwritten before first use
(staticcheck)
722-722: SA4009(related information): assignment to ctx
(staticcheck)
🔇 Additional comments (9)
server/streamable_http.go (9)
18-29
: Session implementation is solid
ThestreamableHTTPSession
struct and its methods correctly useatomic.Bool
for initialization state andsync.Map
for tools, ensuring thread-safety. TheMarshalJSON
,Initialize
, andNotificationChannel
interfaces are implemented properly.
105-139
: In-memory event store is correct but consider lookup performance
TheInMemoryEventStore
stores events safely under lock. Playback viaReplayEventsAfter
works as specified. Be mindful that finding the event ID scans all streams (O(n×m)), which could become a bottleneck under high load. For production use, you might consider indexing event IDs directly.
192-242
: Options API is complete and consistent
All core server options (WithSessionIDGenerator
,WithStatelessMode
,WithEnableJSONResponse
,WithEventStore
,WithStreamableHTTPContextFunc
,WithOriginAllowlist
,WithAllowAllOrigins
) are implemented idiomatically. Their application inNewStreamableHTTPServer
will configure the server as expected.
269-292
: Server constructor initializes defaults correctly
NewStreamableHTTPServer
sets sensible defaults (endpoint/mcp
, UUID-based session IDs, in-memory event store) and applies any provided options. The lazy initialization of the event store ensures out-of-the-box functionality.
397-464
: handlePost and notifications look good
Session retrieval, JSON parsing, dispatching tohandleNotification
orhandleRequest
, and returning202 Accepted
for notifications are all handled correctly. Error conditions are properly checked and signaled.
465-508
: Core request handling is robust
The request parsing, initialization check, stateless mode guard, and temporary notification buffering use a clean lock-based approach. Restoring the original handler vianotifyMu
prevents memory leaks.
939-965
: Session termination is handled correctly
Closing the session’s notification channel, deleting it from the map, and callingUnregisterSession
ensures resources are cleaned up afterDELETE
requests.
975-1005
:writeSSEEvent
is well-synchronized
The use of a per-streammu
lock around writes and flushes guarantees thread-safety. Marshaling and header formatting conform to the SSE spec.
1007-1109
: Utility methods are solid
Header splitting, origin validation (including wildcard and*
allow-all), session validation, and stream setup/teardown are implemented correctly and align with the spec.
// If this is an initialization request, create a new session | ||
if isInitialize && response != nil { | ||
// Only create a session if we're not in stateless mode | ||
if !s.statelessMode { | ||
newSessionID := s.sessionIDGenerator() | ||
newSession := &streamableHTTPSession{ | ||
sessionID: newSessionID, | ||
notificationChannel: make(chan mcp.JSONRPCNotification, 100), | ||
eventStore: s.eventStore, | ||
sessionTools: sync.Map{}, | ||
} | ||
|
||
// Initialize and register the session | ||
newSession.Initialize() | ||
s.sessions.Store(newSessionID, newSession) | ||
|
||
// Start a goroutine to listen for notifications and call the notification handler | ||
go func() { | ||
for notification := range newSession.notificationChannel { | ||
// Call the notification handler if set | ||
newSession.notifyMu.RLock() | ||
handler := newSession.notificationHandler | ||
newSession.notifyMu.RUnlock() | ||
|
||
if handler != nil { | ||
handler(notification) | ||
} | ||
} | ||
}() | ||
|
||
if err := s.server.RegisterSession(ctx, newSession); err != nil { | ||
http.Error(w, fmt.Sprintf("Failed to register session: %v", err), http.StatusInternalServerError) | ||
return | ||
} | ||
|
||
// Set the session ID in the response header | ||
w.Header().Set("Mcp-Session-Id", newSessionID) | ||
|
||
// Update the session reference for further processing | ||
session = newSession | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Data race from competing session notification consumers
You spawn a long-lived goroutine on session initialization that consumes from newSession.notificationChannel
and also spin up per-SSE-stream goroutines that read from the same channel. This competing-reader pattern can result in lost notifications.
Refactor to fan out notifications properly: either remove the initial listener and rely solely on per-stream consumers, or use a broadcast mechanism so each consumer gets all events.
// Start begins serving Streamable HTTP connections on the specified address. | ||
// It sets up HTTP handlers for the MCP endpoint. | ||
func (s *StreamableHTTPServer) Start(addr string) error { | ||
s.srv = &http.Server{ | ||
Addr: addr, | ||
Handler: s, | ||
} | ||
|
||
err := s.srv.ListenAndServe() | ||
if err != nil && err != http.ErrServerClosed { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
// Shutdown gracefully stops the Streamable HTTP server, closing all active sessions | ||
// and shutting down the HTTP server. | ||
func (s *StreamableHTTPServer) Shutdown(ctx context.Context) error { | ||
if s.srv != nil { | ||
s.sessions.Range(func(key, value interface{}) bool { | ||
if session, ok := value.(ClientSession); ok { | ||
if httpSession, ok := session.(*streamableHTTPSession); ok { | ||
close(httpSession.notificationChannel) | ||
} | ||
} | ||
s.sessions.Delete(key) | ||
return true | ||
}) | ||
|
||
return s.srv.Shutdown(ctx) | ||
} | ||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing session unregistration on shutdown
In Shutdown
, you close each session’s channel and delete it from sessions
, but you never call s.server.UnregisterSession
. This may leave dangling session references in the core MCPServer
.
Please add a call to s.server.UnregisterSession(ctx, sessionID)
inside the Range
loop before deleting the session to ensure clean teardown.
is it possible to integrate this with existing web server ? i.e. i already have a remote web server setup and running , so just expose mcp http server interface without starting a additional http server ? |
It is possible to integrate your MCP server by configuring a proxy in your web server. Servers like apache or nginx can be configured to forward requests for specific endpoints (/sse or /mcp etc) to another service running on specific port. |
Closed in favor of #273 |
Key Features
Implementation Details
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests