LlamaIndex 使用 RouterOutputAgentWorkflow

2024-09-06 12:12

本文主要是介绍LlamaIndex 使用 RouterOutputAgentWorkflow,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

LlamaIndex 中提供了一个 RouterOutputAgentWorkflow 功能,可以集成多个 QueryTool,根据用户的输入判断使用那个 QueryEngine,在做查询的时候,可以从不同的数据源进行查询,例如确定的数据从数据库查询,如果是语义查询可以从向量数据库进行查询。本文将实现两个搜索引擎,根据不同 Query 使用不同 QueryEngine。

安装 MySQL 依赖

pip install mysql-connector-python  

搜索引擎

定义搜索引擎,初始两个数据源

  • 使用 MySQL 作为数据库的数据源
  • 使用 VectorIndex 作为语义搜索数据源
from pathlib import Path
from llama_index.core.tools import QueryEngineTool
from llama_index.core import VectorStoreIndex
import llm
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import NLSQLTableQueryEngine
from llama_index.core import Settings
from llama_index.core import SQLDatabasefrom sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, select
Settings.llm = llm.get_ollama("mistral-nemo")
Settings.embed_model = llm.get_ollama_embbeding()engine = create_engine('mysql+mysqlconnector://root:123456@localhost:13306/db_llama', echo=True  
)def init_db():# 初始化数据库metadata_obj = MetaData()table_name = "city_stats"city_stats_table = Table(table_name,metadata_obj,Column("city_name", String(16), primary_key=True),Column("population", Integer, ),Column("state", String(16), nullable=False),)metadata_obj.create_all(engine)sql_database = SQLDatabase(engine, include_tables=["city_stats"])from sqlalchemy import insertrows = [{"city_name": "New York City", "population": 8336000, "state": "New York"},{"city_name": "Los Angeles", "population": 3822000, "state": "California"},{"city_name": "Chicago", "population": 2665000, "state": "Illinois"},{"city_name": "Houston", "population": 2303000, "state": "Texas"},{"city_name": "Miami", "population": 449514, "state": "Florida"},{"city_name": "Seattle", "population": 749256, "state": "Washington"},]for row in rows:stmt = insert(city_stats_table).values(**row)with engine.begin() as connection:cursor = connection.execute(stmt)from llama_index.core.query_engine import NLSQLTableQueryEnginesql_database = SQLDatabase(engine, include_tables=["city_stats"])
sql_query_engine = NLSQLTableQueryEngine(sql_database=sql_database,tables=["city_stats"]
)def get_doc_index()-> VectorStoreIndex:'''解析 words'''# 创建 OllamaEmbedding 实例,用于指定嵌入模型和服务的基本 URLollama_embedding = llm.get_ollama_embbeding()# 读取 "./data" 目录中的数据并加载为文档对象documents = SimpleDirectoryReader(input_files=[Path(__file__).parent / "data" / "LA.pdf"]).load_data()# 从文档中创建 VectorStoreIndex,并使用 OllamaEmbedding 作为嵌入模型vector_index = VectorStoreIndex.from_documents(documents, embed_model=ollama_embedding, transformations=[SentenceSplitter(chunk_size=1000, chunk_overlap=20)],)vector_index.set_index_id("vector_index")  # 设置索引 IDvector_index.storage_context.persist("./storage")  # 将索引持久化到 "./storage"return vector_indexllama_index_query_engine = get_doc_index().as_query_engine()sql_tool = QueryEngineTool.from_defaults(query_engine=sql_query_engine,description=("Useful for translating a natural language query into a SQL query over"" a table containing: city_stats, containing the population/state of"" each city located in the USA."),name="sql_tool"
)llama_cloud_tool = QueryEngineTool.from_defaults(query_engine=llama_index_query_engine,description=(f"Useful for answering semantic questions about certain cities in the US."),name="llama_cloud_tool"
)

创建工作流

下图中显示了工作流的节点,绿色背景节点是工作流的动作,例如大模型返回 ToolEvent,ToolEvent 节点执行并返回结果。
在这里插入图片描述
工作流定义代码:

from typing import Dict, List, Any, Optionalfrom llama_index.core.tools import BaseTool
from llama_index.core.llms import ChatMessage
from llama_index.core.llms.llm import ToolSelection, LLM
from llama_index.core.workflow import (Workflow,Event,StartEvent,StopEvent,step,Context
)
from llama_index.core.base.response.schema import Response
from llama_index.core.tools import FunctionTool
from llama_index.utils.workflow import draw_all_possible_flows
from llm import get_ollamafrom docs import enable_traceenable_trace()class InputEvent(Event):"""Input event."""class GatherToolsEvent(Event):"""Gather Tools Event"""tool_calls: Anyclass ToolCallEvent(Event):"""Tool Call event"""tool_call: ToolSelectionclass ToolCallEventResult(Event):"""Tool call event result."""msg: ChatMessageclass RouterOutputAgentWorkflow(Workflow):"""Custom router output agent workflow."""def __init__(self,tools: List[BaseTool],timeout: Optional[float] = 10.0,disable_validation: bool = False,verbose: bool = False,llm: Optional[LLM] = None,chat_history: Optional[List[ChatMessage]] = None,):"""Constructor."""super().__init__(timeout=timeout, disable_validation=disable_validation, verbose=verbose)self.tools: List[BaseTool] = toolsself.tools_dict: Optional[Dict[str, BaseTool]] = {tool.metadata.name: tool for tool in self.tools}self.llm: LLM = llmself.chat_history: List[ChatMessage] = chat_history or []def reset(self) -> None:"""Resets Chat History"""self.chat_history = []@step()async def prepare_chat(self, ev: StartEvent) -> InputEvent:message = ev.get("message")if message is None:raise ValueError("'message' field is required.")# add msg to chat historychat_history = self.chat_historychat_history.append(ChatMessage(role="user", content=message))return InputEvent()@step()async def chat(self, ev: InputEvent) -> GatherToolsEvent | StopEvent:"""Appends msg to chat history, then gets tool calls."""# Put msg into LLM with tools includedchat_res = await self.llm.achat_with_tools(self.tools,chat_history=self.chat_history,verbose=self._verbose,allow_parallel_tool_calls=True)tool_calls = self.llm.get_tool_calls_from_response(chat_res, error_on_no_tool_call=False)ai_message = chat_res.messageself.chat_history.append(ai_message)if self._verbose:print(f"Chat message: {ai_message.content}")# no tool calls, return chat message.if not tool_calls:return StopEvent(result=ai_message.content)return GatherToolsEvent(tool_calls=tool_calls)@step(pass_context=True)async def dispatch_calls(self, ctx: Context, ev: GatherToolsEvent) -> ToolCallEvent:"""Dispatches calls."""tool_calls = ev.tool_callsawait ctx.set("num_tool_calls", len(tool_calls))# trigger tool call eventsfor tool_call in tool_calls:ctx.send_event(ToolCallEvent(tool_call=tool_call))return None@step()async def call_tool(self, ev: ToolCallEvent) -> ToolCallEventResult:"""Calls tool."""tool_call = ev.tool_call# get tool ID and function callid_ = tool_call.tool_idif self._verbose:print(f"Calling function {tool_call.tool_name} with msg {tool_call.tool_kwargs}")# call function and put result into a chat messagetool = self.tools_dict[tool_call.tool_name]output = await tool.acall(**tool_call.tool_kwargs)msg = ChatMessage(name=tool_call.tool_name,content=str(output),role="tool",additional_kwargs={"tool_call_id": id_,"name": tool_call.tool_name})return ToolCallEventResult(msg=msg)@step(pass_context=True)async def gather(self, ctx: Context, ev: ToolCallEventResult) -> StopEvent | None:"""Gathers tool calls."""# wait for all tool call events to finish.tool_events = ctx.collect_events(ev, [ToolCallEventResult] * await ctx.get("num_tool_calls"))if not tool_events:return Nonefor tool_event in tool_events:# append tool call chat messages to historyself.chat_history.append(tool_event.msg)# # after all tool calls finish, pass input event back, restart agent loopreturn InputEvent()from muti_agent import sql_tool, llama_cloud_tool
wf = RouterOutputAgentWorkflow(tools=[sql_tool, llama_cloud_tool], verbose=True, timeout=120, llm=get_ollama("mistral-nemo"))async def main():result = await wf.run(message="Which city has the highest population?")print("RSULT ===============", result)# if __name__ == "__main__":
#     import asyncio#     asyncio.run(main())import gradio as grasync def random_response(message, history):wf.reset()result = await wf.run(message=message)print("RSULT ===============", result)return resultdemo = gr.ChatInterface(random_response, clear_btn=None, title="Qwen2")demo.launch()

输入问题是 “What are five popular travel spots in Los Angeles?”,自动路由到 VectorIndex 进行查询。
在这里插入图片描述
输入问题为 “which city has the most population” 时,调用数据库进行搜索。
在这里插入图片描述

总结

LlamaIndex 中搜索引擎自动路由,根据用户的输入型自动选择所需的搜索引擎,这里有一个需要注意的点,模型需要支持 Function Call。如果 Ollama 本地模型进行推理,不是所有的本地模型都支持Function Call,Llama3.1 和 mistral-nemo 是支持 Function Call 的,可以使用。

这篇关于LlamaIndex 使用 RouterOutputAgentWorkflow的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141971

相关文章

C++中assign函数的使用

《C++中assign函数的使用》在C++标准模板库中,std::list等容器都提供了assign成员函数,它比操作符更灵活,支持多种初始化方式,下面就来介绍一下assign的用法,具有一定的参考价... 目录​1.assign的基本功能​​语法​2. 具体用法示例​​​(1) 填充n个相同值​​(2)

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

深入理解Go语言中二维切片的使用

《深入理解Go语言中二维切片的使用》本文深入讲解了Go语言中二维切片的概念与应用,用于表示矩阵、表格等二维数据结构,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧... 目录引言二维切片的基本概念定义创建二维切片二维切片的操作访问元素修改元素遍历二维切片二维切片的动态调整追加行动态

prometheus如何使用pushgateway监控网路丢包

《prometheus如何使用pushgateway监控网路丢包》:本文主要介绍prometheus如何使用pushgateway监控网路丢包问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录监控网路丢包脚本数据图表总结监控网路丢包脚本[root@gtcq-gt-monitor-prome

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

SpringBoot中如何使用Assert进行断言校验

《SpringBoot中如何使用Assert进行断言校验》Java提供了内置的assert机制,而Spring框架也提供了更强大的Assert工具类来帮助开发者进行参数校验和状态检查,下... 目录前言一、Java 原生assert简介1.1 使用方式1.2 示例代码1.3 优缺点分析二、Spring Fr

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

java使用protobuf-maven-plugin的插件编译proto文件详解

《java使用protobuf-maven-plugin的插件编译proto文件详解》:本文主要介绍java使用protobuf-maven-plugin的插件编译proto文件,具有很好的参考价... 目录protobuf文件作为数据传输和存储的协议主要介绍在Java使用maven编译proto文件的插件

SpringBoot线程池配置使用示例详解

《SpringBoot线程池配置使用示例详解》SpringBoot集成@Async注解,支持线程池参数配置(核心数、队列容量、拒绝策略等)及生命周期管理,结合监控与任务装饰器,提升异步处理效率与系统... 目录一、核心特性二、添加依赖三、参数详解四、配置线程池五、应用实践代码说明拒绝策略(Rejected