diff --git a/www/docs/pages/transports/http.mdx b/www/docs/pages/transports/http.mdx index 1d0430d6a..7dc38a857 100644 --- a/www/docs/pages/transports/http.mdx +++ b/www/docs/pages/transports/http.mdx @@ -43,7 +43,7 @@ import ( func main() { s := server.NewMCPServer("StreamableHTTP API Server", "1.0.0", server.WithToolCapabilities(true), - server.WithResourceCapabilities(true), + server.WithResourceCapabilities(true, true), ) // Add RESTful tools @@ -60,7 +60,7 @@ func main() { mcp.WithDescription("Create a new user"), mcp.WithString("name", mcp.Required()), mcp.WithString("email", mcp.Required()), - mcp.WithInteger("age", mcp.Minimum(0)), + mcp.WithNumber("age", mcp.Min(0)), ), handleCreateUser, ) @@ -69,8 +69,8 @@ func main() { mcp.NewTool("search_users", mcp.WithDescription("Search users with filters"), mcp.WithString("query", mcp.Description("Search query")), - mcp.WithInteger("limit", mcp.Default(10), mcp.Maximum(100)), - mcp.WithInteger("offset", mcp.Default(0), mcp.Minimum(0)), + mcp.WithNumber("limit", mcp.DefaultNumber(10), mcp.Max(100)), + mcp.WithNumber("offset", mcp.DefaultNumber(0), mcp.Min(0)), ), handleSearchUsers, ) @@ -95,7 +95,10 @@ func main() { } func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - userID := req.Params.Arguments["user_id"].(string) + userID := req.GetString("user_id", "") + if userID == "" { + return nil, fmt.Errorf("user_id is required") + } // Simulate database lookup user, err := getUserFromDB(userID) @@ -103,13 +106,18 @@ func handleGetUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolR return nil, fmt.Errorf("user not found: %s", userID) } - return mcp.NewToolResultJSON(user), nil + return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`, + user.ID, user.Name, user.Email, user.Age)), nil } func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - name := req.Params.Arguments["name"].(string) - email := req.Params.Arguments["email"].(string) - age := int(req.Params.Arguments["age"].(float64)) + name := req.GetString("name", "") + email := req.GetString("email", "") + age := req.GetInt("age", 0) + + if name == "" || email == "" { + return nil, fmt.Errorf("name and email are required") + } // Validate input if !isValidEmail(email) { @@ -129,11 +137,8 @@ func handleCreateUser(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallTo return nil, fmt.Errorf("failed to create user: %w", err) } - return mcp.NewToolResultJSON(map[string]interface{}{ - "id": user.ID, - "message": "User created successfully", - "user": user, - }), nil + return mcp.NewToolResultText(fmt.Sprintf(`{"id":"%s","message":"User created successfully","user":{"id":"%s","name":"%s","email":"%s","age":%d}}`, + user.ID, user.ID, user.Name, user.Email, user.Age)), nil } // Helper functions and types for the examples @@ -156,7 +161,6 @@ func getUserFromDB(userID string) (*User, error) { } func isValidEmail(email string) bool { - // Simple email validation return strings.Contains(email, "@") && strings.Contains(email, ".") } @@ -171,9 +175,9 @@ func saveUserToDB(user *User) error { } func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - query := getStringParam(req.Params.Arguments, "query", "") - limit := int(getFloatParam(req.Params.Arguments, "limit", 10)) - offset := int(getFloatParam(req.Params.Arguments, "offset", 0)) + query := req.GetString("query", "") + limit := req.GetInt("limit", 10) + offset := req.GetInt("offset", 0) // Search users with pagination users, total, err := searchUsersInDB(query, limit, offset) @@ -181,16 +185,11 @@ func handleSearchUsers(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallT return nil, fmt.Errorf("search failed: %w", err) } - return mcp.NewToolResultJSON(map[string]interface{}{ - "users": users, - "total": total, - "limit": limit, - "offset": offset, - "query": query, - }), nil + return mcp.NewToolResultText(fmt.Sprintf(`{"users":[{"id":"1","name":"John Doe","email":"john@example.com","age":30},{"id":"2","name":"Jane Smith","email":"jane@example.com","age":25}],"total":%d,"limit":%d,"offset":%d,"query":"%s"}`, + total, limit, offset, query)), nil } -func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { +func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) { userID := extractUserIDFromURI(req.Params.URI) user, err := getUserFromDB(userID) @@ -198,27 +197,16 @@ func handleUserResource(ctx context.Context, req mcp.ReadResourceRequest) (*mcp. return nil, fmt.Errorf("user not found: %s", userID) } - return mcp.NewResourceResultJSON(user), nil -} - -// Additional helper functions for parameter handling -func getStringParam(args map[string]interface{}, key, defaultValue string) string { - if val, ok := args[key]; ok && val != nil { - if str, ok := val.(string); ok { - return str - } - } - return defaultValue + return []mcp.ResourceContents{ + mcp.TextResourceContents{ + URI: req.Params.URI, + MIMEType: "application/json", + Text: fmt.Sprintf(`{"id":"%s","name":"%s","email":"%s","age":%d}`, user.ID, user.Name, user.Email, user.Age), + }, + }, nil } -func getFloatParam(args map[string]interface{}, key string, defaultValue float64) float64 { - if val, ok := args[key]; ok && val != nil { - if f, ok := val.(float64); ok { - return f - } - } - return defaultValue -} +// Additional helper functions func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) { // Placeholder implementation @@ -231,9 +219,8 @@ func searchUsersInDB(query string, limit, offset int) ([]*User, int, error) { func extractUserIDFromURI(uri string) string { // Extract user ID from URI like "users://123" - parts := strings.Split(uri, "://") - if len(parts) > 1 { - return parts[1] + if len(uri) > 8 && uri[:8] == "users://" { + return uri[8:] } return uri } @@ -244,39 +231,24 @@ func extractUserIDFromURI(uri string) string { ```go func main() { s := server.NewMCPServer("Advanced StreamableHTTP Server", "1.0.0", - server.WithAllCapabilities(), - server.WithRecovery(), - server.WithHooks(&server.Hooks{ - OnToolCall: logToolCall, - OnResourceRead: logResourceRead, - }), + server.WithResourceCapabilities(true, true), + server.WithPromptCapabilities(true), + server.WithToolCapabilities(true), + server.WithLogging(), ) - // Configure StreamableHTTP-specific options - streamableHTTPOptions := server.StreamableHTTPOptions{ - BasePath: "/api/v1/mcp", - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 60 * time.Second, - MaxBodySize: 10 * 1024 * 1024, // 10MB - EnableCORS: true, - AllowedOrigins: []string{"https://myapp.com", "http://localhost:3000"}, - AllowedMethods: []string{"GET", "POST", "OPTIONS"}, - AllowedHeaders: []string{"Content-Type", "Authorization"}, - EnableGzip: true, - TrustedProxies: []string{"10.0.0.0/8", "172.16.0.0/12"}, - } - - // Add middleware - addStreamableHTTPMiddleware(s) - - // Add comprehensive tools + // Add comprehensive tools and resources addCRUDTools(s) addBatchTools(s) addAnalyticsTools(s) log.Println("Starting advanced StreamableHTTP server on :8080") - httpServer := server.NewStreamableHTTPServer(s, streamableHTTPOptions...) + httpServer := server.NewStreamableHTTPServer(s, + server.WithEndpointPath("/api/v1/mcp"), + server.WithHeartbeatInterval(30*time.Second), + server.WithStateLess(false), + ) + if err := httpServer.Start(":8080"); err != nil { log.Fatal(err) } diff --git a/www/docs/pages/transports/inprocess.mdx b/www/docs/pages/transports/inprocess.mdx index dce982357..bfdb1536b 100644 --- a/www/docs/pages/transports/inprocess.mdx +++ b/www/docs/pages/transports/inprocess.mdx @@ -56,19 +56,34 @@ func main() { ) // Create in-process client - client := client.NewInProcessClient(s) - defer client.Close() + mcpClient, err := client.NewInProcessClient(s) + if err != nil { + log.Fatal(err) + } + defer mcpClient.Close() ctx := context.Background() // Initialize - if err := client.Initialize(ctx); err != nil { + _, err = mcpClient.Initialize(ctx, mcp.InitializeRequest{ + Params: mcp.InitializeRequestParams{ + ProtocolVersion: "2024-11-05", + Capabilities: mcp.ClientCapabilities{ + Tools: &mcp.ToolsCapability{}, + }, + ClientInfo: mcp.Implementation{ + Name: "test-client", + Version: "1.0.0", + }, + }, + }) + if err != nil { log.Fatal(err) } // Use the calculator - result, err := client.CallTool(ctx, mcp.CallToolRequest{ - Params: mcp.CallToolRequestParams{ + result, err := mcpClient.CallTool(ctx, mcp.CallToolRequest{ + Params: mcp.CallToolParams{ Name: "calculate", Arguments: map[string]interface{}{ "operation": "add", @@ -81,13 +96,18 @@ func main() { log.Fatal(err) } - fmt.Printf("Result: %s\n", result.Content[0].Text) + // Extract text from the first content item + if len(result.Content) > 0 { + if textContent, ok := mcp.AsTextContent(result.Content[0]); ok { + fmt.Printf("Result: %s\n", textContent.Text) + } + } } func handleCalculate(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - operation := req.Params.Arguments["operation"].(string) - x := req.Params.Arguments["x"].(float64) - y := req.Params.Arguments["y"].(float64) + operation := req.GetString("operation", "") + x := req.GetFloat("x", 0) + y := req.GetFloat("y", 0) var result float64 switch operation { @@ -134,7 +154,11 @@ func NewApplication(config *Config) *Application { app.addApplicationTools() // Create in-process client for internal use - app.mcpClient = client.NewInProcessClient(app.mcpServer) + var err error + app.mcpClient, err = client.NewInProcessClient(app.mcpServer) + if err != nil { + panic(err) + } return app } @@ -151,12 +175,8 @@ func (app *Application) addApplicationTools() { mcp.WithDescription("Get current application status"), ), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - status := map[string]interface{}{ - "app_name": app.config.AppName, - "debug": app.config.Debug, - "status": "running", - } - return mcp.NewToolResultJSON(status), nil + return mcp.NewToolResultText(fmt.Sprintf(`{"app_name":"%s","debug":%t,"status":"running"}`, + app.config.AppName, app.config.Debug)), nil }, ) @@ -168,8 +188,8 @@ func (app *Application) addApplicationTools() { mcp.WithString("value", mcp.Required()), ), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - key := req.Params.Arguments["key"].(string) - value := req.Params.Arguments["value"].(string) + key := req.GetString("key", "") + value := req.GetString("value", "") // Update configuration based on key switch key { @@ -189,7 +209,7 @@ func (app *Application) addApplicationTools() { func (app *Application) ProcessWithMCP(ctx context.Context, operation string) (interface{}, error) { // Use MCP tools internally for processing result, err := app.mcpClient.CallTool(ctx, mcp.CallToolRequest{ - Params: mcp.CallToolRequestParams{ + Params: mcp.CallToolParams{ Name: "calculate", Arguments: map[string]interface{}{ "operation": operation, @@ -202,7 +222,14 @@ func (app *Application) ProcessWithMCP(ctx context.Context, operation string) (i return nil, err } - return result.Content[0].Text, nil + // Extract text from the first content item + if len(result.Content) > 0 { + if textContent, ok := mcp.AsTextContent(result.Content[0]); ok { + return textContent.Text, nil + } + } + + return "no result", nil } // Usage example @@ -216,7 +243,19 @@ func main() { ctx := context.Background() // Initialize the embedded MCP client - if err := app.mcpClient.Initialize(ctx); err != nil { + _, err := app.mcpClient.Initialize(ctx, mcp.InitializeRequest{ + Params: mcp.InitializeRequestParams{ + ProtocolVersion: "2024-11-05", + Capabilities: mcp.ClientCapabilities{ + Tools: &mcp.ToolsCapability{}, + }, + ClientInfo: mcp.Implementation{ + Name: "embedded-client", + Version: "1.0.0", + }, + }, + }) + if err != nil { log.Fatal(err) } diff --git a/www/docs/pages/transports/sse.mdx b/www/docs/pages/transports/sse.mdx index 930bb0eba..f09104ec9 100644 --- a/www/docs/pages/transports/sse.mdx +++ b/www/docs/pages/transports/sse.mdx @@ -39,7 +39,7 @@ import ( func main() { s := server.NewMCPServer("SSE Server", "1.0.0", server.WithToolCapabilities(true), - server.WithResourceCapabilities(true), + server.WithResourceCapabilities(true, true), ) // Add real-time tools @@ -47,7 +47,7 @@ func main() { mcp.NewTool("stream_data", mcp.WithDescription("Stream data with real-time updates"), mcp.WithString("source", mcp.Required()), - mcp.WithInteger("count", mcp.Default(10)), + mcp.WithNumber("count", mcp.DefaultNumber(10)), ), handleStreamData, ) @@ -55,7 +55,7 @@ func main() { s.AddTool( mcp.NewTool("monitor_system", mcp.WithDescription("Monitor system metrics in real-time"), - mcp.WithInteger("duration", mcp.Default(60)), + mcp.WithNumber("duration", mcp.DefaultNumber(60)), ), handleSystemMonitor, ) @@ -73,19 +73,18 @@ func main() { // Start SSE server log.Println("Starting SSE server on :8080") - if err := server.ServeSSE(s, ":8080"); err != nil { + sseServer := server.NewSSEServer(s) + if err := sseServer.Start(":8080"); err != nil { log.Fatal(err) } } func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - source := req.Params.Arguments["source"].(string) - count := int(req.Params.Arguments["count"].(float64)) + source := req.GetString("source", "") + count := req.GetInt("count", 10) - // Get notifier for real-time updates (hypothetical functions) - // Note: These functions would be provided by the SSE transport implementation - notifier := getNotifierFromContext(ctx) // Hypothetical function - sessionID := getSessionIDFromContext(ctx) // Hypothetical function + // Get server from context for notifications + mcpServer := server.ServerFromContext(ctx) // Stream data with progress updates var results []map[string]interface{} @@ -102,23 +101,22 @@ func handleStreamData(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallTo results = append(results, data) // Send progress notification - if notifier != nil { - // Note: ProgressNotification would be defined by the MCP protocol - notifier.SendProgress(sessionID, map[string]interface{}{ + if mcpServer != nil { + err := mcpServer.SendNotificationToClient(ctx, "notifications/progress", map[string]interface{}{ "progress": i + 1, "total": count, "message": fmt.Sprintf("Processed %d/%d items from %s", i+1, count, source), }) + if err != nil { + log.Printf("Failed to send notification: %v", err) + } } time.Sleep(100 * time.Millisecond) } - return mcp.NewToolResultJSON(map[string]interface{}{ - "source": source, - "results": results, - "count": len(results), - }), nil + return mcp.NewToolResultText(fmt.Sprintf(`{"source":"%s","results":%v,"count":%d}`, + source, results, len(results))), nil } // Helper functions for the examples @@ -130,21 +128,10 @@ func generateData(source string, index int) map[string]interface{} { } } -func getNotifierFromContext(ctx context.Context) interface{} { - // Placeholder implementation - would be provided by SSE transport - return nil -} - -func getSessionIDFromContext(ctx context.Context) string { - // Placeholder implementation - would be provided by SSE transport - return "session_123" -} - func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - duration := int(req.Params.Arguments["duration"].(float64)) + duration := req.GetInt("duration", 60) - notifier := getNotifierFromContext(ctx) - sessionID := getSessionIDFromContext(ctx) + mcpServer := server.ServerFromContext(ctx) // Monitor system for specified duration ticker := time.NewTicker(5 * time.Second) @@ -158,20 +145,19 @@ func handleSystemMonitor(ctx context.Context, req mcp.CallToolRequest) (*mcp.Cal case <-ctx.Done(): return nil, ctx.Err() case <-timeout: - return mcp.NewToolResultJSON(map[string]interface{}{ - "duration": duration, - "metrics": metrics, - "samples": len(metrics), - }), nil + return mcp.NewToolResultText(fmt.Sprintf(`{"duration":%d,"metrics":%v,"samples":%d}`, + duration, metrics, len(metrics))), nil case <-ticker.C: // Collect current metrics currentMetrics := collectSystemMetrics() metrics = append(metrics, currentMetrics) // Send real-time update - if notifier != nil { - // Note: SendCustom would be a method on the notifier interface - // notifier.SendCustom(sessionID, "system_metrics", currentMetrics) + if mcpServer != nil { + err := mcpServer.SendNotificationToClient(ctx, "system_metrics", currentMetrics) + if err != nil { + log.Printf("Failed to send system metrics notification: %v", err) + } } } } @@ -186,9 +172,15 @@ func collectSystemMetrics() map[string]interface{} { } } -func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { +func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) { metrics := collectSystemMetrics() - return mcp.NewResourceResultJSON(metrics), nil + return []mcp.ResourceContents{ + mcp.TextResourceContents{ + URI: req.Params.URI, + MIMEType: "application/json", + Text: fmt.Sprintf(`{"cpu":%.1f,"memory":%.1f,"disk":%.1f}`, metrics["cpu"], metrics["memory"], metrics["disk"]), + }, + }, nil } ``` @@ -197,50 +189,29 @@ func handleCurrentMetrics(ctx context.Context, req mcp.ReadResourceRequest) (*mc ```go func main() { s := server.NewMCPServer("Advanced SSE Server", "1.0.0", - server.WithAllCapabilities(), - server.WithRecovery(), - server.WithHooks(&server.Hooks{ - OnSessionStart: func(sessionID string) { - log.Printf("SSE client connected: %s", sessionID) - broadcastUserCount() - }, - OnSessionEnd: func(sessionID string) { - log.Printf("SSE client disconnected: %s", sessionID) - broadcastUserCount() - }, - }), + server.WithResourceCapabilities(true, true), + server.WithPromptCapabilities(true), + server.WithToolCapabilities(true), + server.WithLogging(), ) - // Configure SSE-specific options - sseOptions := server.SSEOptions{ - BasePath: "/mcp", - AllowedOrigins: []string{"http://localhost:3000", "https://myapp.com"}, - HeartbeatInterval: 30 * time.Second, - MaxConnections: 100, - ConnectionTimeout: 5 * time.Minute, - EnableCompression: true, - } - // Add collaborative tools addCollaborativeTools(s) addRealTimeResources(s) log.Println("Starting advanced SSE server on :8080") - if err := server.ServeSSEWithOptions(s, ":8080", sseOptions); err != nil { + sseServer := server.NewSSEServer(s, + server.WithStaticBasePath("/mcp"), + server.WithKeepAliveInterval(30*time.Second), + server.WithBaseURL("http://localhost:8080"), + ) + + if err := sseServer.Start(":8080"); err != nil { log.Fatal(err) } } // Helper functions for the advanced example -func broadcastUserCount() { - // Placeholder implementation - log.Println("Broadcasting user count update") -} - -func addCollaborativeToolsPlaceholder(s *server.MCPServer) { - // Placeholder implementation - would add collaborative tools -} - func addRealTimeResources(s *server.MCPServer) { // Placeholder implementation - would add real-time resources } @@ -262,7 +233,7 @@ func addCollaborativeTools(s *server.MCPServer) { mcp.NewTool("send_message", mcp.WithDescription("Send a message to all connected clients"), mcp.WithString("message", mcp.Required()), - mcp.WithString("channel", mcp.Default("general")), + mcp.WithString("channel", mcp.DefaultString("general")), ), handleSendMessage, ) @@ -281,240 +252,71 @@ func addCollaborativeTools(s *server.MCPServer) { ## Configuration -### Base URLs and Paths - -```go -// Custom SSE endpoint configuration -sseOptions := server.SSEOptions{ - BasePath: "/api/mcp", // SSE endpoint will be /api/mcp/sse - - // Additional HTTP endpoints - HealthPath: "/api/health", - MetricsPath: "/api/metrics", - StatusPath: "/api/status", -} - -// Start server with custom paths -server.ServeSSEWithOptions(s, ":8080", sseOptions) -``` - -**Resulting endpoints:** -- SSE stream: `http://localhost:8080/api/mcp/sse` -- Health check: `http://localhost:8080/api/health` -- Metrics: `http://localhost:8080/api/metrics` -- Status: `http://localhost:8080/api/status` - -### CORS Configuration - -```go -sseOptions := server.SSEOptions{ - // Allow specific origins - AllowedOrigins: []string{ - "http://localhost:3000", - "https://myapp.com", - "https://*.myapp.com", - }, - - // Allow all origins (development only) - AllowAllOrigins: true, - - // Custom CORS headers - AllowedHeaders: []string{ - "Authorization", - "Content-Type", - "X-API-Key", - }, - - // Allow credentials - AllowCredentials: true, -} -``` +### SSE Server Options -### Connection Management +The SSE server can be configured with various options: ```go -sseOptions := server.SSEOptions{ - // Connection limits - MaxConnections: 100, - MaxConnectionsPerIP: 10, +sseServer := server.NewSSEServer(s, + // Set the base path for SSE endpoints + server.WithStaticBasePath("/api/mcp"), - // Timeouts - ConnectionTimeout: 5 * time.Minute, - WriteTimeout: 30 * time.Second, - ReadTimeout: 30 * time.Second, + // Configure keep-alive interval + server.WithKeepAliveInterval(30*time.Second), - // Heartbeat to keep connections alive - HeartbeatInterval: 30 * time.Second, + // Set base URL for client connections + server.WithBaseURL("http://localhost:8080"), - // Buffer sizes - WriteBufferSize: 4096, - ReadBufferSize: 4096, + // Configure SSE and message endpoints + server.WithSSEEndpoint("/sse"), + server.WithMessageEndpoint("/message"), - // Compression - EnableCompression: true, - CompressionLevel: 6, -} + // Add context function for request processing + server.WithSSEContextFunc(func(ctx context.Context, r *http.Request) context.Context { + // Add custom context values from headers + return ctx + }), +) ``` -## Session Handling - -### Multi-Client State Management - -```go -type SessionManager struct { - sessions map[string]*ClientSession - mutex sync.RWMutex - notifier *SSENotifier -} - -type ClientSession struct { - ID string - UserID string - ConnectedAt time.Time - LastSeen time.Time - Subscriptions map[string]bool - Metadata map[string]interface{} -} - -func NewSessionManager() *SessionManager { - return &SessionManager{ - sessions: make(map[string]*ClientSession), - notifier: NewSSENotifier(), - } -} +**Resulting endpoints:** +- SSE stream: `http://localhost:8080/api/mcp/sse` +- Message endpoint: `http://localhost:8080/api/mcp/message` -func (sm *SessionManager) OnSessionStart(sessionID string) { - sm.mutex.Lock() - defer sm.mutex.Unlock() - - session := &ClientSession{ - ID: sessionID, - ConnectedAt: time.Now(), - LastSeen: time.Now(), - Subscriptions: make(map[string]bool), - Metadata: make(map[string]interface{}), - } - - sm.sessions[sessionID] = session - - // Notify other clients - sm.notifier.BroadcastExcept(sessionID, "user_joined", map[string]interface{}{ - "session_id": sessionID, - "timestamp": time.Now().Unix(), - }) -} +## Real-Time Notifications -func (sm *SessionManager) OnSessionEnd(sessionID string) { - sm.mutex.Lock() - defer sm.mutex.Unlock() - - delete(sm.sessions, sessionID) - - // Notify other clients - sm.notifier.Broadcast("user_left", map[string]interface{}{ - "session_id": sessionID, - "timestamp": time.Now().Unix(), - }) -} +SSE transport enables real-time server-to-client communication through notifications. Use the server context to send notifications: -func (sm *SessionManager) GetActiveSessions() []ClientSession { - sm.mutex.RLock() - defer sm.mutex.RUnlock() +```go +func handleRealtimeTool(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { + // Get the MCP server from context + mcpServer := server.ServerFromContext(ctx) - var sessions []ClientSession - for _, session := range sm.sessions { - sessions = append(sessions, *session) + // Send a notification to the client + if mcpServer != nil { + err := mcpServer.SendNotificationToClient(ctx, "custom_event", map[string]interface{}{ + "message": "Real-time update", + "timestamp": time.Now().Unix(), + }) + if err != nil { + log.Printf("Failed to send notification: %v", err) + } } - return sessions + return mcp.NewToolResultText(`{"status":"notification_sent"}`), nil } ``` -### Real-Time Notifications - -```go -type SSENotifier struct { - clients map[string]chan mcp.Notification - mutex sync.RWMutex -} +### Session Management -func NewSSENotifier() *SSENotifier { - return &SSENotifier{ - clients: make(map[string]chan mcp.Notification), - } -} - -func (n *SSENotifier) RegisterClient(sessionID string) <-chan mcp.Notification { - n.mutex.Lock() - defer n.mutex.Unlock() - - ch := make(chan mcp.Notification, 100) - n.clients[sessionID] = ch - return ch -} - -func (n *SSENotifier) UnregisterClient(sessionID string) { - n.mutex.Lock() - defer n.mutex.Unlock() - - if ch, exists := n.clients[sessionID]; exists { - close(ch) - delete(n.clients, sessionID) - } -} - -func (n *SSENotifier) SendToClient(sessionID string, notification mcp.Notification) { - n.mutex.RLock() - defer n.mutex.RUnlock() - - if ch, exists := n.clients[sessionID]; exists { - select { - case ch <- notification: - default: - // Channel full, drop notification - } - } -} +The SSE server automatically handles session management. You can send events to specific sessions using the server's notification methods: -func (n *SSENotifier) Broadcast(eventType string, data interface{}) { - notification := mcp.Notification{ - Type: eventType, - Data: data, - } - - n.mutex.RLock() - defer n.mutex.RUnlock() - - for _, ch := range n.clients { - select { - case ch <- notification: - default: - // Channel full, skip this client - } - } -} +```go +// Send notification to current client session +mcpServer.SendNotificationToClient(ctx, "progress_update", progressData) -func (n *SSENotifier) BroadcastExcept(excludeSessionID, eventType string, data interface{}) { - notification := mcp.Notification{ - Type: eventType, - Data: data, - } - - n.mutex.RLock() - defer n.mutex.RUnlock() - - for sessionID, ch := range n.clients { - if sessionID == excludeSessionID { - continue - } - - select { - case ch <- notification: - default: - // Channel full, skip this client - } - } -} +// Send notification to all connected clients (if supported) +// Note: Check the server implementation for broadcast capabilities ``` ## Next Steps diff --git a/www/docs/pages/transports/stdio.mdx b/www/docs/pages/transports/stdio.mdx index 6dea7adcf..b1c744b59 100644 --- a/www/docs/pages/transports/stdio.mdx +++ b/www/docs/pages/transports/stdio.mdx @@ -40,7 +40,7 @@ import ( func main() { s := server.NewMCPServer("File Tools", "1.0.0", server.WithToolCapabilities(true), - server.WithResourceCapabilities(true), + server.WithResourceCapabilities(true, true), ) // Add file listing tool @@ -52,7 +52,7 @@ func main() { mcp.Description("Directory path to list"), ), mcp.WithBoolean("recursive", - mcp.Default(false), + mcp.DefaultBool(false), mcp.Description("List files recursively"), ), ), @@ -98,15 +98,11 @@ func handleListFiles(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToo return mcp.NewToolResultError(fmt.Sprintf("failed to list files: %v", err)), nil } - return mcp.NewToolResultJSON(map[string]interface{}{ - "path": path, - "files": files, - "count": len(files), - "recursive": recursive, - }), nil + return mcp.NewToolResultText(fmt.Sprintf(`{"path":"%s","files":%v,"count":%d,"recursive":%t}`, + path, files, len(files), recursive)), nil } -func handleFileContent(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.ReadResourceResult, error) { +func handleFileContent(ctx context.Context, req mcp.ReadResourceRequest) ([]mcp.ResourceContents, error) { // Extract path from URI: "file:///path/to/file" -> "/path/to/file" path := extractPathFromURI(req.Params.URI) @@ -119,13 +115,11 @@ func handleFileContent(ctx context.Context, req mcp.ReadResourceRequest) (*mcp.R return nil, fmt.Errorf("failed to read file: %w", err) } - return &mcp.ReadResourceResult{ - Contents: []mcp.ResourceContent{ - { - URI: req.Params.URI, - MIMEType: detectMIMEType(path), - Text: string(content), - }, + return []mcp.ResourceContents{ + mcp.TextResourceContents{ + URI: req.Params.URI, + MIMEType: detectMIMEType(path), + Text: string(content), }, }, nil } @@ -211,16 +205,10 @@ import ( func main() { s := server.NewMCPServer("Advanced CLI Tool", "1.0.0", - server.WithAllCapabilities(), - server.WithRecovery(), - server.WithHooks(&server.Hooks{ - OnSessionStart: func(sessionID string) { - logToFile(fmt.Sprintf("Session started: %s", sessionID)) - }, - OnSessionEnd: func(sessionID string) { - logToFile(fmt.Sprintf("Session ended: %s", sessionID)) - }, - }), + server.WithResourceCapabilities(true, true), + server.WithPromptCapabilities(true), + server.WithToolCapabilities(true), + server.WithLogging(), ) // Add comprehensive tools @@ -342,12 +330,24 @@ func main() { ctx := context.Background() // Initialize connection - if err := c.Initialize(ctx); err != nil { + _, err = c.Initialize(ctx, mcp.InitializeRequest{ + Params: mcp.InitializeRequestParams{ + ProtocolVersion: "2024-11-05", + Capabilities: mcp.ClientCapabilities{ + Tools: &mcp.ToolsCapability{}, + }, + ClientInfo: mcp.Implementation{ + Name: "test-client", + Version: "1.0.0", + }, + }, + }) + if err != nil { log.Fatal(err) } // List available tools - tools, err := c.ListTools(ctx) + tools, err := c.ListTools(ctx, mcp.ListToolsRequest{}) if err != nil { log.Fatal(err) } @@ -359,7 +359,7 @@ func main() { // Call a tool result, err := c.CallTool(ctx, mcp.CallToolRequest{ - Params: mcp.CallToolRequestParams{ + Params: mcp.CallToolParams{ Name: "list_files", Arguments: map[string]interface{}{ "path": ".", @@ -445,18 +445,7 @@ func main() { s := server.NewMCPServer("Debug Server", "1.0.0", server.WithToolCapabilities(true), - server.WithHooks(&server.Hooks{ - OnSessionStart: func(sessionID string) { - logger.Printf("Session started: %s", sessionID) - }, - OnToolCall: func(sessionID, toolName string, duration time.Duration, err error) { - if err != nil { - logger.Printf("Tool %s failed: %v", toolName, err) - } else { - logger.Printf("Tool %s completed in %v", toolName, duration) - } - }, - }), + server.WithLogging(), ) // Add tools with debug logging @@ -466,7 +455,7 @@ func main() { mcp.WithString("message", mcp.Required()), ), func(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - message := req.Params.Arguments["message"].(string) + message := req.GetString("message", "") logger.Printf("Echo tool called with message: %s", message) return mcp.NewToolResultText(fmt.Sprintf("Echo: %s", message)), nil }, @@ -504,8 +493,8 @@ This opens a web interface where you can: ```go func handleToolWithErrors(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { // Validate required parameters - path, ok := req.Params.Arguments["path"].(string) - if !ok { + path, err := req.RequireString("path") + if err != nil { return nil, fmt.Errorf("path parameter is required and must be a string") } @@ -543,7 +532,7 @@ func handleToolWithErrors(ctx context.Context, req mcp.CallToolRequest) (*mcp.Ca return nil, fmt.Errorf("operation failed: %w", err) } - return mcp.NewToolResultJSON(result), nil + return mcp.NewToolResultText(fmt.Sprintf("%v", result)), nil } ``` @@ -638,7 +627,7 @@ func getCachedFile(path string) (string, bool) { ```go func handleLargeFile(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { - path := req.Params.Arguments["path"].(string) + path := req.GetString("path", "") // Stream large files instead of loading into memory file, err := os.Open(path)