diff --git a/src/agents/_run_impl.py b/src/agents/_run_impl.py
index e98010555..e67e6fff2 100644
--- a/src/agents/_run_impl.py
+++ b/src/agents/_run_impl.py
@@ -877,12 +877,12 @@ async def run_single_output_guardrail(
         return result

     @classmethod
-    def stream_step_result_to_queue(
+    def stream_step_items_to_queue(
         cls,
-        step_result: SingleStepResult,
+        new_step_items: list[RunItem],
         queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
     ):
-        for item in step_result.new_step_items:
+        for item in new_step_items:
             if isinstance(item, MessageOutputItem):
                 event = RunItemStreamEvent(item=item, name="message_output_created")
             elif isinstance(item, HandoffCallItem):
@@ -907,6 +907,14 @@ def stream_step_result_to_queue(
             if event:
                 queue.put_nowait(event)

+    @classmethod
+    def stream_step_result_to_queue(
+        cls,
+        step_result: SingleStepResult,
+        queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
+    ):
+        cls.stream_step_items_to_queue(step_result.new_step_items, queue)
+
     @classmethod
     async def _check_for_final_output_from_tools(
         cls,
diff --git a/src/agents/run.py b/src/agents/run.py
index f375a0b89..e685a666c 100644
--- a/src/agents/run.py
+++ b/src/agents/run.py
@@ -734,10 +734,9 @@ async def _run_single_turn_streamed(
             raise ModelBehaviorError("Model did not produce a final response!")

         # 3. Now, we can process the turn as we do in the non-streaming case
-        single_step_result = await cls._get_single_step_result_from_response(
+        return await cls._get_single_step_result_from_streamed_response(
             agent=agent,
-            original_input=streamed_result.input,
-            pre_step_items=streamed_result.new_items,
+            streamed_result=streamed_result,
             new_response=final_response,
             output_schema=output_schema,
             all_tools=all_tools,
@@ -747,10 +746,7 @@ async def _run_single_turn_streamed(
             run_config=run_config,
             tool_use_tracker=tool_use_tracker,
         )
-
-        RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
-        return single_step_result
-
+
     @classmethod
     async def _run_single_turn(
         cls,
@@ -848,6 +844,53 @@ async def _get_single_step_result_from_response(
             context_wrapper=context_wrapper,
             run_config=run_config,
         )
+
+    @classmethod
+    async def _get_single_step_result_from_streamed_response(
+        cls,
+        *,
+        agent: Agent[TContext],
+        all_tools: list[Tool],
+        streamed_result: RunResultStreaming,
+        new_response: ModelResponse,
+        output_schema: AgentOutputSchemaBase | None,
+        handoffs: list[Handoff],
+        hooks: RunHooks[TContext],
+        context_wrapper: RunContextWrapper[TContext],
+        run_config: RunConfig,
+        tool_use_tracker: AgentToolUseTracker,
+    ) -> SingleStepResult:
+
+        original_input = streamed_result.input
+        pre_step_items = streamed_result.new_items
+        event_queue = streamed_result._event_queue
+
+        processed_response = RunImpl.process_model_response(
+            agent=agent,
+            all_tools=all_tools,
+            response=new_response,
+            output_schema=output_schema,
+            handoffs=handoffs,
+        )
+        new_items_processed_response = processed_response.new_items
+        tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
+        RunImpl.stream_step_items_to_queue(new_items_processed_response, event_queue)
+
+        single_step_result = await RunImpl.execute_tools_and_side_effects(
+            agent=agent,
+            original_input=original_input,
+            pre_step_items=pre_step_items,
+            new_response=new_response,
+            processed_response=processed_response,
+            output_schema=output_schema,
+            hooks=hooks,
+            context_wrapper=context_wrapper,
+            run_config=run_config,
+        )
+        new_step_items = [item for item in single_step_result.new_step_items if item not in new_items_processed_response]
+        RunImpl.stream_step_items_to_queue(new_step_items, event_queue)
+
+        return single_step_result

     @classmethod
     async def _run_input_guardrails(