Skip to content

Commit a627a2f

Browse files
committed
use atomic for closed to be more natural compared to started
1 parent 2672c13 commit a627a2f

File tree

1 file changed

+8
-15
lines changed

1 file changed

+8
-15
lines changed

client/transport/sse.go

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type SSE struct {
3434
sseReadTimeout time.Duration
3535

3636
started atomic.Bool
37-
closed chan struct{}
37+
closed atomic.Bool
3838
cancelSSEStream context.CancelFunc
3939
}
4040

@@ -64,7 +64,6 @@ func NewSSE(baseURL string, options ...ClientOption) (*SSE, error) {
6464
baseURL: parsedURL,
6565
httpClient: &http.Client{},
6666
responses: make(map[int64]chan *JSONRPCResponse),
67-
closed: make(chan struct{}),
6867
endpointChan: make(chan struct{}),
6968
sseReadTimeout: 30 * time.Second,
7069
headers: make(map[string]string),
@@ -91,9 +90,7 @@ func (c *SSE) Start(ctx context.Context) error {
9190
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL.String(), nil)
9291

9392
if err != nil {
94-
9593
return fmt.Errorf("failed to create request: %w", err)
96-
9794
}
9895

9996
req.Header.Set("Accept", "text/event-stream")
@@ -153,13 +150,10 @@ func (c *SSE) readSSE(reader io.ReadCloser) {
153150
}
154151
break
155152
}
156-
select {
157-
case <-c.closed:
158-
return
159-
default:
153+
if !c.closed.Load() {
160154
fmt.Printf("SSE stream error: %v\n", err)
161-
return
162155
}
156+
return
163157
}
164158

165159
// Remove only newline markers
@@ -248,9 +242,11 @@ func (c *SSE) SendRequest(
248242
) (*JSONRPCResponse, error) {
249243

250244
if !c.started.Load() {
251-
return nil, fmt.Errorf("transport not started")
245+
return nil, fmt.Errorf("transport not started yet")
246+
}
247+
if c.closed.Load() {
248+
return nil, fmt.Errorf("transport has been closed")
252249
}
253-
254250
if c.endpoint == nil {
255251
return nil, fmt.Errorf("endpoint not received")
256252
}
@@ -311,11 +307,8 @@ func (c *SSE) SendRequest(
311307
// Close shuts down the SSE client connection and cleans up any pending responses.
312308
// Returns an error if the shutdown process fails.
313309
func (c *SSE) Close() error {
314-
select {
315-
case <-c.closed:
310+
if !c.closed.CompareAndSwap(false, true) {
316311
return nil // Already closed
317-
default:
318-
close(c.closed)
319312
}
320313

321314
if c.cancelSSEStream != nil {

0 commit comments

Comments
 (0)