From 460b663567ed1031467a8d69eb13fd3b3da38827 Mon Sep 17 00:00:00 2001 From: Vincent Josse Date: Wed, 27 Nov 2024 11:35:24 +0100 Subject: [PATCH 1/2] docs(assistants): correct on_text_delta example (#1896) --- src/openai/lib/streaming/_assistants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/openai/lib/streaming/_assistants.py b/src/openai/lib/streaming/_assistants.py index 103e4c40aa..6efb3ca3f1 100644 --- a/src/openai/lib/streaming/_assistants.py +++ b/src/openai/lib/streaming/_assistants.py @@ -243,7 +243,7 @@ def on_text_delta(self, delta: TextDelta, snapshot: Text) -> None: on_text_delta(TextDelta(value=" solution"), Text(value="The solution")), on_text_delta(TextDelta(value=" to"), Text(value="The solution to")), on_text_delta(TextDelta(value=" the"), Text(value="The solution to the")), - on_text_delta(TextDelta(value=" equation"), Text(value="The solution to the equivalent")), + on_text_delta(TextDelta(value=" equation"), Text(value="The solution to the equation")), """ def on_text_done(self, text: Text) -> None: From a1e0cd66d129dd2ed4aa8dd87d136e3000680a55 Mon Sep 17 00:00:00 2001 From: Robert Craigie Date: Wed, 27 Nov 2024 11:05:40 +0000 Subject: [PATCH 2/2] feat(client): make ChatCompletionStreamState public --- src/openai/lib/streaming/chat/__init__.py | 1 + src/openai/lib/streaming/chat/_completions.py | 33 ++++++- tests/lib/chat/test_completions_streaming.py | 94 ++++++++++++++++++- 3 files changed, 123 insertions(+), 5 deletions(-) diff --git a/src/openai/lib/streaming/chat/__init__.py b/src/openai/lib/streaming/chat/__init__.py index 5881c39b9a..dfa3f3f2e3 100644 --- a/src/openai/lib/streaming/chat/__init__.py +++ b/src/openai/lib/streaming/chat/__init__.py @@ -21,6 +21,7 @@ from ._completions import ( ChatCompletionStream as ChatCompletionStream, AsyncChatCompletionStream as AsyncChatCompletionStream, + ChatCompletionStreamState as ChatCompletionStreamState, ChatCompletionStreamManager as ChatCompletionStreamManager, AsyncChatCompletionStreamManager as AsyncChatCompletionStreamManager, ) diff --git a/src/openai/lib/streaming/chat/_completions.py b/src/openai/lib/streaming/chat/_completions.py index 8518de967f..2146091354 100644 --- a/src/openai/lib/streaming/chat/_completions.py +++ b/src/openai/lib/streaming/chat/_completions.py @@ -287,11 +287,31 @@ async def __aexit__( class ChatCompletionStreamState(Generic[ResponseFormatT]): + """Helper class for manually accumulating `ChatCompletionChunk`s into a final `ChatCompletion` object. + + This is useful in cases where you can't always use the `.stream()` method, e.g. + + ```py + from openai.lib.streaming.chat import ChatCompletionStreamState + + state = ChatCompletionStreamState() + + stream = client.chat.completions.create(..., stream=True) + for chunk in response: + state.handle_chunk(chunk) + + # can also access the accumulated `ChatCompletion` mid-stream + state.current_completion_snapshot + + print(state.get_final_completion()) + ``` + """ + def __init__( self, *, - input_tools: Iterable[ChatCompletionToolParam] | NotGiven, - response_format: type[ResponseFormatT] | ResponseFormatParam | NotGiven, + input_tools: Iterable[ChatCompletionToolParam] | NotGiven = NOT_GIVEN, + response_format: type[ResponseFormatT] | ResponseFormatParam | NotGiven = NOT_GIVEN, ) -> None: self.__current_completion_snapshot: ParsedChatCompletionSnapshot | None = None self.__choice_event_states: list[ChoiceEventState] = [] @@ -301,6 +321,11 @@ def __init__( self._rich_response_format: type | NotGiven = response_format if inspect.isclass(response_format) else NOT_GIVEN def get_final_completion(self) -> ParsedChatCompletion[ResponseFormatT]: + """Parse the final completion object. + + Note this does not provide any guarantees that the stream has actually finished, you must + only call this method when the stream is finished. + """ return parse_chat_completion( chat_completion=self.current_completion_snapshot, response_format=self._rich_response_format, @@ -312,8 +337,8 @@ def current_completion_snapshot(self) -> ParsedChatCompletionSnapshot: assert self.__current_completion_snapshot is not None return self.__current_completion_snapshot - def handle_chunk(self, chunk: ChatCompletionChunk) -> list[ChatCompletionStreamEvent[ResponseFormatT]]: - """Accumulate a new chunk into the snapshot and returns a list of events to yield.""" + def handle_chunk(self, chunk: ChatCompletionChunk) -> Iterable[ChatCompletionStreamEvent[ResponseFormatT]]: + """Accumulate a new chunk into the snapshot and returns an iterable of events to yield.""" self.__current_completion_snapshot = self._accumulate_chunk(chunk) return self._build_events( diff --git a/tests/lib/chat/test_completions_streaming.py b/tests/lib/chat/test_completions_streaming.py index ab12de44b3..1eed031af7 100644 --- a/tests/lib/chat/test_completions_streaming.py +++ b/tests/lib/chat/test_completions_streaming.py @@ -13,12 +13,14 @@ import openai from openai import OpenAI, AsyncOpenAI -from openai._utils import assert_signatures_in_sync +from openai._utils import consume_sync_iterator, assert_signatures_in_sync from openai._compat import model_copy +from openai.types.chat import ChatCompletionChunk from openai.lib.streaming.chat import ( ContentDoneEvent, ChatCompletionStream, ChatCompletionStreamEvent, + ChatCompletionStreamState, ChatCompletionStreamManager, ParsedChatCompletionSnapshot, ) @@ -997,6 +999,55 @@ def test_allows_non_strict_tools_but_no_parsing( ) +@pytest.mark.respx(base_url=base_url) +def test_chat_completion_state_helper(client: OpenAI, respx_mock: MockRouter, monkeypatch: pytest.MonkeyPatch) -> None: + state = ChatCompletionStreamState() + + def streamer(client: OpenAI) -> Iterator[ChatCompletionChunk]: + stream = client.chat.completions.create( + model="gpt-4o-2024-08-06", + messages=[ + { + "role": "user", + "content": "What's the weather like in SF?", + }, + ], + stream=True, + ) + for chunk in stream: + state.handle_chunk(chunk) + yield chunk + + _make_raw_stream_snapshot_request( + streamer, + content_snapshot=snapshot(external("e2aad469b71d*.bin")), + mock_client=client, + respx_mock=respx_mock, + ) + + assert print_obj(state.get_final_completion().choices, monkeypatch) == snapshot( + """\ +[ + ParsedChoice[NoneType]( + finish_reason='stop', + index=0, + logprobs=None, + message=ParsedChatCompletionMessage[NoneType]( + audio=None, + content="I'm unable to provide real-time weather updates. To get the current weather in San Francisco, I +recommend checking a reliable weather website or a weather app.", + function_call=None, + parsed=None, + refusal=None, + role='assistant', + tool_calls=[] + ) + ) +] +""" + ) + + @pytest.mark.parametrize("sync", [True, False], ids=["sync", "async"]) def test_stream_method_in_sync(sync: bool, client: OpenAI, async_client: AsyncOpenAI) -> None: checking_client: OpenAI | AsyncOpenAI = client if sync else async_client @@ -1075,3 +1126,44 @@ def _on_response(response: httpx.Response) -> None: client.close() return listener + + +def _make_raw_stream_snapshot_request( + func: Callable[[OpenAI], Iterator[ChatCompletionChunk]], + *, + content_snapshot: Any, + respx_mock: MockRouter, + mock_client: OpenAI, +) -> None: + live = os.environ.get("OPENAI_LIVE") == "1" + if live: + + def _on_response(response: httpx.Response) -> None: + # update the content snapshot + assert outsource(response.read()) == content_snapshot + + respx_mock.stop() + + client = OpenAI( + http_client=httpx.Client( + event_hooks={ + "response": [_on_response], + } + ) + ) + else: + respx_mock.post("/chat/completions").mock( + return_value=httpx.Response( + 200, + content=content_snapshot._old_value._load_value(), + headers={"content-type": "text/event-stream"}, + ) + ) + + client = mock_client + + stream = func(client) + consume_sync_iterator(stream) + + if live: + client.close()