END 节点用于标识工作流的结束。当流程执行到达这个节点时,整个图(Graph)的运行就会停止,意味着所有预设的流程都已完成。
StateGraph 是一个类,作用是构建和编译图(Graph)的结构。
创建 StateGraph
在官方教程中,创建 StateGraph 之前先要创建状态(State):
1 2 3 4 5 6 7 8
from langgraph.graph.message import add_messages from typing import Annotated
classState(TypedDict): # Messages have the type "list". # The `add_messages` function in the annotation defines how this state key should be updated # (in this case, it appends messages to the list, rather than overwriting them) messages: Annotated[list, add_messages]
状态的变化是由图的节点驱动的。每个节点函数接收当前的完整 State 作为输入,节点函数执行后,并不需要返回一个全新的、完整的 State 对象,而只需要返回一个包含待更新字段的字典(或部分状态对象),这被称为“状态更新”。LangGraph 框架会接收这个“状态更新”,并根据预设的规则将其应用到当前的 State 上,从而生成一个新的 State 快照。
Reducer 函数定义了如何将节点返回的“状态更新”合并到现有的 State 中。State 中的每一个字段都可以有自己独立的 Reducer。如果在定义 State Schema 时没有为某个字段指定 Reducer,那么它会使用默认行为:覆盖(Override)。对于像聊天记录这样的常见场景,LangGraph 提供了预置的 Reducer,上述代码中的 add_messages 是一个很好的例子,另一个例子如下:
1 2 3 4 5 6
from typing import Annotated, TypedDict from operator import add
classState(TypedDict): foo: int bar: Annotated[list[str], add]
foo 字段使用默认行为,将节点返回的更新值覆盖到 foo 字段中;bar 字段标注了 add,这将节点返回的更新值追加到 bar 字段中。
from langchain_core.messages import AIMessage, HumanMessage
defchat(query: str): input_message: list[HumanMessage] = [HumanMessage(content=query)] for event in graph.stream({"messages": input_message}, checkpoint_config): for value in event.values(): ifisinstance(value["messages"][-1], AIMessage): print(value["messages"][-1].content)
whileTrue: user_input = input("User: ") if user_input.lower() in ["quit", "exit", "q"]: break chat(user_input)
现在运行代码即可与模型进行对话。不过你会发现,当前模型无法记住对话内容。
为什么无法记忆?
想让模型记住之前的对话,就需要在调用模型时把过去的历史对话消息都发送给模型。但是当前在每一轮 While 循环下,当顺序流程结束时,保存在 MessagesState 中的所有对话消息都会被丢弃,新一轮循环开始时,模型无法获取之前的对话消息,所以模型无法记住之前的对话。
添加记忆
我们只需要添加和修改少量代码就可以为上述流程添加记忆,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
# 添加 from langgraph.checkpoint.memory import InMemorySaver
defchat(query: str): input_message: list[HumanMessage] = [HumanMessage(content=query)] for event in graph.stream({"messages": input_message}, checkpoint_config): # 修改:加上 checkpoint_config for value in event.values(): ifisinstance(value["messages"][-1], AIMessage): print(value["messages"][-1].content)
defchat(self, query: str): input_message: list[HumanMessage] = [HumanMessage(content=query)] for event inself.graph.stream({"messages": input_message}, self.checkpoint_config): for value in event.values(): ifisinstance(value["messages"][-1], AIMessage): print(value["messages"][-1].content)
if __name__ == "__main__": agent = ChatAgent()
whileTrue: user_input = input("User: ") if user_input.lower() in ["quit", "exit", "q"]: break agent.chat(user_input)