【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体

本文主要是介绍【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 0. 前置推荐阅读
    • 1. 本文内容
    • 2. 解析用户指令(分析用户需求)
      • 2.1 完整代码及注释
      • 2.2 运行结果
    • 3. 利用大模型写爬虫代码
      • 3.1 对html内容进行精简
      • 3.2 利用大模型写爬虫代码
      • 3.3 补充代码,测试本节程序
      • 3.4 运行结果及踩坑
        • 3.4.1 运行结果
        • 3.4.2 坑一:No module named 'playwright'
    • 4. 爬虫工程师角色定义:CrawlerEngineer
    • 5. 订阅助手角色定义:SubscriptionAssistant
    • 6. 运行订阅智能体的Action:RunSubscription
      • 6.1 总结信息的Action
      • 6.2 运行订阅智能体的Action
    • 7. 定时器代码和callback代码
      • 7.1 定时器
      • 7.2 callback
      • 7.3 集成callback
    • 8. 整体运行
      • 运行结果
    • 9. 可能有的坑
      • 9.1 更新MetaGPT源码后,运行报错:No module named 'zhipuai.types'

0. 前置推荐阅读

  • 订阅智能体实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件
  • ActionNode基础与实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】4. ActionNode从理论到实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】4.1 细说我在ActionNode实战中踩的那些坑

1. 本文内容

跟着《MetaGPT智能体开发入门》教程,对上次实现的订阅智能体进行优化和进阶,也用上ActionNode模块。

上次实现订阅智能体的文章可参考 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件 。在上次这篇文章中我们虽然完成了订阅智能体的功能,但是这个智能体的作用是被局限在某一特定领域的,当我们需要订阅另外一个数据源的分析结果时,我们需要手动再写一个Role,这个智能体不太通用。

如何实现一个更通用的订阅智能体?实现思路如下:

(1)解析用户指令(分析用户需求,我们期望用户用一句自然语言下指令即可得到想要的)
(2)爬取网页
(3)让大模型写从html中提取用户需要的数据的代码
(4)将爬取网页和大模型写的提取用户需要数据的代码结合,得到爬取指定网页信息的Action
(5)将提取后的数据,让大模型进行处理和分析,得到网页数据分析的Action
(6)将这两个Action组合,就可以得到一个特定网页的Watcher Role

下面我们一步一步来实现。

2. 解析用户指令(分析用户需求)

写爬虫代码是需要先知道要爬取的网页URL、网页数据提取需求是什么,最简单的方式就是用户自己填写,但是这就让我们的入参变得复杂。所以,我们首先实现一个解析用户指令的代码,用户只需用一句自然语言输入自己的需求,代码自动解析出网页URL、网页数据提取需求等,即:将用户的输入结构化

2.1 完整代码及注释

# 加载 .env 到环境变量
# from dotenv import load_dotenv, find_dotenv
# _ = load_dotenv(find_dotenv())from metagpt.actions.action_node import ActionNode
from metagpt.actions.action import Action
import asyncio## 分析用户的要求语言
LANGUAGE = ActionNode(key="language",expected_type=str,instruction="Provide the language used in the project, typically matching the user's requirement language.",example="en_us",
)## 分析用户的订阅推送时间
CRON_EXPRESSION = ActionNode(key="Cron Expression",expected_type=str,instruction="If the user requires scheduled triggering, please provide the corresponding 5-field cron expression. ""Otherwise, leave it blank.",example="",
)## 分析用户订阅的网址URL,可以是列表
CRAWLER_URL_LIST = ActionNode(key="Crawler URL List",expected_type=list[str],instruction="List the URLs user want to crawl. Leave it blank if not provided in the User Requirement.",example=["https://example1.com", "https://example2.com"],
)## 分析用户所需要的网站数据
PAGE_CONTENT_EXTRACTION = ActionNode(key="Page Content Extraction",expected_type=str,instruction="Specify the requirements and tips to extract from the crawled web pages based on User Requirement.",example="Retrieve the titles and content of articles published today.",
)## 分析用户所需要的汇总数据的方式
CRAWL_POST_PROCESSING = ActionNode(key="Crawl Post Processing",expected_type=str,instruction="Specify the processing to be applied to the crawled content, such as summarizing today's news.",example="Generate a summary of today's news articles.",
)## 补充说明,如果url或定时器解析为空,则提示用户补充
INFORMATION_SUPPLEMENT = ActionNode(key="Information Supplement",expected_type=str,instruction="If unable to obtain the Cron Expression, prompt the user to provide the time to receive subscription ""messages. If unable to obtain the URL List Crawler, prompt the user to provide the URLs they want to crawl. Keep it ""blank if everything is clear",example="",
)NODES = [LANGUAGE,CRON_EXPRESSION,CRAWLER_URL_LIST,PAGE_CONTENT_EXTRACTION,CRAWL_POST_PROCESSING,INFORMATION_SUPPLEMENT,
]PARSE_SUB_REQUIREMENTS_NODE = ActionNode.from_children("ParseSubscriptionReq", NODES)## 解析用户的需求的Action
PARSE_SUB_REQUIREMENT_TEMPLATE = """
### User Requirement
{requirements}
"""class ParseSubRequirement(Action):async def run(self, requirements):requirements = "\n".join(i.content for i in requirements)context = PARSE_SUB_REQUIREMENT_TEMPLATE.format(requirements=requirements)node = await PARSE_SUB_REQUIREMENTS_NODE.fill(context=context, llm=self.llm)return nodeif __name__ == "__main__":from metagpt.schema import Messageasyncio.run(ParseSubRequirement().run([Message("从36kr创投平台https://pitchhub.36kr.com/financing-flash 爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在晚上七点半送给我")]))

2.2 运行结果

在这里插入图片描述

3. 利用大模型写爬虫代码

3.1 对html内容进行精简

一般html元素太多,所以调用大模型时token数非常大,甚至一次对话都无法接收一个完整的html,我们可以先对网页内容做一下简化,因为对元素进行定位一般用css selector就够了,所以我们可以主要提供html的class属性信息,另外可以将html转成css表达式和对应的内容提供给llm,从而减少token的消耗。

def get_outline(page):soup = _get_soup(page.html)outline = []def process_element(element, depth):name = element.nameif not name:returnif name in ["script", "style"]:returnelement_info = {"name": element.name, "depth": depth}if name in ["svg"]:element_info["text"] = Noneoutline.append(element_info)returnelement_info["text"] = element.string# Check if the element has an "id" attributeif "id" in element.attrs:element_info["id"] = element["id"]if "class" in element.attrs:element_info["class"] = element["class"]outline.append(element_info)for child in element.children:process_element(child, depth + 1)for element in soup.body.children:process_element(element, 1)return outline

3.2 利用大模型写爬虫代码

PROMPT_TEMPLATE = """Please complete the web page crawler parse function to achieve the User Requirement. The parse \
function should take a BeautifulSoup object as input, which corresponds to the HTML outline provided in the Context.```python
from bs4 import BeautifulSoup# only complete the parse function
def parse(soup: BeautifulSoup):...# Return the object that the user wants to retrieve, don't use print## User Requirement
{requirement}## ContextThe outline of html page to scrabe is show like below:```tree
{outline}
"""class WriteCrawlerCode(Action):async def run(self, requirement):requirement: Message = requirement[-1] ## 上一步解析完成的用户需求结果data = requirement.instruct_content.dict()urls = data["Crawler URL List"] ## 获取用户想要的网页URLquery = data["Page Content Extraction"] ## 获取用户想关注的网页数据codes = {}for url in urls:codes[url] = await self._write_code(url, query) ## 每个URL的爬虫程序return "\n".join(f"# {url}\n{code}" for url, code in codes.items())async def _write_code(self, url, query):page = await WebBrowserEngine().run(url)outline = get_outline(page)outline = "\n".join(f"{' '*i['depth']}{'.'.join([i['name'], *i.get('class', [])])}: {i['text'] if i['text'] else ''}"for i in outline)code_rsp = await self._aask(PROMPT_TEMPLATE.format(outline=outline, requirement=query))code = CodeParser.parse_code(block="", text=code_rsp)return code

3.3 补充代码,测试本节程序

  • 引入所需包
import asyncio
from metagpt.actions.action import Action
from metagpt.schema import Message
from metagpt.tools.web_browser_engine import WebBrowserEngine
from metagpt.utils.common import CodeParser
from metagpt.utils.parse_html import _get_soup
  • main运行函数
if __name__ == "__main__":from metagpt.actions.action_node import ActionNodecls = ActionNode.create_model_class("ActionModel", {"Cron Expression": (str, ...),"Crawler URL List": (list[str], ...),"Page Content Extraction": (str, ...),"Crawl Post Processing": (str, ...),})data = {"Cron Expression": "0 30 19 * * *","Crawler URL List": ["https://pitchhub.36kr.com/financing-flash"],"Page Content Extraction": "从36kr创投平台爬取所有初创企业融资的信息,获取标题,链接, 时间。","Crawl Post Processing": "总结今天的融资新闻。",}asyncio.run(WriteCrawlerCode().run([Message(instruct_content=cls(**data))]))

3.4 运行结果及踩坑

3.4.1 运行结果

在这里插入图片描述

3.4.2 坑一:No module named ‘playwright’

在这里插入图片描述

  • 解决方法:
pip install playwright

4. 爬虫工程师角色定义:CrawlerEngineer

# 定义爬虫工程师角色
from metagpt.roles import Role
class CrawlerEngineer(Role):name: str = "同学小张的专属爬虫工程师"profile: str = "Crawling Engineer"goal: str = "Write elegant, readable, extensible, efficient code"constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([WriteCrawlerCode])self._watch([ParseSubRequirement])  ## 触发?

爬虫工程师角色的代码中,初始化了一个写爬虫代码的Action,为WriteCrawlerCode。那这个角色什么时候触发运行呢?答案在这一句:self._watch([ParseSubRequirement]),当解析完用户需求之后,会触发该角色的运行,触发写代码的动作。

5. 订阅助手角色定义:SubscriptionAssistant

class SubscriptionAssistant(Role):"""Analyze user subscription requirements."""name: str = "同学小张的订阅助手"profile: str = "Subscription Assistant"goal: str = "analyze user subscription requirements to provide personalized subscription services."constraints: str = "utilize the same language as the User Requirement"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([ParseSubRequirement, RunSubscription]) ## 2. 先解析用户需求,然后运行订阅self._watch([UserRequirement, WriteCrawlerCode])  ## 触发?async def _think(self) -> bool:cause_by = self.rc.history[-1].cause_byif cause_by == any_to_str(UserRequirement):state = 0elif cause_by == any_to_str(WriteCrawlerCode):state = 1if self.rc.state == state:self.rc.todo = Nonereturn Falseself._set_state(state)return True

订阅助手角色定义的代码中,初始化了两个Action,一个是解析用户需求,一个是RunSubscription,运行智能体。
看下它是怎么运作的:
(1)self._watch([UserRequirement, WriteCrawlerCode])
它观察了这两个输入,用户输入和写完爬虫代码。
(2)_think函数
当上一次的信息是来自用户输入,则运行ParseSubRequirement,如果上一次输入是写爬虫代码的Action,则运行RunSubscription。

6. 运行订阅智能体的Action:RunSubscription

6.1 总结信息的Action

有了用户需求,有了爬虫代码,还缺的就是将爬下来的数据总结成用户需求中想要的内容。下面的SubAction就是干这事儿的(类比之前订阅智能体中的AnalysisOSSTrending):

data是从所有Url网页中爬下来的数据,process是之前解析的用户对数据的处理需求。

SUB_ACTION_TEMPLATE = """
## Requirements
Answer the question based on the provided context {process}. If the question cannot be answered, please summarize the context.## context
{data}"
"""class SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))

6.2 运行订阅智能体的Action

# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):from metagpt.roles.role import Rolefrom metagpt.subscription import SubscriptionRunnercode = msgs[-1].content ## 4. 获取爬虫代码req = msgs[-2].instruct_content.dict() ## 5. 获取用户需求urls = req["Crawler URL List"]process = req["Crawl Post Processing"]spec = req["Cron Expression"]SubAction = self.create_sub_action_cls(urls, code, process) ## 6. 创建一个Action,urls网页链接、code爬虫代码、process用户需求的数据SubRole = type("SubRole", (Role,), {}) ## 7. 定时触发的Rolerole = SubRole()role.init_actions([SubAction])runner = SubscriptionRunner()async def callback(msg):print(msg)await runner.subscribe(role, CronTrigger(spec), callback)await runner.run()@staticmethoddef create_sub_action_cls(urls: list[str], code: str, process: str):modules = {}for url in urls[::-1]:code, current = code.rsplit(f"# {url}", maxsplit=1)name = uuid4().hexmodule = type(sys)(name)exec(current, module.__dict__)modules[url] = moduleclass SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))return SubAction

7. 定时器代码和callback代码

这部分代码就可以直接复用原来订阅智能体的代码了,通用的。

7.1 定时器

class CronTrigger:def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None) -> None:self.crontab = crontab(spec, tz=tz)def __aiter__(self):return selfasync def __anext__(self):await self.crontab.next()return Message()

7.2 callback

class WxPusherClient:def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):self.base_url = base_urlself.token = token or os.environ["WXPUSHER_TOKEN"] # 5.1 从环境变量中获取token,所以你需要在环境变量中配置WXPUSHER_TOKEN或在配置文件中设置WXPUSHER_TOKENasync def send_message(self,content,summary: Optional[str] = None,content_type: int = 1,topic_ids: Optional[list[int]] = None,uids: Optional[list[int]] = None,verify: bool = False,url: Optional[str] = None,):payload = {"appToken": self.token,"content": content,"summary": summary,"contentType": content_type,"topicIds": topic_ids or [],# 5.2 从环境变量中获取uids,所以你需要在环境变量中配置WXPUSHER_UIDS# uids是你想推送给哪个微信,必须是关注了你这个订阅号的微信才可以知道uid"uids": uids or os.environ["WXPUSHER_UIDS"].split(","), "verifyPay": verify,"url": url,}url = f"{self.base_url}/api/send/message"return await self._request("POST", url, json=payload)async def _request(self, method, url, **kwargs):async with aiohttp.ClientSession() as session:async with session.request(method, url, **kwargs) as response:response.raise_for_status()return await response.json()# 5.3 微信callback wrapper,使用WxPusherClient给指定微信推送消息
async def wxpusher_callback(msg: Message):client = WxPusherClient()await client.send_message(msg.content, content_type=3)

7.3 集成callback

# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):······ 与前文代码一样callbacks = []callbacks.append(wxpusher_callback)async def callback(msg):print(msg)await asyncio.gather(*(call(msg) for call in callbacks)) # 遍历所有回调函数,触发回调,分发消息······ 与前文代码一样

8. 整体运行

创建了一个Team,其中有SubscriptionAssistant角色和CrawlerEngineer角色。

用户输入信息,然后run_project,SubscriptionAssistant首先观察到用户输入信息,触发ParseSubRequirement动作。

if __name__ == "__main__":import asynciofrom metagpt.team import Teamteam = Team()team.hire([SubscriptionAssistant(), CrawlerEngineer()])team.run_project("从36kr创投平台https://pitchhub.36kr.com/financing-flash爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在早上10:56送给我")asyncio.run(team.run())

运行结果

在这里插入图片描述

9. 可能有的坑

9.1 更新MetaGPT源码后,运行报错:No module named ‘zhipuai.types’

在这里插入图片描述

  • 解决方法
pip install zhipuai

本文完。按照教程跑通了订阅智能体进阶的流程。

这好像是教程中唯一的一个多智能体的例子。所以里面涉及了多智能体之间的交互方式,懵懵懂懂,需要深入了解下(_watch、_observe等)。咱们后面的文章细讲。

这篇关于【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/635800

相关文章

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque

Python如何实现PDF隐私信息检测

《Python如何实现PDF隐私信息检测》随着越来越多的个人信息以电子形式存储和传输,确保这些信息的安全至关重要,本文将介绍如何使用Python检测PDF文件中的隐私信息,需要的可以参考下... 目录项目背景技术栈代码解析功能说明运行结php果在当今,数据隐私保护变得尤为重要。随着越来越多的个人信息以电子形

使用 sql-research-assistant进行 SQL 数据库研究的实战指南(代码实现演示)

《使用sql-research-assistant进行SQL数据库研究的实战指南(代码实现演示)》本文介绍了sql-research-assistant工具,该工具基于LangChain框架,集... 目录技术背景介绍核心原理解析代码实现演示安装和配置项目集成LangSmith 配置(可选)启动服务应用场景

使用Python快速实现链接转word文档

《使用Python快速实现链接转word文档》这篇文章主要为大家详细介绍了如何使用Python快速实现链接转word文档功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 演示代码展示from newspaper import Articlefrom docx import

前端原生js实现拖拽排课效果实例

《前端原生js实现拖拽排课效果实例》:本文主要介绍如何实现一个简单的课程表拖拽功能,通过HTML、CSS和JavaScript的配合,我们实现了课程项的拖拽、放置和显示功能,文中通过实例代码介绍的... 目录1. 效果展示2. 效果分析2.1 关键点2.2 实现方法3. 代码实现3.1 html部分3.2

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll