Skip to content

Commit 3778138

Browse files
authored
refactor: Enhance stream processing with abort signal support and increase default timeout (#18)
Signed-off-by: Eden Reich <[email protected]>
1 parent 99b34e7 commit 3778138

File tree

1 file changed

+49
-14
lines changed

1 file changed

+49
-14
lines changed

src/client.ts

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,20 @@ class StreamProcessor {
5151
this.clientProvidedTools = clientProvidedTools;
5252
}
5353

54-
async processStream(body: ReadableStream<Uint8Array>): Promise<void> {
54+
async processStream(
55+
body: ReadableStream<Uint8Array>,
56+
abortSignal?: AbortSignal
57+
): Promise<void> {
5558
const reader = body.getReader();
5659
const decoder = new TextDecoder();
5760
let buffer = '';
5861

5962
try {
6063
while (true) {
64+
if (abortSignal?.aborted) {
65+
throw new Error('Stream processing was aborted');
66+
}
67+
6168
const { done, value } = await reader.read();
6269
if (done) break;
6370

@@ -73,6 +80,11 @@ class StreamProcessor {
7380
}
7481
}
7582
} catch (error) {
83+
if (abortSignal?.aborted || (error as Error).name === 'AbortError') {
84+
console.log('Stream processing was cancelled');
85+
return;
86+
}
87+
7688
const apiError: SchemaError = {
7789
error: (error as Error).message || 'Unknown error',
7890
};
@@ -215,10 +227,10 @@ class StreamProcessor {
215227
}
216228

217229
private finalizeIncompleteToolCalls(): void {
218-
for (const [, toolCall] of this.incompleteToolCalls.entries()) {
230+
this.incompleteToolCalls.forEach((toolCall) => {
219231
if (!toolCall.id || !toolCall.function.name) {
220232
globalThis.console.warn('Incomplete tool call detected:', toolCall);
221-
continue;
233+
return;
222234
}
223235

224236
const completedToolCall = {
@@ -237,15 +249,26 @@ class StreamProcessor {
237249
}
238250
this.callbacks.onMCPTool?.(completedToolCall);
239251
} catch (argError) {
240-
globalThis.console.warn(
241-
`Invalid MCP tool arguments for ${toolCall.function.name}:`,
242-
argError
243-
);
252+
const isIncompleteJSON =
253+
toolCall.function.arguments &&
254+
!toolCall.function.arguments.trim().endsWith('}');
255+
256+
if (isIncompleteJSON) {
257+
globalThis.console.warn(
258+
`Incomplete MCP tool arguments for ${toolCall.function.name} (stream was likely interrupted):`,
259+
toolCall.function.arguments
260+
);
261+
} else {
262+
globalThis.console.warn(
263+
`Invalid MCP tool arguments for ${toolCall.function.name}:`,
264+
argError
265+
);
266+
}
244267
}
245268
} else {
246269
this.callbacks.onTool?.(completedToolCall);
247270
}
248-
}
271+
});
249272
this.incompleteToolCalls.clear();
250273
}
251274

@@ -280,7 +303,7 @@ export class InferenceGatewayClient {
280303
this.apiKey = options.apiKey;
281304
this.defaultHeaders = options.defaultHeaders || {};
282305
this.defaultQuery = options.defaultQuery || {};
283-
this.timeout = options.timeout || 30000;
306+
this.timeout = options.timeout || 60000; // Increased default timeout to 60 seconds
284307
this.fetchFn = options.fetch || globalThis.fetch;
285308
}
286309

@@ -404,17 +427,23 @@ export class InferenceGatewayClient {
404427
* @param request - Chat completion request (must include at least model and messages)
405428
* @param callbacks - Callbacks for handling streaming events
406429
* @param provider - Optional provider to use for this request
430+
* @param abortSignal - Optional AbortSignal to cancel the request
407431
*/
408432
async streamChatCompletion(
409433
request: Omit<
410434
SchemaCreateChatCompletionRequest,
411435
'stream' | 'stream_options'
412436
>,
413437
callbacks: ChatCompletionStreamCallbacks,
414-
provider?: Provider
438+
provider?: Provider,
439+
abortSignal?: AbortSignal
415440
): Promise<void> {
416441
try {
417-
const response = await this.initiateStreamingRequest(request, provider);
442+
const response = await this.initiateStreamingRequest(
443+
request,
444+
provider,
445+
abortSignal
446+
);
418447

419448
if (!response.body) {
420449
const error: SchemaError = {
@@ -440,7 +469,7 @@ export class InferenceGatewayClient {
440469
callbacks,
441470
clientProvidedTools
442471
);
443-
await streamProcessor.processStream(response.body);
472+
await streamProcessor.processStream(response.body, abortSignal);
444473
} catch (error) {
445474
const apiError: SchemaError = {
446475
error: (error as Error).message || 'Unknown error occurred',
@@ -458,7 +487,8 @@ export class InferenceGatewayClient {
458487
SchemaCreateChatCompletionRequest,
459488
'stream' | 'stream_options'
460489
>,
461-
provider?: Provider
490+
provider?: Provider,
491+
abortSignal?: AbortSignal
462492
): Promise<Response> {
463493
const query: Record<string, string> = {};
464494
if (provider) {
@@ -485,6 +515,11 @@ export class InferenceGatewayClient {
485515
}
486516

487517
const controller = new AbortController();
518+
519+
const combinedSignal = abortSignal
520+
? AbortSignal.any([abortSignal, controller.signal])
521+
: controller.signal;
522+
488523
const timeoutId = globalThis.setTimeout(
489524
() => controller.abort(),
490525
this.timeout
@@ -501,7 +536,7 @@ export class InferenceGatewayClient {
501536
include_usage: true,
502537
},
503538
}),
504-
signal: controller.signal,
539+
signal: combinedSignal,
505540
});
506541

507542
if (!response.ok) {

0 commit comments

Comments
 (0)