Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions server/streamable_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ func WithLogger(logger util.Logger) StreamableHTTPOption {
// - Batching of requests/notifications/responses in arrays.
// - Stream Resumability
type StreamableHTTPServer struct {
server *MCPServer
sessionTools *sessionToolsStore
server *MCPServer
sessionTools *sessionToolsStore
sessionRequestIDs sync.Map // sessionId --> last requestID(*atomic.Int64)

httpServer *http.Server
mu sync.RWMutex
Expand Down Expand Up @@ -389,15 +390,16 @@ func (s *StreamableHTTPServer) handleGet(w http.ResponseWriter, r *http.Request)
go func() {
ticker := time.NewTicker(s.listenHeartbeatInterval)
defer ticker.Stop()
message := mcp.JSONRPCRequest{
JSONRPC: "2.0",
Request: mcp.Request{
Method: "ping",
},
}
for {
select {
case <-ticker.C:
message := mcp.JSONRPCRequest{
JSONRPC: "2.0",
ID: mcp.NewRequestId(s.nextRequestID(sessionID)),
Request: mcp.Request{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requestID will always be 1, because handleGet will always create a new session. for the same sessionID, the requestID should be unique.

From specification: The session ID SHOULD be globally unique ...

therefore, we consider sessions with the same sessionID to be the same session.

Copy link
Contributor Author

@cryo-zd cryo-zd May 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requestID will always be 1, because handleGet will always create a new session. for the same sessionID, the requestID should be unique.

From specification: The session ID SHOULD be globally unique ...
therefore, we consider sessions with the same sessionID to be the same session.

sorry for my mistake, I have updated it to make requestID unique and monotonically increasing within each session. What do you think of this change ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, thank you for the quick fix on this issue. I have verified that it makes the clients I tested on happy.

However I am not sure this implementation is per the spec. In the spec they state:

If present, the ID MUST be globally unique across all streams within that session—or all streams with that specific client, if session management is not in use.

In the current implementation the ID is unique for ping requests but doesn't account for other requests happening in the session like tool lists or calls, resulting in multiple requests with the same ID happening for a given session.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The problem is that the ID of the result returned by our handlePost function uses the value of RequestID from the client request. And I think we should change the ID value instead of using RequestID directly.

In other words, these event IDs should be assigned by servers on a per-stream basis, to act as a cursor within that particular stream.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. The problem is that the ID of the result returned by our handlePost function uses the value of RequestID from the client request. And I think we should change the ID value instead of using RequestID directly.

In other words, these event IDs should be assigned by servers on a per-stream basis, to act as a cursor within that particular stream.

[difference between mcp-go's writeSSEEvent and typescript-sdk's (link )WriteSSEEvent]

func writeSSEEvent(w io.Writer, data any) error {
	jsonData, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("failed to marshal data: %w", err)
	}
         // only message and data field, no field "event ID", id inside jsonrpcresponse/jsonrpcrequest != event ID
	_, err = fmt.Fprintf(w, "event: message\ndata: %s\n\n", jsonData)
	if err != nil {
		return fmt.Errorf("failed to write SSE event: %w", err)
	}
	return nil
}
  private writeSSEEvent(res: ServerResponse, message: JSONRPCMessage, eventId?: string): boolean {
    let eventData = `event: message\n`;
    // Include event ID if provided - this is important for resumability<===[event ID]
    if (eventId) {
      eventData += `id: ${eventId}\n`;
    }
    eventData += `data: ${JSON.stringify(message)}\n\n`;

    return res.write(eventData);
  }

Method: "ping",
},
}
select {
case writeChan <- message:
case <-done:
Expand Down Expand Up @@ -447,6 +449,9 @@ func (s *StreamableHTTPServer) handleDelete(w http.ResponseWriter, r *http.Reque
// remove the session relateddata from the sessionToolsStore
s.sessionTools.set(sessionID, nil)

// remove current session's requstID information
s.sessionRequestIDs.Delete(sessionID)

w.WriteHeader(http.StatusOK)
}

Expand Down Expand Up @@ -478,6 +483,13 @@ func (s *StreamableHTTPServer) writeJSONRPCError(
}
}

// nextRequestID gets the next incrementing requestID for the current session
func (s *StreamableHTTPServer) nextRequestID(sessionID string) int64 {
actual, _ := s.sessionRequestIDs.LoadOrStore(sessionID, new(atomic.Int64))
counter := actual.(*atomic.Int64)
return counter.Add(1)
}

// --- session ---

type sessionToolsStore struct {
Expand Down