Closed
While doing further work on #1610, I noticed completely different behavior between v1.0.0 and v1.1.0 when working with LangGraph streams. The stream consists of chunks of type dict that a LangGraph agent produces. In v1.0.0, the stream appeared in the chat GUI as expected, message by message. In v1.1.0, nothing appears until the stream has fully completed, at which point all messages are rendered in the GUI simultaneously. Here is a (hopefully) reproducible example:
# requirements.txt
# Python 3.12 (interpreter version, not a pip-installable package)
shiny==1.1.0
langgraph==0.2.35
# my_agent.py
### DEFINE THE GRAPH ###
import operator
from typing import Annotated, Any
from typing_extensions import TypedDict
import time
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]

def node_a(state):
    time.sleep(3)
    return {"aggregate": ["I'm A"]}

def node_b(state):
    time.sleep(5)
    return {"aggregate": ["I'm B"]}

def node_c(state):
    time.sleep(5)
    return {"aggregate": ["I'm C"]}

def node_d(state):
    time.sleep(5)
    return {"aggregate": ["I'm D"]}

builder = StateGraph(State)
builder.add_node("a", node_a)
builder.add_edge(START, "a")
builder.add_node("b", node_b)
builder.add_node("c", node_c)
builder.add_node("d", node_d)
builder.add_edge("a", "b")
builder.add_edge("a", "c")  # Nodes B and C should appear together by design
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
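
Before wiring the graph into Shiny, it can help to confirm that the stream yields chunks incrementally on its own, so that any batching can be attributed to the chat UI rather than to LangGraph. The sketch below timestamps each chunk as it is consumed. It uses a stand-in async generator (`fake_astream`, a hypothetical name) with short sleeps instead of the real graph; the assumption is that `graph.astream(...)` can be swapped in because it is also consumed with `async for`.

```python
import asyncio
import time
from typing import Any, AsyncIterator


async def fake_astream(delay: float = 0.05) -> AsyncIterator[dict[str, Any]]:
    """Hypothetical stand-in for graph.astream(..., stream_mode="updates")."""
    for node in ("a", "b", "c", "d"):
        await asyncio.sleep(delay)  # simulates the time a node takes to run
        yield {node: {"aggregate": [f"I'm {node.upper()}"]}}


async def timed_chunks(stream):
    """Collect (seconds_since_start, chunk) pairs as chunks are consumed."""
    start = time.monotonic()
    arrivals = []
    async for chunk in stream:
        arrivals.append((time.monotonic() - start, chunk))
    return arrivals


arrivals = asyncio.run(timed_chunks(fake_astream()))
for elapsed, chunk in arrivals:
    print(f"{elapsed:.2f}s {chunk}")
```

If the printed timestamps are spread out, the source is streaming incrementally; if they cluster at the end, the delay originates upstream of the UI.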
# app.py
from shiny.express import ui
import my_agent

# Set some Shiny page options
ui.page_opts(
    title="Hello LangChain Chat Models",
    fillable=True,
    fillable_mobile=True,
)

# Create and display an empty chat UI
chat = ui.Chat(id="chat")
chat.ui()

# Define a callback to run when the user submits a message
@chat.on_user_submit
async def _():
    # Get messages currently in the chat
    messages = chat.messages(format="langchain")
    # Create a response message stream
    async for chunk in my_agent.graph.astream({"aggregate": []}, stream_mode="updates"):
        assert len(chunk.keys()) == 1, 'Got multiple keys from the stream :/'
        for key, value in chunk.items():
            print(chunk)
            output = chunk.get(key).get('aggregate')[0]
            await chat.append_message_stream(output)
In practice, the v1.1.0 behavior makes working with LangGraph agents infeasible and forces a downgrade to v1.0.0.
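
One restructuring that might be worth trying is to adapt the dict chunks into a single async generator of strings and hand that generator to the chat once, instead of calling `append_message_stream` once per chunk. The sketch below shows only the adapter, using a stand-in for the graph stream; `fake_astream`, `text_chunks`, and `collect` are hypothetical names, and whether this restores incremental rendering in v1.1.0 has not been verified here.

```python
import asyncio
from typing import Any, AsyncIterator


async def fake_astream() -> AsyncIterator[dict[str, Any]]:
    """Hypothetical stand-in for graph.astream(..., stream_mode="updates")."""
    for node in ("a", "b", "c", "d"):
        await asyncio.sleep(0.01)
        yield {node: {"aggregate": [f"I'm {node.upper()}"]}}


async def text_chunks(updates) -> AsyncIterator[str]:
    """Yield one text chunk per node update, in arrival order."""
    async for update in updates:
        for node_output in update.values():
            for text in node_output.get("aggregate", []):
                yield text + "\n\n"


async def collect(stream):
    """Drain an async iterator into a list (used here only for the demo)."""
    return [item async for item in stream]


texts = asyncio.run(collect(text_chunks(fake_astream())))
print(texts)
```

In the app, the equivalent call would look something like `await chat.append_message_stream(text_chunks(my_agent.graph.astream({"aggregate": []}, stream_mode="updates")))`. Note that this changes the output shape: all node outputs land in one streamed message rather than one message per node.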