Skip to content

feat: separate stream items for tool_call_item and tool_call_output_item #833

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/agents/_run_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -877,12 +877,12 @@ async def run_single_output_guardrail(
return result

@classmethod
def stream_step_result_to_queue(
def stream_step_items_to_queue(
cls,
step_result: SingleStepResult,
new_step_items: list[RunItem],
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
for item in step_result.new_step_items:
for item in new_step_items:
if isinstance(item, MessageOutputItem):
event = RunItemStreamEvent(item=item, name="message_output_created")
elif isinstance(item, HandoffCallItem):
Expand All @@ -907,6 +907,14 @@ def stream_step_result_to_queue(
if event:
queue.put_nowait(event)

@classmethod
def stream_step_result_to_queue(
cls,
step_result: SingleStepResult,
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
cls.stream_step_items_to_queue(step_result.new_step_items, queue)

@classmethod
async def _check_for_final_output_from_tools(
cls,
Expand Down
57 changes: 50 additions & 7 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -734,10 +734,9 @@ async def _run_single_turn_streamed(
raise ModelBehaviorError("Model did not produce a final response!")

# 3. Now, we can process the turn as we do in the non-streaming case
single_step_result = await cls._get_single_step_result_from_response(
return await cls._get_single_step_result_from_streamed_response(
agent=agent,
original_input=streamed_result.input,
pre_step_items=streamed_result.new_items,
streamed_result=streamed_result,
new_response=final_response,
output_schema=output_schema,
all_tools=all_tools,
Expand All @@ -747,10 +746,7 @@ async def _run_single_turn_streamed(
run_config=run_config,
tool_use_tracker=tool_use_tracker,
)

RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
return single_step_result


@classmethod
async def _run_single_turn(
cls,
Expand Down Expand Up @@ -848,6 +844,53 @@ async def _get_single_step_result_from_response(
context_wrapper=context_wrapper,
run_config=run_config,
)

@classmethod
async def _get_single_step_result_from_streamed_response(
cls,
*,
agent: Agent[TContext],
all_tools: list[Tool],
streamed_result: RunResultStreaming,
new_response: ModelResponse,
output_schema: AgentOutputSchemaBase | None,
handoffs: list[Handoff],
hooks: RunHooks[TContext],
context_wrapper: RunContextWrapper[TContext],
run_config: RunConfig,
tool_use_tracker: AgentToolUseTracker,
) -> SingleStepResult:

original_input = streamed_result.input
pre_step_items = streamed_result.new_items
event_queue = streamed_result._event_queue

processed_response = RunImpl.process_model_response(
agent=agent,
all_tools=all_tools,
response=new_response,
output_schema=output_schema,
handoffs=handoffs,
)
new_items_processed_response = processed_response.new_items
tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
RunImpl.stream_step_items_to_queue(new_items_processed_response, event_queue)

single_step_result = await RunImpl.execute_tools_and_side_effects(
agent=agent,
original_input=original_input,
pre_step_items=pre_step_items,
new_response=new_response,
processed_response=processed_response,
output_schema=output_schema,
hooks=hooks,
context_wrapper=context_wrapper,
run_config=run_config,
)
new_step_items = [item for item in single_step_result.new_step_items if item not in new_items_processed_response]
RunImpl.stream_step_items_to_queue(new_step_items, event_queue)

return single_step_result

@classmethod
async def _run_input_guardrails(
Expand Down