LangGraph核心概念

2024-06-15 00:28
文章标签 概念 核心 langgraph

本文主要是介绍LangGraph核心概念,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言

LangGraph是一个基于图状态机构建复杂、稳定的AI agent的库。本文介绍LangGraph的核心概念。

基于LangGraph官方文档。

背景

尽管可能每个人对AI Agent由什么构成的定义不同,这里将agent看成是利用语言模型来控制工作流(workflow)循环和采取行动的系统。原型的LLM agent使用ReAct式设计,将LLM应用于驱动一个基本循环,具体步骤如下:

  • 推理并规划要采取的动作
  • 调用工具(普通的函数)来采取动作
  • 观察工具的影响(结果)然后重新规划乎或行动

虽然LLM agent在这方面效果出人意料,但单纯的agent循环在大规模情况下并不能提供用户期待的可靠性。它们具有美丽的随机性。设计精良的系统充分利用这种随机性,使得该系统能容忍LLM输出中的错误。

AI设计模式应该应用良好工程实践,包括:

  • AI应用必须在自主运行和用户控制之间取得平衡;
  • Agent应用类似分布式系统,都需要具备容错和纠正能力;
  • Multi-agent(多智能体)系统在并行时需要解决多人在线类似的冲突;
  • 需要有撤回和版本控制;

LangGraph的StateGraph抽象支持这些需求,提供比AgentExecutor框架更低级别的API,可以完全控制何时何地以及如何使用AI。

核心设计

LangGraph将Agent的工作流建模为状态机,可以使用三个关键组件定义Agent的行为:

  1. State: 表示应用当前快照的共享数据结构,可以是任何Python类型,通常是TypedDictPydanticBaseModel
  2. Node: 编码Agent逻辑的Python函数,接收当前状态,执行一些计算或副作用(side-effect),返回更新后的状态;
  3. Edge: 根据当前状态确定下一个要执行的节点的控制流规则,可以是条件分支或固定转换;

通过组合节点和边,可以创建随时间演变状态的复杂循环工作流,LangGraph的强大在于它如何管理这些状态。

LangGraph的底层图算法使用消息传递来定义通信程序,当一个节点完成时,它沿着一个或多个边发送消息给其他节点。这些节点运行它们的函数,将结果消息传递给下一组节点,以此类推。受Pregel启发,程序按离散的超步(super-step)进行,这些超步在概念上都是并行执行。

当运行图时,所有的节点都处于非活动(inactive)状态,每当入边(incoming edge/channel)接收到新消息(状态)时,节点变为活动(active)状态。运行函数,并响应更新。在每个超步结束时,每个节点都会通过将自身标记为非活动状态来投票停止(vote to halt)。如果没有更多的消息传入,当所有节点都处于非活动状态且无消息在传输时图终止。

Node

节点通常是Python函数,第一个位置参数都是状态,第二个可选的位置参数是config,包含配置参数(例如thread_id)。使用add_node方法将这些节点添加到图中。

from langchain_core.runnables import RunnableConfig
from langgraph.graph import END, START, StateGraphbuilder = StateGraph(dict)def my_node(state: dict, config: RunnableConfig):print("In node: ", config["configurable"]["user_id"])return {"results": f"Hello, {state['input']}!"}# The second argument is optional
def my_other_node(state: dict):return statebuilder.add_node("my_node", my_node)
builder.add_node("other_node", my_other_node)
builder.add_edge(START, "my_node")
builder.add_edge("my_node", "other_node")
builder.add_edge("other_node", END)
graph = builder.compile()
graph.invoke({"input": "Will"}, {"configurable": {"user_id": "abcd-123"}})
# In node:  abcd-123
# {'results': 'Hello, Will!'}

在底层,函数会被转换为RunnableLambda,它添加了批处理和异步支持,以及跟踪和调试功能。

Edge

边定义了逻辑的路由方式和图如何决定停止。类似节点,它们接收图的当前状态并返回一个值。默认该值是要将状态发送到下一个节点或节点的名称。所有的这些节点将作为下一个超步的一部分并行运行。

如果想重用边,可以选择提供一个字典,将边的输出映射到下一个节点的名称。

如果希望始终从节点A到节点B,可以直接使用add_edge方法。

如果要选择性地路由到一个或多个边(或选择性地终止),可以使用add_conditional_edges方法。

如果一个节点有多个出边,所有这些目标节点将作为下一个超步的一部分并行执行

状态管理

LangGraph引入了状态管理的两个关键概念:状态模式(state schema)和reducer。

状态模式定义了提供给图中每个节点的对象的类型。

Reducer定义了如何将节点输出应用于当前状态。例如,可以使用reducer将心的对话响应合并到对话历史记录中,或将多个Agent节点的输出平均聚合在一起(average together)。

下面通过一个示例来看看reducer的工作原理,比较下面两个状态。

from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import END, START, StateGraphclass StateA(TypedDict):value: intbuilder = StateGraph(StateA)
builder.add_node("my_node", lambda state: {"value": 1}) # 更新value为1
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
graph = builder.compile()
graph.invoke({"value": 5})

和StateB:

from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.graph import END, START, StateGraphdef add(existing: int, new: int):return existing + newclass StateB(TypedDict):# 高亮的新行value: Annotated[int, add]builder = StateGraph(StateB)
builder.add_node("my_node", lambda state: {"value": 1}) # 更新为 5 + 1
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
graph = builder.compile()
graph.invoke({"value": 5})

在StateA中,结果是1。因为状态的默认reducer是直接覆盖。在StateB中,结果是6,因为我们将add函数创建为reducer,它接收现有状态和状态更新,并返回更新后的值。

虽然我们通常使用TypedDict作为State的state schema,实际上可以是几乎任何类型,下面的代码也是有效的:

# Analogous to StateA above
builder = StateGraph(int)
builder.add_node("my_node", lambda state: 1)
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
builder.compile().invoke(5)# Analogous to StateB
def add(left, right):return left + rightbuilder = StateGraph(Annotated[int, add])
builder.add_node("my_node", lambda state: 1)
builder.add_edge(START, "my_node")
builder.add_edge("my_node", END)
graph = builder.compile()
graph.invoke(5)

这意味着可以使用Pydantic BaseModel作为图的状态,可以添加默认值和额外的数据验证

当构建像ChatGPT这样的聊天机器人时,状态可能仅仅是一个聊天消息列表。这是MessageGraph(StateGrpah的轻量包装器)使用的状态,仅比下面的稍微复杂一点:

builder = StateGraph(Annotated[list, add])

在图中使用共享状态涉及一些设计的权衡。共享一个类型化状态提供了很多与构建AI工作流相关的优势,包括:

  1. 在每个超步之和和之后完全检验数据流;
  2. 状态是可变的,可以让用户(或其他客户端)在超步之间写入相同的状态来控制Agent的方向(通过update_state)变得容易;
  3. 明确定义了检查点,可以很容易保存和恢复。

持久化

任何智能系统都需要记忆才能运作。AI智能体也是一样,需要跨一个或多个时间范围(timeframe)的记忆:

  • 它们需要记住已经完成的步骤(以避免回答特定问题时重复自己);
  • 需要记住与用户进行的多轮对话中的先前轮次(用于指代消歧或提供额外的上下文);
  • 理想情况下需要从与用户之前交互中记住上下文,以便在行为上更加个性化和高效;

最后一种记忆形式涵盖了很多内容(个性化、优化、持续学习等),超出了本次的内容。

前两种记忆形式通过基于检查点的StateGraph的API来支持。

检查点

检查点(checkpoint)代表应用程序和用户之间进行的多轮互动中线程的状态。在单次运行中创建的检查点将具有一组在从该状态开始时执行的下一个节点。在给定运行结束时创建的检查点是相同的,只是没有下一个节点可以转换(正在等待用户输入)。

检查点支持聊天记忆等功能,可以tag并持久化系统中已经采取的每个状态。

单轮记忆

Agent的每一步都被设为检查点,在代理未能实现你的目标而遇到错误的情况下,可以随时从其中一个保存的检查点恢复它的任务。

这可以支持human-in-the-loop工作流,在执行给定节点之前或之后,可以中断图的执行将控制权交给用户,这个用户可以立即回复,也可以之后回复。你的工作流都可以随时恢复。

多轮记忆

检查点保存在一个thread_id下,来支持用户和系统之间的多轮交互。在如何配置图以添加多轮记忆支持方面没有任何区别,因为检查点工作在整个过程中都是相同的。

如果要在多轮对话中保留一部分状态并将一些状态视为"短暂的",你可以在图的最终节点中清除相关状态。

使用检查点就像调用compile(checkpointer=my_checkpointer)一样简单,然后在其可配置参数中使用一个thread_id来调用它。

Thread

线程表示图的不同会话,它们将状态检查点组织在离散会话中,以便在应用中支持多用户对话。

一个典型的聊天机器人应用为每个用户创建了多个线程,每个线程代表一次对话,都具有自己的持久化的聊天记录和其他状态。线程内的检查点可以根据需要进行回放和分支。

当一个StateGraph基于checkpointer编译,每次调用图时都需要通过配置(configuration)提供一个thread_id

Configuration

对于任何给定的图部署,你可能希望有一些可在运行时控制的可配置值。这些与图输入不同,因为它们不是要视为状态变量。

一个常见的例子是对话线程thread_id、用户user_id、选择使用哪个LLM、在检索器中返回多少个文档等。虽然你可以将这些值传递到状态中,但最好将其与常规数据流分开。

我们来看一个例子,看多轮记忆是如何工作的。

from typing import Annotatedfrom typing_extensions import TypedDictfrom langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraphdef add(left, right):return left + rightclass State(TypedDict):total: Annotated[int, add]turn: strbuilder = StateGraph(State) # 不存在检查点
builder.add_node("add_one", lambda x: {"total": 1}) # 默认初始或增加1
builder.add_edge(START, "add_one") # 进入add_one
builder.add_edge("add_one", END) # 结束memory = MemorySaver()
graph = builder.compile(checkpointer=memory) # 基于checkpointer编译
thread_id = "some-thread"
config = {"configurable": {"thread_id": thread_id}} # 配置thread_id
result = graph.invoke({"total": 1, "turn": "First Turn"}, config) # 第一次运行,累加到2
result2 = graph.invoke({"turn": "Next Turn"}, config) # 累加到3,默认传入total=1
result3 = graph.invoke({"total": 5}, config) # 累加 5+1,变成9
result4 = graph.invoke({"total": 5}, {"configurable": {"thread_id": "new-thread-id"}}) # 累加到6,因为是新的对话,从1开始累加

对于第一次运行,不存在检查点,因此图是在原始输入上运行的。total值从1增加到2,turn设置为First Turn

对于第二次运行,用户更新了turn,但没有更新total!由于我们是从状态中加载的,先前的结果增加了1(在add_one节点中),然后turn被用户覆盖。

对于第三次运行,turn保持不变,因为它是从检查点加载的,而没有被用户覆盖。total增加了用户提供的值,因为这个值是通过add函数reduce(更新)的。

对于第四次运行,使用了一个新的线程id,但没有找到检查点,所以结果仅仅是默认的total增加1

这种面向用户的行为等同于没有检查点情况下运行以下内容:

graph = builder.compile()
result = graph.invoke({"total": 1, "turn": "First Turn"})
result2 = graph.invoke({**result, "turn": "Next Turn"})
result3 = graph.invoke({**result2, "total": result2["total"] + 5})
result4 = graph.invoke({"total": 5})

对于StateGraph的单轮执行数据流

下面我们看复杂一点的例子,通过在上面的玩具示例中添加一个条件边。

from typing import Annotated, Literalfrom typing_extensions import TypedDictfrom langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, START, StateGraphdef add(left, right):return left + rightclass State(TypedDict):total: Annotated[int, add]builder = StateGraph(State)
builder.add_node("add_one", lambda x: {"total": 1}) # 新增1 
builder.add_node("double", lambda x: {"total": x["total"]}) # 新增现在的x值,即翻倍
builder.add_edge(START, "add_one")# 定义一个路由 route -> double 也可能 route -> end
def route(state: State) -> Literal["double", "__end__"]:if state["total"] < 6: return "double" # 路由到doublereturn "__end__" # 结束builder.add_conditional_edges("add_one", route) # add_one -> route
builder.add_edge("double", "add_one") # double -> add_onememory = MemorySaver()
graph = builder.compile(checkpointer=memory)

然后第一次调用:

thread_id = "some-thread"
config = {"configurable": {"thread_id": thread_id}}
for step in graph.stream({"total": 1}, config, stream_mode="debug"):print(step["step"], step["type"], step["payload"].get("values"))
# 0 checkpoint {'total': 1} 将输入的1增加到初始值0中,得到1
# 1 checkpoint {'total': 2}  进入 add_one ,新增了1
# 2 checkpoint {'total': 4} 进入route,到double,翻倍
# 3 checkpoint {'total': 5}  double返回到add_one,新增了1
# 4 checkpoint {'total': 10} 进入route,到double,翻倍
# 5 checkpoint {'total': 11} double返回到add_one,新增了1,然后进入route到end

下面详细介绍执行过程:

  1. 图查找检查点。 没有找到检查点,因此状态被初始化为0。
  2. 接下来,图将用户的输入作为状态更新应用,将输入1加到现有值0中。 在这一超步结束时,总量(total)为1
  3. 调用add_one节点,返回1
  4. 将此更新加到现有总量(1)中。 状态现在是2
  5. 调用条件边route,由于值小于6,继续到double节点。
  6. doube输出现有的总量2并返回。 然后将其加到现有状态中。 状态现在是4
  7. 图通过add_one返回(5),检查条件边并继续进行,因为它小于6
  8. 经过double后,总量为10
  9. 固定边回到add_one,总量为11,检查条件边,由于大于6,程序终止。

第二轮调用,我们使用同样的配置:

for step in graph.stream({"total": -2, "turn": "First Turn"}, config, stream_mode="debug"
):print(step["step"], step["type"], step["payload"].get("values"))
# 7 checkpoint {'total': 9} 输入为-2,11-2=9
# 8 checkpoint {'total': 10} 进入add_one,增加了1,变成10
  1. 它应用了来自用户输入的更新。 add reducer 将总量从0更改为-2
  2. 图寻找检查点。 将其加载到内存中作为初始状态。 总数现在为9=-2+11
  3. add_one节点以此状态被调用。 它返回10
  4. 使用reducer应用该更新,将值更新为10
  5. 进入route,由于值大于6,终止程序。

这篇关于LangGraph核心概念的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1061908

相关文章

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

【VUE】跨域问题的概念,以及解决方法。

目录 1.跨域概念 2.解决方法 2.1 配置网络请求代理 2.2 使用@CrossOrigin 注解 2.3 通过配置文件实现跨域 2.4 添加 CorsWebFilter 来解决跨域问题 1.跨域概念 跨域问题是由于浏览器实施了同源策略,该策略要求请求的域名、协议和端口必须与提供资源的服务相同。如果不相同,则需要服务器显式地允许这种跨域请求。一般在springbo

PostgreSQL核心功能特性与使用领域及场景分析

PostgreSQL有什么优点? 开源和免费 PostgreSQL是一个开源的数据库管理系统,可以免费使用和修改。这降低了企业的成本,并为开发者提供了一个活跃的社区和丰富的资源。 高度兼容 PostgreSQL支持多种操作系统(如Linux、Windows、macOS等)和编程语言(如C、C++、Java、Python、Ruby等),并提供了多种接口(如JDBC、ODBC、ADO.NET等

【MRI基础】TR 和 TE 时间概念

重复时间 (TR) 磁共振成像 (MRI) 中的 TR(重复时间,repetition time)是施加于同一切片的连续脉冲序列之间的时间间隔。具体而言,TR 是施加一个 RF(射频)脉冲与施加下一个 RF 脉冲之间的持续时间。TR 以毫秒 (ms) 为单位,主要控制后续脉冲之前的纵向弛豫程度(T1 弛豫),使其成为显著影响 MRI 中的图像对比度和信号特性的重要参数。 回声时间 (TE)

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理

深入解析秒杀业务中的核心问题 —— 从并发控制到事务管理 秒杀系统是应对高并发、高压力下的典型业务场景,涉及到并发控制、库存管理、事务管理等多个关键技术点。本文将深入剖析秒杀商品业务中常见的几个核心问题,包括 AOP 事务管理、同步锁机制、乐观锁、CAS 操作,以及用户限购策略。通过这些技术的结合,确保秒杀系统在高并发场景下的稳定性和一致性。 1. AOP 代理对象与事务管理 在秒杀商品

计算机网络基础概念 交换机、路由器、网关、TBOX

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、VLAN是什么?二 、交换机三、路由器四、网关五、TBOXTelematics BOX,简称车载T-BOX,车联网系统包含四部分,主机、车载T-BOX、手机APP及后台系统。主机主要用于车内的影音娱乐,以及车辆信息显示;车载T-BOX主要用于和后台系统/手机APP通信,实现手机APP的车辆信息显示与控

01 Docker概念和部署

目录 1.1 Docker 概述 1.1.1 Docker 的优势 1.1.2 镜像 1.1.3 容器 1.1.4 仓库 1.2 安装 Docker 1.2.1 配置和安装依赖环境 1.3镜像操作 1.3.1 搜索镜像 1.3.2 获取镜像 1.3.3 查看镜像 1.3.4 给镜像重命名 1.3.5 存储,载入镜像和删除镜像 1.4 Doecker容器操作 1.4

【机器学习-一-基础概念篇】

机器学习 定义分类算法 应用 定义 机器学习最早是被Arthur Samuel 提出的一个概念,指计算机无需明确编程即可学习的研究领域。1950年他发明的跳棋程序,这个人机对弈游戏让他的声名鹊起,机器学习这个概念才进入大众的是视线。 在这个跳棋程序里,他编程了一种算法,这个程序与Arthur下了数万次跳棋,计算机逐渐学会了下在哪里有更大的可能会赢得比赛,哪里会输,通过这种方法,最

【吊打面试官系列-Redis面试题】说说 Redis 哈希槽的概念?

大家好,我是锋哥。今天分享关于 【说说 Redis 哈希槽的概念?】面试题,希望对大家有帮助; 说说 Redis 哈希槽的概念? Redis 集群没有使用一致性 hash,而是引入了哈希槽的概念,Redis 集群有 16384 个哈希槽,每个 key 通过 CRC16 校验后对 16384 取模来决定放置哪个槽, 集群的每个节点负责一部分 hash 槽。

文章解读与仿真程序复现思路——电力自动化设备EI\CSCD\北大核心《考虑燃料电池和电解槽虚拟惯量支撑的电力系统优化调度方法》

本专栏栏目提供文章与程序复现思路,具体已有的论文与论文源程序可翻阅本博主免费的专栏栏目《论文与完整程序》 论文与完整源程序_电网论文源程序的博客-CSDN博客https://blog.csdn.net/liang674027206/category_12531414.html 电网论文源程序-CSDN博客电网论文源程序擅长文章解读,论文与完整源程序,等方面的知识,电网论文源程序关注python