From 004e42fd57560461e9b0b8a39ce008024f6fb58c Mon Sep 17 00:00:00 2001 From: AlanHsu Date: Fri, 21 Mar 2025 16:36:20 +0800 Subject: [PATCH] replace bufio.Scanner with bufio.Reader and add error handling --- client/sse.go | 75 ++++++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 33 deletions(-) diff --git a/client/sse.go b/client/sse.go index 2cc6635f8..20886d3fd 100644 --- a/client/sse.go +++ b/client/sse.go @@ -99,39 +99,48 @@ func (c *SSEMCPClient) Start(ctx context.Context) error { // readSSE continuously reads the SSE stream and processes events. // It runs until the connection is closed or an error occurs. func (c *SSEMCPClient) readSSE(reader io.ReadCloser) { - defer reader.Close() - - scanner := bufio.NewScanner(reader) - var event, data string - - for scanner.Scan() { - line := scanner.Text() - - if line == "" { - // Empty line means end of event - if event != "" && data != "" { - c.handleSSEEvent(event, data) - event = "" - data = "" - } - continue - } - - if strings.HasPrefix(line, "event:") { - event = strings.TrimSpace(strings.TrimPrefix(line, "event:")) - } else if strings.HasPrefix(line, "data:") { - data = strings.TrimSpace(strings.TrimPrefix(line, "data:")) - } - } - - if err := scanner.Err(); err != nil { - select { - case <-c.done: - return - default: - fmt.Printf("SSE stream error: %v\n", err) - } - } + defer reader.Close() + + br := bufio.NewReader(reader) + var event, data string + + for { + line, err := br.ReadString('\n') + if err != nil { + if err == io.EOF { + // Process any pending event before exit + if event != "" && data != "" { + c.handleSSEEvent(event, data) + } + break + } + select { + case <-c.done: + return + default: + fmt.Printf("SSE stream error: %v\n", err) + return + } + } + + // Remove only newline markers + line = strings.TrimRight(line, "\r\n") + if line == "" { + // Empty line means end of event + if event != "" && data != "" { + c.handleSSEEvent(event, data) + event = "" + data = "" + } + continue + } + + if strings.HasPrefix(line, "event:") { + event = strings.TrimSpace(strings.TrimPrefix(line, "event:")) + } else if strings.HasPrefix(line, "data:") { + data = strings.TrimSpace(strings.TrimPrefix(line, "data:")) + } + } } // handleSSEEvent processes SSE events based on their type.