From 0c405ef70eb095fdd82cccfb23f79f30b6d04c2a Mon Sep 17 00:00:00 2001 From: Mathieu Rene Date: Fri, 28 Mar 2025 17:03:24 -0400 Subject: [PATCH] fix: remove sse read timeout to avoid ignoring future sse messages --- client/sse.go | 51 ++++++++++++++++++++------------------------------- 1 file changed, 20 insertions(+), 31 deletions(-) diff --git a/client/sse.go b/client/sse.go index cf4a1028e..4fa9ea4b8 100644 --- a/client/sse.go +++ b/client/sse.go @@ -23,20 +23,19 @@ import ( // while sending requests over regular HTTP POST calls. The client handles // automatic reconnection and message routing between requests and responses. type SSEMCPClient struct { - baseURL *url.URL - endpoint *url.URL - httpClient *http.Client - requestID atomic.Int64 - responses map[int64]chan RPCResponse - mu sync.RWMutex - done chan struct{} - initialized bool - notifications []func(mcp.JSONRPCNotification) - notifyMu sync.RWMutex - endpointChan chan struct{} - capabilities mcp.ServerCapabilities - headers map[string]string - sseReadTimeout time.Duration + baseURL *url.URL + endpoint *url.URL + httpClient *http.Client + requestID atomic.Int64 + responses map[int64]chan RPCResponse + mu sync.RWMutex + done chan struct{} + initialized bool + notifications []func(mcp.JSONRPCNotification) + notifyMu sync.RWMutex + endpointChan chan struct{} + capabilities mcp.ServerCapabilities + headers map[string]string } type ClientOption func(*SSEMCPClient) @@ -47,12 +46,6 @@ func WithHeaders(headers map[string]string) ClientOption { } } -func WithSSEReadTimeout(timeout time.Duration) ClientOption { - return func(sc *SSEMCPClient) { - sc.sseReadTimeout = timeout - } -} - // NewSSEMCPClient creates a new SSE-based MCP client with the given base URL. // Returns an error if the URL is invalid. func NewSSEMCPClient(baseURL string, options ...ClientOption) (*SSEMCPClient, error) { @@ -62,13 +55,12 @@ func NewSSEMCPClient(baseURL string, options ...ClientOption) (*SSEMCPClient, er } smc := &SSEMCPClient{ - baseURL: parsedURL, - httpClient: &http.Client{}, - responses: make(map[int64]chan RPCResponse), - done: make(chan struct{}), - endpointChan: make(chan struct{}), - sseReadTimeout: 30 * time.Second, - headers: make(map[string]string), + baseURL: parsedURL, + httpClient: &http.Client{}, + responses: make(map[int64]chan RPCResponse), + done: make(chan struct{}), + endpointChan: make(chan struct{}), + headers: make(map[string]string), } for _, opt := range options { @@ -128,12 +120,9 @@ func (c *SSEMCPClient) readSSE(reader io.ReadCloser) { br := bufio.NewReader(reader) var event, data string - ctx, cancel := context.WithTimeout(context.Background(), c.sseReadTimeout) - defer cancel() - for { select { - case <-ctx.Done(): + case <-c.done: return default: line, err := br.ReadString('\n')