From c8731d5f72744cd4cd49ee5c7aae2315f343e4dc Mon Sep 17 00:00:00 2001 From: winter_wang Date: Thu, 10 Apr 2025 11:16:21 +0800 Subject: [PATCH 01/10] refactor(client): improve error handling in StdioMCPClient - Fix resource cleanup in error cases - Improve error handling in waitUntilReadyOrExit - Add proper pipe cleanup on command start failure --- client/stdio.go | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/client/stdio.go b/client/stdio.go index 8e0845dca..edcd9de62 100644 --- a/client/stdio.go +++ b/client/stdio.go @@ -11,10 +11,16 @@ import ( "os/exec" "sync" "sync/atomic" + "time" "github.com/mark3labs/mcp-go/mcp" ) +const ( + readyTimeout = 5 * time.Second + readyCheckTimeout = 100 * time.Millisecond +) + // StdioMCPClient implements the MCPClient interface using stdio communication. // It launches a subprocess and communicates with it via standard input/output streams // using JSON-RPC messages. The client handles message routing between requests and @@ -74,20 +80,46 @@ func NewStdioMCPClient( } if err := cmd.Start(); err != nil { + _ = stdin.Close() + _ = stdout.Close() + _ = stderr.Close() return nil, fmt.Errorf("failed to start command: %w", err) } + waitErrChain := make(chan error, 1) + go func() { + waitErrChain <- cmd.Wait() + }() + // Start reading responses in a goroutine and wait for it to be ready ready := make(chan struct{}) go func() { close(ready) client.readResponses() }() - <-ready + if err := waitUntilReadyOrExit(ready, waitErrChain, readyTimeout); err != nil { + return nil, err + } return client, nil } +func waitUntilReadyOrExit(ready <-chan struct{}, waitErr <-chan error, timeout time.Duration) error { + select { + case err := <-waitErr: + return fmt.Errorf("process exited early: %w", err) + case <-ready: + select { + case err := <-waitErr: + return fmt.Errorf("process exited after ready: %w", err) + case <-time.After(readyCheckTimeout): + return nil + } + case <-time.After(timeout): + return errors.New("timeout waiting for process ready") + } +} + // Close shuts down the stdio client, closing the stdin pipe and waiting for the subprocess to exit. // Returns an error if there are issues closing stdin or waiting for the subprocess to terminate. func (c *StdioMCPClient) Close() error { From ad295e3df1d3f1084b2efa7c4a538b3a17dfecb9 Mon Sep 17 00:00:00 2001 From: winter_wang Date: Thu, 10 Apr 2025 14:21:15 +0800 Subject: [PATCH 02/10] increase readyCheckTimeout --- client/stdio.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/stdio.go b/client/stdio.go index edcd9de62..c9ac5b338 100644 --- a/client/stdio.go +++ b/client/stdio.go @@ -18,7 +18,7 @@ import ( const ( readyTimeout = 5 * time.Second - readyCheckTimeout = 100 * time.Millisecond + readyCheckTimeout = 3 * time.Second ) // StdioMCPClient implements the MCPClient interface using stdio communication. From 1b4f98d5a588e67138cb80455452c904f37aeab1 Mon Sep 17 00:00:00 2001 From: winter_wang Date: Thu, 10 Apr 2025 14:30:56 +0800 Subject: [PATCH 03/10] adjust readyCheckTimeout --- client/stdio.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/stdio.go b/client/stdio.go index c9ac5b338..ccaf1b296 100644 --- a/client/stdio.go +++ b/client/stdio.go @@ -18,7 +18,7 @@ import ( const ( readyTimeout = 5 * time.Second - readyCheckTimeout = 3 * time.Second + readyCheckTimeout = 1 * time.Second ) // StdioMCPClient implements the MCPClient interface using stdio communication. From b312c37634320564539e92cd1d0a04bff5b51d99 Mon Sep 17 00:00:00 2001 From: winter_wang Date: Thu, 10 Apr 2025 15:28:17 +0800 Subject: [PATCH 04/10] fix data race issue --- client/stdio.go | 48 +++++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/client/stdio.go b/client/stdio.go index ccaf1b296..389e4c0ee 100644 --- a/client/stdio.go +++ b/client/stdio.go @@ -26,18 +26,19 @@ const ( // using JSON-RPC messages. The client handles message routing between requests and // responses, and supports asynchronous notifications. type StdioMCPClient struct { - cmd *exec.Cmd - stdin io.WriteCloser - stdout *bufio.Reader - stderr io.ReadCloser - requestID atomic.Int64 - responses map[int64]chan RPCResponse - mu sync.RWMutex - done chan struct{} - initialized bool - notifications []func(mcp.JSONRPCNotification) - notifyMu sync.RWMutex - capabilities mcp.ServerCapabilities + cmd *exec.Cmd + stdin io.WriteCloser + stdout *bufio.Reader + stderr io.ReadCloser + requestID atomic.Int64 + responses map[int64]chan RPCResponse + mu sync.RWMutex + done chan struct{} + initialized bool + notifications []func(mcp.JSONRPCNotification) + notifyMu sync.RWMutex + capabilities mcp.ServerCapabilities + processExitErr chan error } // NewStdioMCPClient creates a new stdio-based MCP client that communicates with a subprocess. @@ -71,24 +72,21 @@ func NewStdioMCPClient( } client := &StdioMCPClient{ - cmd: cmd, - stdin: stdin, - stderr: stderr, - stdout: bufio.NewReader(stdout), - responses: make(map[int64]chan RPCResponse), - done: make(chan struct{}), + cmd: cmd, + stdin: stdin, + stderr: stderr, + stdout: bufio.NewReader(stdout), + responses: make(map[int64]chan RPCResponse), + done: make(chan struct{}), + processExitErr: make(chan error, 1), } if err := cmd.Start(); err != nil { - _ = stdin.Close() - _ = stdout.Close() - _ = stderr.Close() return nil, fmt.Errorf("failed to start command: %w", err) } - waitErrChain := make(chan error, 1) go func() { - waitErrChain <- cmd.Wait() + client.processExitErr <- cmd.Wait() }() // Start reading responses in a goroutine and wait for it to be ready @@ -98,7 +96,7 @@ func NewStdioMCPClient( client.readResponses() }() - if err := waitUntilReadyOrExit(ready, waitErrChain, readyTimeout); err != nil { + if err := waitUntilReadyOrExit(ready, client.processExitErr, readyTimeout); err != nil { return nil, err } return client, nil @@ -130,7 +128,7 @@ func (c *StdioMCPClient) Close() error { if err := c.stderr.Close(); err != nil { return fmt.Errorf("failed to close stderr: %w", err) } - return c.cmd.Wait() + return <-c.processExitErr } // Stderr returns a reader for the stderr output of the subprocess. From cddc72306c133a32e41c2ebf7a46b9513e36d7ab Mon Sep 17 00:00:00 2001 From: winter_wang Date: Thu, 17 Apr 2025 11:41:00 +0800 Subject: [PATCH 05/10] fix --- client/transport/stdio.go | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/client/transport/stdio.go b/client/transport/stdio.go index 04e7c5bb4..f564690e7 100644 --- a/client/transport/stdio.go +++ b/client/transport/stdio.go @@ -10,6 +10,7 @@ import ( "os" "os/exec" "sync" + "sync/atomic" "time" "github.com/mark3labs/mcp-go/mcp" @@ -38,7 +39,8 @@ type Stdio struct { done chan struct{} onNotification func(mcp.JSONRPCNotification) notifyMu sync.RWMutex - processExitErr chan error + processExited chan struct{} + exitErr atomic.Value } // NewStdio creates a new stdio transport to communicate with a subprocess. @@ -55,9 +57,9 @@ func NewStdio( args: args, env: env, - responses: make(map[int64]chan *JSONRPCResponse), - done: make(chan struct{}), - processExitErr: make(chan error, 1), + responses: make(map[int64]chan *JSONRPCResponse), + done: make(chan struct{}), + processExited: make(chan struct{}), } return client @@ -96,7 +98,11 @@ func (c *Stdio) Start(ctx context.Context) error { } go func() { - c.processExitErr <- cmd.Wait() + err := cmd.Wait() + if err != nil { + c.exitErr.Store(err) + } + close(c.processExited) }() // Start reading responses in a goroutine and wait for it to be ready @@ -105,20 +111,21 @@ func (c *Stdio) Start(ctx context.Context) error { close(ready) c.readResponses() }() - if err := waitUntilReadyOrExit(ready, c.processExitErr, readyTimeout); err != nil { + + if err := waitUntilReadyOrExit(ready, c.processExited, readyTimeout); err != nil { return err } return nil } -func waitUntilReadyOrExit(ready <-chan struct{}, waitErr <-chan error, timeout time.Duration) error { +func waitUntilReadyOrExit(ready <-chan struct{}, exited <-chan struct{}, timeout time.Duration) error { select { - case err := <-waitErr: - return fmt.Errorf("process exited early: %w", err) + case <-exited: + return errors.New("process exited before signalling readiness") case <-ready: select { - case err := <-waitErr: - return fmt.Errorf("process exited after ready: %w", err) + case <-exited: + return errors.New("process exited after readiness") case <-time.After(readyCheckTimeout): return nil } @@ -137,7 +144,11 @@ func (c *Stdio) Close() error { if err := c.stderr.Close(); err != nil { return fmt.Errorf("failed to close stderr: %w", err) } - return <-c.processExitErr + <-c.processExited + if err, ok := c.exitErr.Load().(error); ok && err != nil { + return err + } + return nil } // OnNotification registers a handler function to be called when notifications are received. From b21c542ce608e261ef3e2edff705f9c90f3cfa19 Mon Sep 17 00:00:00 2001 From: winter_wang Date: Tue, 6 May 2025 16:12:25 +0800 Subject: [PATCH 06/10] sync --- client/transport/sse_test.go | 1 + server/sse.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/client/transport/sse_test.go b/client/transport/sse_test.go index 0227bfbb7..b8b59d06a 100644 --- a/client/transport/sse_test.go +++ b/client/transport/sse_test.go @@ -501,4 +501,5 @@ func TestSSEErrors(t *testing.T) { t.Errorf("Expected error when sending request after close, got nil") } }) + } diff --git a/server/sse.go b/server/sse.go index 59c82b519..018657e6f 100644 --- a/server/sse.go +++ b/server/sse.go @@ -655,6 +655,10 @@ func (s *SSEServer) MessageHandler() http.Handler { // ServeHTTP implements the http.Handler interface. func (s *SSEServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if s.dynamicBasePathFunc != nil { + http.Error(w, (&ErrDynamicPathConfig{Method: "ServeHTTP"}).Error(), http.StatusInternalServerError) + return + } path := r.URL.Path // Use exact path matching rather than Contains ssePath := s.CompleteSsePath() From 9be83891142124ba1fb934af6f4d4068518b0a0c Mon Sep 17 00:00:00 2001 From: winter_wang Date: Tue, 6 May 2025 16:15:45 +0800 Subject: [PATCH 07/10] sync --- client/stdio_test.go | 2 -- client/transport/stdio_test.go | 6 ------ 2 files changed, 8 deletions(-) diff --git a/client/stdio_test.go b/client/stdio_test.go index 48514d91c..78499115b 100644 --- a/client/stdio_test.go +++ b/client/stdio_test.go @@ -45,13 +45,11 @@ func TestStdioMCPClient(t *testing.T) { } tempFile.Close() mockServerPath := tempFile.Name() - // Add .exe suffix on Windows if runtime.GOOS == "windows" { os.Remove(mockServerPath) // Remove the empty file first mockServerPath += ".exe" } - if compileErr := compileTestServer(mockServerPath); compileErr != nil { t.Fatalf("Failed to compile mock server: %v", compileErr) } diff --git a/client/transport/stdio_test.go b/client/transport/stdio_test.go index 51e90b0f0..d60fa1e24 100644 --- a/client/transport/stdio_test.go +++ b/client/transport/stdio_test.go @@ -44,13 +44,11 @@ func TestStdio(t *testing.T) { } tempFile.Close() mockServerPath := tempFile.Name() - // Add .exe suffix on Windows if runtime.GOOS == "windows" { os.Remove(mockServerPath) // Remove the empty file first mockServerPath += ".exe" } - if compileErr := compileTestServer(mockServerPath); compileErr != nil { t.Fatalf("Failed to compile mock server: %v", compileErr) } @@ -329,13 +327,11 @@ func TestStdioErrors(t *testing.T) { } tempFile.Close() mockServerPath := tempFile.Name() - // Add .exe suffix on Windows if runtime.GOOS == "windows" { os.Remove(mockServerPath) // Remove the empty file first mockServerPath += ".exe" } - if compileErr := compileTestServer(mockServerPath); compileErr != nil { t.Fatalf("Failed to compile mock server: %v", compileErr) } @@ -368,13 +364,11 @@ func TestStdioErrors(t *testing.T) { } tempFile.Close() mockServerPath := tempFile.Name() - // Add .exe suffix on Windows if runtime.GOOS == "windows" { os.Remove(mockServerPath) // Remove the empty file first mockServerPath += ".exe" } - if compileErr := compileTestServer(mockServerPath); compileErr != nil { t.Fatalf("Failed to compile mock server: %v", compileErr) } From 7a5c8b32e992fd21d410e00fb7d2ec1922a3d2e3 Mon Sep 17 00:00:00 2001 From: winter_wang Date: Tue, 6 May 2025 16:17:46 +0800 Subject: [PATCH 08/10] sync --- client/stdio_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/client/stdio_test.go b/client/stdio_test.go index 78499115b..48514d91c 100644 --- a/client/stdio_test.go +++ b/client/stdio_test.go @@ -45,11 +45,13 @@ func TestStdioMCPClient(t *testing.T) { } tempFile.Close() mockServerPath := tempFile.Name() + // Add .exe suffix on Windows if runtime.GOOS == "windows" { os.Remove(mockServerPath) // Remove the empty file first mockServerPath += ".exe" } + if compileErr := compileTestServer(mockServerPath); compileErr != nil { t.Fatalf("Failed to compile mock server: %v", compileErr) } From c65164d298ed01cb4b19f4946509eeb17ad3f67c Mon Sep 17 00:00:00 2001 From: winter_wang Date: Tue, 6 May 2025 18:16:36 +0800 Subject: [PATCH 09/10] fix --- client/transport/stdio.go | 43 ++++++++++++++++++++++----------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/client/transport/stdio.go b/client/transport/stdio.go index d5aea3782..edc2e2be0 100644 --- a/client/transport/stdio.go +++ b/client/transport/stdio.go @@ -39,7 +39,6 @@ type Stdio struct { done chan struct{} onNotification func(mcp.JSONRPCNotification) notifyMu sync.RWMutex - processExited chan struct{} exitErr atomic.Value } @@ -71,9 +70,8 @@ func NewStdio( args: args, env: env, - responses: make(map[int64]chan *JSONRPCResponse), - done: make(chan struct{}), - processExited: make(chan struct{}), + responses: make(map[int64]chan *JSONRPCResponse), + done: make(chan struct{}), } return client @@ -83,7 +81,14 @@ func (c *Stdio) Start(ctx context.Context) error { if err := c.spawnCommand(ctx); err != nil { return err } - return nil + + // Start reading responses in a goroutine and wait for it to be ready + ready := make(chan struct{}) + go func() { + close(ready) + c.readResponses() + }() + return waitUntilReadyOrExit(ready, c.done, readyTimeout) } // spawnCommand spawns a new process running c.command. @@ -121,27 +126,25 @@ func (c *Stdio) spawnCommand(ctx context.Context) error { if err := cmd.Start(); err != nil { return fmt.Errorf("failed to start command: %w", err) } + go func() { err := cmd.Wait() if err != nil { c.exitErr.Store(err) } - close(c.processExited) - }() - - // Start reading responses in a goroutine and wait for it to be ready - ready := make(chan struct{}) - go func() { - close(ready) - c.readResponses() + tryCloseDone(c.done) }() - - if err := waitUntilReadyOrExit(ready, c.processExited, readyTimeout); err != nil { - return err - } return nil } +func tryCloseDone(done chan struct{}) { + select { + case <-done: + return + default: + } + close(done) +} func waitUntilReadyOrExit(ready <-chan struct{}, exited <-chan struct{}, timeout time.Duration) error { select { case <-exited: @@ -161,14 +164,15 @@ func waitUntilReadyOrExit(ready <-chan struct{}, exited <-chan struct{}, timeout // Close shuts down the stdio client, closing the stdin pipe and waiting for the subprocess to exit. // Returns an error if there are issues closing stdin or waiting for the subprocess to terminate. func (c *Stdio) Close() error { - close(c.done) + // cancel all in-flight request + tryCloseDone(c.done) if err := c.stdin.Close(); err != nil { return fmt.Errorf("failed to close stdin: %w", err) } if err := c.stderr.Close(); err != nil { return fmt.Errorf("failed to close stderr: %w", err) } - <-c.processExited + if err, ok := c.exitErr.Load().(error); ok && err != nil { return err } @@ -270,6 +274,7 @@ func (c *Stdio) SendRequest( deleteResponseChan() return nil, fmt.Errorf("failed to write request: %w", err) } + select { case <-ctx.Done(): deleteResponseChan() From 87dcc1951821479fab6b80cdaaeceeba01e4e324 Mon Sep 17 00:00:00 2001 From: winter_wang Date: Tue, 6 May 2025 18:23:18 +0800 Subject: [PATCH 10/10] fix --- client/transport/stdio.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/transport/stdio.go b/client/transport/stdio.go index edc2e2be0..8da8f6bc0 100644 --- a/client/transport/stdio.go +++ b/client/transport/stdio.go @@ -179,8 +179,8 @@ func (c *Stdio) Close() error { return nil } -// OnNotification registers a handler function to be called when notifications are received. -// Multiple handlers can be registered and will be called in the order they were added. +// SetNotificationHandler sets the handler function to be called when a notification is received. +// Only one handler can be set at a time; setting a new one replaces the previous handler. func (c *Stdio) SetNotificationHandler( handler func(notification mcp.JSONRPCNotification), ) {