LlamaIndex 使用 RouterOutputAgentWorkflow

2024-09-06 12:12

本文主要是介绍LlamaIndex 使用 RouterOutputAgentWorkflow,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

LlamaIndex 中提供了一个 RouterOutputAgentWorkflow 功能,可以集成多个 QueryTool,根据用户的输入判断使用那个 QueryEngine,在做查询的时候,可以从不同的数据源进行查询,例如确定的数据从数据库查询,如果是语义查询可以从向量数据库进行查询。本文将实现两个搜索引擎,根据不同 Query 使用不同 QueryEngine。

安装 MySQL 依赖

pip install mysql-connector-python  

搜索引擎

定义搜索引擎,初始两个数据源

  • 使用 MySQL 作为数据库的数据源
  • 使用 VectorIndex 作为语义搜索数据源
from pathlib import Path
from llama_index.core.tools import QueryEngineTool
from llama_index.core import VectorStoreIndex
import llm
from llama_index.core import SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.query_engine import NLSQLTableQueryEngine
from llama_index.core import Settings
from llama_index.core import SQLDatabasefrom sqlalchemy import create_engine, MetaData, Table, Column, String, Integer, select
Settings.llm = llm.get_ollama("mistral-nemo")
Settings.embed_model = llm.get_ollama_embbeding()engine = create_engine('mysql+mysqlconnector://root:123456@localhost:13306/db_llama', echo=True  
)def init_db():# 初始化数据库metadata_obj = MetaData()table_name = "city_stats"city_stats_table = Table(table_name,metadata_obj,Column("city_name", String(16), primary_key=True),Column("population", Integer, ),Column("state", String(16), nullable=False),)metadata_obj.create_all(engine)sql_database = SQLDatabase(engine, include_tables=["city_stats"])from sqlalchemy import insertrows = [{"city_name": "New York City", "population": 8336000, "state": "New York"},{"city_name": "Los Angeles", "population": 3822000, "state": "California"},{"city_name": "Chicago", "population": 2665000, "state": "Illinois"},{"city_name": "Houston", "population": 2303000, "state": "Texas"},{"city_name": "Miami", "population": 449514, "state": "Florida"},{"city_name": "Seattle", "population": 749256, "state": "Washington"},]for row in rows:stmt = insert(city_stats_table).values(**row)with engine.begin() as connection:cursor = connection.execute(stmt)from llama_index.core.query_engine import NLSQLTableQueryEnginesql_database = SQLDatabase(engine, include_tables=["city_stats"])
sql_query_engine = NLSQLTableQueryEngine(sql_database=sql_database,tables=["city_stats"]
)def get_doc_index()-> VectorStoreIndex:'''解析 words'''# 创建 OllamaEmbedding 实例,用于指定嵌入模型和服务的基本 URLollama_embedding = llm.get_ollama_embbeding()# 读取 "./data" 目录中的数据并加载为文档对象documents = SimpleDirectoryReader(input_files=[Path(__file__).parent / "data" / "LA.pdf"]).load_data()# 从文档中创建 VectorStoreIndex,并使用 OllamaEmbedding 作为嵌入模型vector_index = VectorStoreIndex.from_documents(documents, embed_model=ollama_embedding, transformations=[SentenceSplitter(chunk_size=1000, chunk_overlap=20)],)vector_index.set_index_id("vector_index")  # 设置索引 IDvector_index.storage_context.persist("./storage")  # 将索引持久化到 "./storage"return vector_indexllama_index_query_engine = get_doc_index().as_query_engine()sql_tool = QueryEngineTool.from_defaults(query_engine=sql_query_engine,description=("Useful for translating a natural language query into a SQL query over"" a table containing: city_stats, containing the population/state of"" each city located in the USA."),name="sql_tool"
)llama_cloud_tool = QueryEngineTool.from_defaults(query_engine=llama_index_query_engine,description=(f"Useful for answering semantic questions about certain cities in the US."),name="llama_cloud_tool"
)

创建工作流

下图中显示了工作流的节点,绿色背景节点是工作流的动作,例如大模型返回 ToolEvent,ToolEvent 节点执行并返回结果。
在这里插入图片描述
工作流定义代码:

from typing import Dict, List, Any, Optionalfrom llama_index.core.tools import BaseTool
from llama_index.core.llms import ChatMessage
from llama_index.core.llms.llm import ToolSelection, LLM
from llama_index.core.workflow import (Workflow,Event,StartEvent,StopEvent,step,Context
)
from llama_index.core.base.response.schema import Response
from llama_index.core.tools import FunctionTool
from llama_index.utils.workflow import draw_all_possible_flows
from llm import get_ollamafrom docs import enable_traceenable_trace()class InputEvent(Event):"""Input event."""class GatherToolsEvent(Event):"""Gather Tools Event"""tool_calls: Anyclass ToolCallEvent(Event):"""Tool Call event"""tool_call: ToolSelectionclass ToolCallEventResult(Event):"""Tool call event result."""msg: ChatMessageclass RouterOutputAgentWorkflow(Workflow):"""Custom router output agent workflow."""def __init__(self,tools: List[BaseTool],timeout: Optional[float] = 10.0,disable_validation: bool = False,verbose: bool = False,llm: Optional[LLM] = None,chat_history: Optional[List[ChatMessage]] = None,):"""Constructor."""super().__init__(timeout=timeout, disable_validation=disable_validation, verbose=verbose)self.tools: List[BaseTool] = toolsself.tools_dict: Optional[Dict[str, BaseTool]] = {tool.metadata.name: tool for tool in self.tools}self.llm: LLM = llmself.chat_history: List[ChatMessage] = chat_history or []def reset(self) -> None:"""Resets Chat History"""self.chat_history = []@step()async def prepare_chat(self, ev: StartEvent) -> InputEvent:message = ev.get("message")if message is None:raise ValueError("'message' field is required.")# add msg to chat historychat_history = self.chat_historychat_history.append(ChatMessage(role="user", content=message))return InputEvent()@step()async def chat(self, ev: InputEvent) -> GatherToolsEvent | StopEvent:"""Appends msg to chat history, then gets tool calls."""# Put msg into LLM with tools includedchat_res = await self.llm.achat_with_tools(self.tools,chat_history=self.chat_history,verbose=self._verbose,allow_parallel_tool_calls=True)tool_calls = self.llm.get_tool_calls_from_response(chat_res, error_on_no_tool_call=False)ai_message = chat_res.messageself.chat_history.append(ai_message)if self._verbose:print(f"Chat message: {ai_message.content}")# no tool calls, return chat message.if not tool_calls:return StopEvent(result=ai_message.content)return GatherToolsEvent(tool_calls=tool_calls)@step(pass_context=True)async def dispatch_calls(self, ctx: Context, ev: GatherToolsEvent) -> ToolCallEvent:"""Dispatches calls."""tool_calls = ev.tool_callsawait ctx.set("num_tool_calls", len(tool_calls))# trigger tool call eventsfor tool_call in tool_calls:ctx.send_event(ToolCallEvent(tool_call=tool_call))return None@step()async def call_tool(self, ev: ToolCallEvent) -> ToolCallEventResult:"""Calls tool."""tool_call = ev.tool_call# get tool ID and function callid_ = tool_call.tool_idif self._verbose:print(f"Calling function {tool_call.tool_name} with msg {tool_call.tool_kwargs}")# call function and put result into a chat messagetool = self.tools_dict[tool_call.tool_name]output = await tool.acall(**tool_call.tool_kwargs)msg = ChatMessage(name=tool_call.tool_name,content=str(output),role="tool",additional_kwargs={"tool_call_id": id_,"name": tool_call.tool_name})return ToolCallEventResult(msg=msg)@step(pass_context=True)async def gather(self, ctx: Context, ev: ToolCallEventResult) -> StopEvent | None:"""Gathers tool calls."""# wait for all tool call events to finish.tool_events = ctx.collect_events(ev, [ToolCallEventResult] * await ctx.get("num_tool_calls"))if not tool_events:return Nonefor tool_event in tool_events:# append tool call chat messages to historyself.chat_history.append(tool_event.msg)# # after all tool calls finish, pass input event back, restart agent loopreturn InputEvent()from muti_agent import sql_tool, llama_cloud_tool
wf = RouterOutputAgentWorkflow(tools=[sql_tool, llama_cloud_tool], verbose=True, timeout=120, llm=get_ollama("mistral-nemo"))async def main():result = await wf.run(message="Which city has the highest population?")print("RSULT ===============", result)# if __name__ == "__main__":
#     import asyncio#     asyncio.run(main())import gradio as grasync def random_response(message, history):wf.reset()result = await wf.run(message=message)print("RSULT ===============", result)return resultdemo = gr.ChatInterface(random_response, clear_btn=None, title="Qwen2")demo.launch()

输入问题是 “What are five popular travel spots in Los Angeles?”,自动路由到 VectorIndex 进行查询。
在这里插入图片描述
输入问题为 “which city has the most population” 时,调用数据库进行搜索。
在这里插入图片描述

总结

LlamaIndex 中搜索引擎自动路由,根据用户的输入型自动选择所需的搜索引擎,这里有一个需要注意的点,模型需要支持 Function Call。如果 Ollama 本地模型进行推理,不是所有的本地模型都支持Function Call,Llama3.1 和 mistral-nemo 是支持 Function Call 的,可以使用。

这篇关于LlamaIndex 使用 RouterOutputAgentWorkflow的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1141971

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

git使用的说明总结

Git使用说明 下载安装(下载地址) macOS: Git - Downloading macOS Windows: Git - Downloading Windows Linux/Unix: Git (git-scm.com) 创建新仓库 本地创建新仓库:创建新文件夹,进入文件夹目录,执行指令 git init ,用以创建新的git 克隆仓库 执行指令用以创建一个本地仓库的

【北交大信息所AI-Max2】使用方法

BJTU信息所集群AI_MAX2使用方法 使用的前提是预约到相应的算力卡,拥有登录权限的账号密码,一般为导师组共用一个。 有浏览器、ssh工具就可以。 1.新建集群Terminal 浏览器登陆10.126.62.75 (如果是1集群把75改成66) 交互式开发 执行器选Terminal 密码随便设一个(需记住) 工作空间:私有数据、全部文件 加速器选GeForce_RTX_2080_Ti

【Linux 从基础到进阶】Ansible自动化运维工具使用

Ansible自动化运维工具使用 Ansible 是一款开源的自动化运维工具,采用无代理架构(agentless),基于 SSH 连接进行管理,具有简单易用、灵活强大、可扩展性高等特点。它广泛用于服务器管理、应用部署、配置管理等任务。本文将介绍 Ansible 的安装、基本使用方法及一些实际运维场景中的应用,旨在帮助运维人员快速上手并熟练运用 Ansible。 1. Ansible的核心概念