LlamaIndex 使用 RouterOutputAgentWorkflow

2024-09-06 12:12

本文主要是介绍LlamaIndex 使用 RouterOutputAgentWorkflow,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

LlamaIndex 中提供了一个 RouterOutputAgentWorkflow 功能,可以集成多个 QueryTool,根据用户的输入判断使用那个 QueryEngine,在做查询的时候,可以从不同的数据源进行查询,例如确定的数据从数据库查询,如果是语义查询可以从向量数据库进行查询。本文将实现两个搜索引擎,根据不同 Query 使用不同 QueryEngine。

安装 MySQL 依赖

pip install mysql-connector-python  

搜索引擎

定义搜索引擎,初始两个数据源

  • 使用 MySQL 作为数据库的数据源
  • 使用 VectorIndex 作为语义搜索数据源
from pathlib import Path
from llama_index.core.tools import QueryEngineTool
from llama_index.core import VectorStoreIndex
import llm
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import NLSQLTableQueryEngine
from llama_index.core import Settings
from llama_index.core import SQLDatabasefrom sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, select
Settings.llm = llm.get_ollama("mistral-nemo")
Settings.embed_model = llm.get_ollama_embbeding()engine = create_engine('mysql+mysqlconnector://root:123456@localhost:13306/db_llama', echo=True  
)def init_db():# 初始化数据库metadata_obj = MetaData()table_name = "city_stats"city_stats_table = Table(table_name,metadata_obj,Column("city_name", String(16), primary_key=True),Column("population", Integer, ),Column("state", String(16), nullable=False),)metadata_obj.create_all(engine)sql_database = SQLDatabase(engine, include_tables=["city_stats"])from sqlalchemy import insertrows = [{"city_name": "New York City", "population": 8336000, "state": "New York"},{"city_name": "Los Angeles", "population": 3822000, "state": "California"},{"city_name": "Chicago", "population": 2665000, "state": "Illinois"},{"city_name": "Houston", "population": 2303000, "state": "Texas"},{"city_name": "Miami", "population": 449514, "state": "Florida"},{"city_name": "Seattle", "population": 749256, "state": "Washington"},]for row in rows:stmt = insert(city_stats_table).values(**row)with engine.begin() as connection:cursor = connection.execute(stmt)from llama_index.core.query_engine import NLSQLTableQueryEnginesql_database = SQLDatabase(engine, include_tables=["city_stats"])
sql_query_engine = NLSQLTableQueryEngine(sql_database=sql_database,tables=["city_stats"]
)def get_doc_index()-> VectorStoreIndex:'''解析 words'''# 创建 OllamaEmbedding 实例,用于指定嵌入模型和服务的基本 URLollama_embedding = llm.get_ollama_embbeding()# 读取 "./data" 目录中的数据并加载为文档对象documents = SimpleDirectoryReader(input_files=[Path(__file__).parent / "data" / "LA.pdf"]).load_data()# 从文档中创建 VectorStoreIndex,并使用 OllamaEmbedding 作为嵌入模型vector_index = VectorStoreIndex.from_documents(documents, embed_model=ollama_embedding, transformations=[SentenceSplitter(chunk_size=1000, chunk_overlap=20)],)vector_index.set_index_id("vector_index")  # 设置索引 IDvector_index.storage_context.persist("./storage")  # 将索引持久化到 "./storage"return vector_indexllama_index_query_engine = get_doc_index().as_query_engine()sql_tool = QueryEngineTool.from_defaults(query_engine=sql_query_engine,description=("Useful for translating a natural language query into a SQL query over"" a table containing: city_stats, containing the population/state of"" each city located in the USA."),name="sql_tool"
)llama_cloud_tool = QueryEngineTool.from_defaults(query_engine=llama_index_query_engine,description=(f"Useful for answering semantic questions about certain cities in the US."),name="llama_cloud_tool"
)

创建工作流

下图中显示了工作流的节点,绿色背景节点是工作流的动作,例如大模型返回 ToolEvent,ToolEvent 节点执行并返回结果。
在这里插入图片描述
工作流定义代码:

from typing import Dict, List, Any, Optionalfrom llama_index.core.tools import BaseTool
from llama_index.core.llms import ChatMessage
from llama_index.core.llms.llm import ToolSelection, LLM
from llama_index.core.workflow import (Workflow,Event,StartEvent,StopEvent,step,Context
)
from llama_index.core.base.response.schema import Response
from llama_index.core.tools import FunctionTool
from llama_index.utils.workflow import draw_all_possible_flows
from llm import get_ollamafrom docs import enable_traceenable_trace()class InputEvent(Event):"""Input event."""class GatherToolsEvent(Event):"""Gather Tools Event"""tool_calls: Anyclass ToolCallEvent(Event):"""Tool Call event"""tool_call: ToolSelectionclass ToolCallEventResult(Event):"""Tool call event result."""msg: ChatMessageclass RouterOutputAgentWorkflow(Workflow):"""Custom router output agent workflow."""def __init__(self,tools: List[BaseTool],timeout: Optional[float] = 10.0,disable_validation: bool = False,verbose: bool = False,llm: Optional[LLM] = None,chat_history: Optional[List[ChatMessage]] = None,):"""Constructor."""super().__init__(timeout=timeout, disable_validation=disable_validation, verbose=verbose)self.tools: List[BaseTool] = toolsself.tools_dict: Optional[Dict[str, BaseTool]] = {tool.metadata.name: tool for tool in self.tools}self.llm: LLM = llmself.chat_history: List[ChatMessage] = chat_history or []def reset(self) -> None:"""Resets Chat History"""self.chat_history = []@step()async def prepare_chat(self, ev: StartEvent) -> InputEvent:message = ev.get("message")if message is None:raise ValueError("'message' field is required.")# add msg to chat historychat_history = self.chat_historychat_history.append(ChatMessage(role="user", content=message))return InputEvent()@step()async def chat(self, ev: InputEvent) -> GatherToolsEvent | StopEvent:"""Appends msg to chat history, then gets tool calls."""# Put msg into LLM with tools includedchat_res = await self.llm.achat_with_tools(self.tools,chat_history=self.chat_history,verbose=self._verbose,allow_parallel_tool_calls=True)tool_calls = self.llm.get_tool_calls_from_response(chat_res, error_on_no_tool_call=False)ai_message = chat_res.messageself.chat_history.append(ai_message)if self._verbose:print(f"Chat message: {ai_message.content}")# no tool calls, return chat message.if not tool_calls:return StopEvent(result=ai_message.content)return GatherToolsEvent(tool_calls=tool_calls)@step(pass_context=True)async def dispatch_calls(self, ctx: Context, ev: GatherToolsEvent) -> ToolCallEvent:"""Dispatches calls."""tool_calls = ev.tool_callsawait ctx.set("num_tool_calls", len(tool_calls))# trigger tool call eventsfor tool_call in tool_calls:ctx.send_event(ToolCallEvent(tool_call=tool_call))return None@step()async def call_tool(self, ev: ToolCallEvent) -> ToolCallEventResult:"""Calls tool."""tool_call = ev.tool_call# get tool ID and function callid_ = tool_call.tool_idif self._verbose:print(f"Calling function {tool_call.tool_name} with msg {tool_call.tool_kwargs}")# call function and put result into a chat messagetool = self.tools_dict[tool_call.tool_name]output = await tool.acall(**tool_call.tool_kwargs)msg = ChatMessage(name=tool_call.tool_name,content=str(output),role="tool",additional_kwargs={"tool_call_id": id_,"name": tool_call.tool_name})return ToolCallEventResult(msg=msg)@step(pass_context=True)async def gather(self, ctx: Context, ev: ToolCallEventResult) -> StopEvent | None:"""Gathers tool calls."""# wait for all tool call events to finish.tool_events = ctx.collect_events(ev, [ToolCallEventResult] * await ctx.get("num_tool_calls"))if not tool_events:return Nonefor tool_event in tool_events:# append tool call chat messages to historyself.chat_history.append(tool_event.msg)# # after all tool calls finish, pass input event back, restart agent loopreturn InputEvent()from muti_agent import sql_tool, llama_cloud_tool
wf = RouterOutputAgentWorkflow(tools=[sql_tool, llama_cloud_tool], verbose=True, timeout=120, llm=get_ollama("mistral-nemo"))async def main():result = await wf.run(message="Which city has the highest population?")print("RSULT ===============", result)# if __name__ == "__main__":
#     import asyncio#     asyncio.run(main())import gradio as grasync def random_response(message, history):wf.reset()result = await wf.run(message=message)print("RSULT ===============", result)return resultdemo = gr.ChatInterface(random_response, clear_btn=None, title="Qwen2")demo.launch()

输入问题是 “What are five popular travel spots in Los Angeles?”,自动路由到 VectorIndex 进行查询。
在这里插入图片描述
输入问题为 “which city has the most population” 时,调用数据库进行搜索。
在这里插入图片描述

总结

LlamaIndex 中搜索引擎自动路由,根据用户的输入型自动选择所需的搜索引擎,这里有一个需要注意的点,模型需要支持 Function Call。如果 Ollama 本地模型进行推理,不是所有的本地模型都支持Function Call,Llama3.1 和 mistral-nemo 是支持 Function Call 的,可以使用。

这篇关于LlamaIndex 使用 RouterOutputAgentWorkflow的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141971

相关文章

postgresql使用UUID函数的方法

《postgresql使用UUID函数的方法》本文给大家介绍postgresql使用UUID函数的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录PostgreSQL有两种生成uuid的方法。可以先通过sql查看是否已安装扩展函数,和可以安装的扩展函数

如何使用Lombok进行spring 注入

《如何使用Lombok进行spring注入》本文介绍如何用Lombok简化Spring注入,推荐优先使用setter注入,通过注解自动生成getter/setter及构造器,减少冗余代码,提升开发效... Lombok为了开发环境简化代码,好处不用多说。spring 注入方式为2种,构造器注入和setter

MySQL中比较运算符的具体使用

《MySQL中比较运算符的具体使用》本文介绍了SQL中常用的符号类型和非符号类型运算符,符号类型运算符包括等于(=)、安全等于(=)、不等于(/!=)、大小比较(,=,,=)等,感兴趣的可以了解一下... 目录符号类型运算符1. 等于运算符=2. 安全等于运算符<=>3. 不等于运算符<>或!=4. 小于运

使用zip4j实现Java中的ZIP文件加密压缩的操作方法

《使用zip4j实现Java中的ZIP文件加密压缩的操作方法》本文介绍如何通过Maven集成zip4j1.3.2库创建带密码保护的ZIP文件,涵盖依赖配置、代码示例及加密原理,确保数据安全性,感兴趣的... 目录1. zip4j库介绍和版本1.1 zip4j库概述1.2 zip4j的版本演变1.3 zip4

Python 字典 (Dictionary)使用详解

《Python字典(Dictionary)使用详解》字典是python中最重要,最常用的数据结构之一,它提供了高效的键值对存储和查找能力,:本文主要介绍Python字典(Dictionary)... 目录字典1.基本特性2.创建字典3.访问元素4.修改字典5.删除元素6.字典遍历7.字典的高级特性默认字典

使用Python构建一个高效的日志处理系统

《使用Python构建一个高效的日志处理系统》这篇文章主要为大家详细讲解了如何使用Python开发一个专业的日志分析工具,能够自动化处理、分析和可视化各类日志文件,大幅提升运维效率,需要的可以了解下... 目录环境准备工具功能概述完整代码实现代码深度解析1. 类设计与初始化2. 日志解析核心逻辑3. 文件处

一文详解如何使用Java获取PDF页面信息

《一文详解如何使用Java获取PDF页面信息》了解PDF页面属性是我们在处理文档、内容提取、打印设置或页面重组等任务时不可或缺的一环,下面我们就来看看如何使用Java语言获取这些信息吧... 目录引言一、安装和引入PDF处理库引入依赖二、获取 PDF 页数三、获取页面尺寸(宽高)四、获取页面旋转角度五、判断

C++中assign函数的使用

《C++中assign函数的使用》在C++标准模板库中,std::list等容器都提供了assign成员函数,它比操作符更灵活,支持多种初始化方式,下面就来介绍一下assign的用法,具有一定的参考价... 目录​1.assign的基本功能​​语法​2. 具体用法示例​​​(1) 填充n个相同值​​(2)

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定