-
Notifications
You must be signed in to change notification settings - Fork 461
added the configurable items sse_read_timeout and headers to mcp-client #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe pull request enhances the Changes
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
client/sse.go (2)
21-33
: Document the new features in function commentsWhile the implementation is good, it would be helpful to document the new configuration options in the function comments, especially for
NewSSEMCPClient
,WithHeaders
, andWithSSEReadTimeout
.+// Option defines a function type that modifies the SSEMCPClient configuration. type Option func(*SSEMCPClient) +// WithHeaders sets custom HTTP headers that will be included in all requests sent by the client. func WithHeaders(headers map[string]string) Option { return func(sc *SSEMCPClient) { sc.headers = headers } } +// WithSSEReadTimeout sets the timeout duration for SSE read operations. +// If not set, a default timeout will be used. func WithSSEReadTimeout(timeout time.Duration) Option { return func(sc *SSEMCPClient) { sc.sseReadTimeout = timeout } } // SSEMCPClient implements the MCPClient interface using Server-Sent Events (SSE). // It maintains a persistent HTTP connection to receive server-pushed events // while sending requests over regular HTTP POST calls. The client handles // automatic reconnection and message routing between requests and responses.And update the
NewSSEMCPClient
documentation:-// NewSSEMCPClient creates a new SSE-based MCP client with the given base URL. -// Returns an error if the URL is invalid. +// NewSSEMCPClient creates a new SSE-based MCP client with the given base URL and options. +// Options can include custom HTTP headers and SSE read timeout settings. +// Returns an error if the URL is invalid. func NewSSEMCPClient(baseURL string, options ...Option) (*SSEMCPClient, error) {Also applies to: 43-43, 53-53, 58-76, 126-166, 305-308
21-33
: Consider adding validation for headersThe
WithHeaders
option accepts any map of headers without validation. While this provides flexibility, it could lead to overriding essential headers like "Content-Type" or "Accept".Consider adding validation or merging logic to prevent overriding critical headers:
func WithHeaders(headers map[string]string) Option { return func(sc *SSEMCPClient) { + // Create a new map if headers is nil + if sc.headers == nil { + sc.headers = make(map[string]string) + } + + // Copy headers, skipping those that would override critical ones + for k, v := range headers { + // Skip critical headers that shouldn't be overridden + if k == "Content-Type" || k == "Accept" || k == "Connection" { + continue + } + sc.headers[k] = v + } - sc.headers = headers } } // In sendRequest method req.Header.Set("Content-Type", "application/json") // set custom http headers for k, v := range c.headers { - req.Header.Set(k, v) + // Don't override headers already set + if v != "" && req.Header.Get(k) == "" { + req.Header.Set(k, v) + } }Also applies to: 305-308
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
client/sse.go
(3 hunks)
🔇 Additional comments (4)
client/sse.go (4)
21-33
: Well-implemented functional options patternThe addition of the functional options pattern is a good approach for configuring the
SSEMCPClient
. This pattern provides flexibility for adding new configuration options in the future without breaking existing code.
43-43
: Appropriate field additions for new configurationsThe new fields
headers
andsseReadTimeout
align well with the PR objective to support custom headers and read timeout configuration.Also applies to: 53-53
58-76
: Constructor properly updated to support optionsThe constructor has been correctly modified to accept and apply functional options, while maintaining backward compatibility by making the options parameter variadic.
305-308
: Headers implementation looks goodThe custom headers implementation in the
sendRequest
method is clean and straightforward.
According to the mcp's structural definition of the tool, inputSchema.properties cannot be null
I introduced another commit in the current pr, sorry, I'm not familiar with the whole git process. New submission to solve the problem (junjiem/dify-plugin-agent-mcp_sse#1). As a result, when I used dify to interconnect with the |
hi @ezynda3 could you please help check this PR? I hope we can work together as soon as possible. At present, there are already some conflicts. Do you need me to solve them? Thank you very much! |
Yes please fix the merge conflicts and I can have a look at the PR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
client/sse.go
(3 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
client/sse.go (3)
client/types.go (1)
RPCResponse
(5-8)mcp/types.go (2)
JSONRPCNotification
(192-195)ServerCapabilities
(319-342)client/stdio.go (16)
c
(86-92)c
(96-102)c
(107-170)c
(176-224)c
(226-229)c
(231-285)c
(287-307)c
(309-329)c
(331-342)c
(344-350)c
(352-358)c
(360-375)c
(377-387)c
(389-404)c
(406-416)c
(418-424)
🔇 Additional comments (5)
client/sse.go (5)
37-39
: LGTM: New client configuration fieldsThe addition of
headers
andsseReadTimeout
fields to theSSEMCPClient
struct provides the necessary structure for the new configurable options.
42-54
: Well-implemented functional options patternThe implementation of the functional options pattern with
WithHeaders
andWithSSEReadTimeout
is clean and follows Go best practices. This approach provides a flexible, extensible way to configure the client.
58-78
: Good default values in constructorThe constructor properly initializes the new fields with sensible defaults (30-second timeout and empty headers map) before applying the provided options. This ensures the client behaves correctly even without explicit configuration.
134-175
: Improve timeout handling in readSSE methodThe current implementation creates a single timeout context at the beginning of the method, which means:
- The timeout applies to the entire SSE connection
- After the timeout period, the connection will be terminated regardless of activity
This might not be the expected behavior. Consider implementing a per-read timeout instead.
For a better implementation, consider:
- Using a parent context that's canceled when the client is closed
- Setting timeouts for individual read operations
func (c *SSEMCPClient) readSSE(reader io.ReadCloser) { defer reader.Close() - ctx, cancel := context.WithTimeout(context.Background(), c.sseReadTimeout) + // Create a parent context that can be canceled when the client is closed + ctx, cancel := context.WithCancel(context.Background()) defer cancel() + + // Cancel context when client is closed + go func() { + select { + case <-c.done: + cancel() + } + }() br := bufio.NewReader(reader) var event, data string for { select { case <-ctx.Done(): return default: + // Set timeout for this specific read operation + readCtx, readCancel := context.WithTimeout(ctx, c.sseReadTimeout) + readDone := make(chan struct{}) + + go func() { + defer close(readDone) + line, err := br.ReadString('\n') + // Process line here... + }() + + select { + case <-readCtx.Done(): + if err := readCtx.Err(); err != context.Canceled { + fmt.Printf("SSE read timed out: %v\n", err) + return + } + case <-readDone: + // Read completed successfully + } + readCancel()
304-307
: LGTM: Custom headers implementationThe implementation for setting custom HTTP headers is straightforward and effective.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (1)
client/sse.go (1)
131-138
: 🛠️ Refactor suggestionConsider implementing per-read timeouts instead of a single timeout.
The current implementation creates a single timeout context at the beginning of the method, which means:
- If the timeout is meant to apply to the entire SSE connection, it will terminate after
sseReadTimeout
regardless of activity- If the timeout is very short, it might disconnect even during active communications
Consider implementing a per-read timeout that resets on successful reads:
-ctx, cancel := context.WithTimeout(context.Background(), c.sseReadTimeout) -defer cancel() +// Create a parent context that can be canceled when the client is closed +ctx, cancel := context.WithCancel(context.Background()) +defer cancel() + +// Start a goroutine that will cancel the context when c.done is closed +go func() { + select { + case <-c.done: + cancel() + } +}() for { select { case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + // Client was closed, normal termination + return + } + // Timeout occurred + fmt.Printf("SSE read timed out: %v\n", ctx.Err()) return default: + // Use a timeout for this specific read + readCtx, readCancel := context.WithTimeout(ctx, c.sseReadTimeout) + lineCh := make(chan string, 1) + errCh := make(chan error, 1) + + go func() { + line, err := br.ReadString('\n') + if err != nil { + errCh <- err + return + } + lineCh <- line + }() + + select { + case <-readCtx.Done(): + readCancel() + return + case err := <-errCh: + readCancel() + // Handle error as in the current implementation + // ... + case line := <-lineCh: + readCancel() + // Process line as in the current implementation + // ... + } } }
🧹 Nitpick comments (3)
client/sse.go (3)
42-54
: Make a defensive copy of headers in WithHeaders function.The
WithHeaders
function directly assigns the provided map to the client's headers field. If the caller modifies the map after creating the option, it will affect the client's behavior.Consider making a defensive copy:
func WithHeaders(headers map[string]string) ClientOption { return func(sc *SSEMCPClient) { - sc.headers = headers + // Make a defensive copy + for k, v := range headers { + sc.headers[k] = v + } } }
70-70
: Validate timeout value to prevent immediate timeouts.The default timeout of 30 seconds is reasonable, but there's no validation to ensure that a user-provided timeout isn't too short or zero.
Consider adding validation in the
WithSSEReadTimeout
function:func WithSSEReadTimeout(timeout time.Duration) ClientOption { return func(sc *SSEMCPClient) { + // Ensure timeout is at least 1 second to prevent immediate timeouts + if timeout < time.Second { + timeout = time.Second + } sc.sseReadTimeout = timeout } }
304-307
: Check for potential header conflicts.The current implementation adds all headers from the map without checking for potential conflicts with headers set elsewhere in the code.
Consider adding a check for important headers:
// set custom http headers for k, v := range c.headers { + // Skip Content-Type if it's already set to prevent overriding + if strings.ToLower(k) == "content-type" && req.Header.Get("Content-Type") != "" { + continue + } req.Header.Set(k, v) }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
client/sse.go
(3 hunks)
🧰 Additional context used
🧬 Code Definitions (1)
client/sse.go (2)
client/types.go (1)
RPCResponse
(5-8)mcp/types.go (2)
JSONRPCNotification
(192-195)ServerCapabilities
(319-342)
🔇 Additional comments (2)
client/sse.go (2)
38-39
: Good addition of configuration fields to the struct.The addition of
headers
andsseReadTimeout
fields to theSSEMCPClient
struct is a good enhancement that allows for more flexibility in client configuration.
58-78
: Good implementation of the functional options pattern.The updated constructor correctly implements the functional options pattern, setting reasonable defaults and applying user-provided options.
Added the option design mode to mcp-client and added the configurable items sse_read_timeout and headers.
The readSSE() method and sendRequest() method take effect with the SSEMCPClient object, respectively.
When I was looking at Python-sdk, I found that the mcp-client can specify headers and sse_read_timeout configurations when initializing objects. In my usage scenario, it is necessary for the mcp-client object to carry a custom request header. I think this is a universal scenario.
Summary by CodeRabbit
New Features
Changes
Properties
field is always included in the output for theToolInputSchema
.