【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体

本文主要是介绍【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 0. 前置推荐阅读
    • 1. 本文内容
    • 2. 解析用户指令(分析用户需求)
      • 2.1 完整代码及注释
      • 2.2 运行结果
    • 3. 利用大模型写爬虫代码
      • 3.1 对html内容进行精简
      • 3.2 利用大模型写爬虫代码
      • 3.3 补充代码,测试本节程序
      • 3.4 运行结果及踩坑
        • 3.4.1 运行结果
        • 3.4.2 坑一:No module named 'playwright'
    • 4. 爬虫工程师角色定义:CrawlerEngineer
    • 5. 订阅助手角色定义:SubscriptionAssistant
    • 6. 运行订阅智能体的Action:RunSubscription
      • 6.1 总结信息的Action
      • 6.2 运行订阅智能体的Action
    • 7. 定时器代码和callback代码
      • 7.1 定时器
      • 7.2 callback
      • 7.3 集成callback
    • 8. 整体运行
      • 运行结果
    • 9. 可能有的坑
      • 9.1 更新MetaGPT源码后,运行报错:No module named 'zhipuai.types'

0. 前置推荐阅读

  • 订阅智能体实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件
  • ActionNode基础与实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】4. ActionNode从理论到实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】4.1 细说我在ActionNode实战中踩的那些坑

1. 本文内容

跟着《MetaGPT智能体开发入门》教程,对上次实现的订阅智能体进行优化和进阶,也用上ActionNode模块。

上次实现订阅智能体的文章可参考 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件 。在上次这篇文章中我们虽然完成了订阅智能体的功能,但是这个智能体的作用是被局限在某一特定领域的,当我们需要订阅另外一个数据源的分析结果时,我们需要手动再写一个Role,这个智能体不太通用。

如何实现一个更通用的订阅智能体?实现思路如下:

(1)解析用户指令(分析用户需求,我们期望用户用一句自然语言下指令即可得到想要的)
(2)爬取网页
(3)让大模型写从html中提取用户需要的数据的代码
(4)将爬取网页和大模型写的提取用户需要数据的代码结合,得到爬取指定网页信息的Action
(5)将提取后的数据,让大模型进行处理和分析,得到网页数据分析的Action
(6)将这两个Action组合,就可以得到一个特定网页的Watcher Role

下面我们一步一步来实现。

2. 解析用户指令(分析用户需求)

写爬虫代码是需要先知道要爬取的网页URL、网页数据提取需求是什么,最简单的方式就是用户自己填写,但是这就让我们的入参变得复杂。所以,我们首先实现一个解析用户指令的代码,用户只需用一句自然语言输入自己的需求,代码自动解析出网页URL、网页数据提取需求等,即:将用户的输入结构化

2.1 完整代码及注释

# 加载 .env 到环境变量
# from dotenv import load_dotenv, find_dotenv
# _ = load_dotenv(find_dotenv())from metagpt.actions.action_node import ActionNode
from metagpt.actions.action import Action
import asyncio## 分析用户的要求语言
LANGUAGE = ActionNode(key="language",expected_type=str,instruction="Provide the language used in the project, typically matching the user's requirement language.",example="en_us",
)## 分析用户的订阅推送时间
CRON_EXPRESSION = ActionNode(key="Cron Expression",expected_type=str,instruction="If the user requires scheduled triggering, please provide the corresponding 5-field cron expression. ""Otherwise, leave it blank.",example="",
)## 分析用户订阅的网址URL,可以是列表
CRAWLER_URL_LIST = ActionNode(key="Crawler URL List",expected_type=list[str],instruction="List the URLs user want to crawl. Leave it blank if not provided in the User Requirement.",example=["https://example1.com", "https://example2.com"],
)## 分析用户所需要的网站数据
PAGE_CONTENT_EXTRACTION = ActionNode(key="Page Content Extraction",expected_type=str,instruction="Specify the requirements and tips to extract from the crawled web pages based on User Requirement.",example="Retrieve the titles and content of articles published today.",
)## 分析用户所需要的汇总数据的方式
CRAWL_POST_PROCESSING = ActionNode(key="Crawl Post Processing",expected_type=str,instruction="Specify the processing to be applied to the crawled content, such as summarizing today's news.",example="Generate a summary of today's news articles.",
)## 补充说明,如果url或定时器解析为空,则提示用户补充
INFORMATION_SUPPLEMENT = ActionNode(key="Information Supplement",expected_type=str,instruction="If unable to obtain the Cron Expression, prompt the user to provide the time to receive subscription ""messages. If unable to obtain the URL List Crawler, prompt the user to provide the URLs they want to crawl. Keep it ""blank if everything is clear",example="",
)NODES = [LANGUAGE,CRON_EXPRESSION,CRAWLER_URL_LIST,PAGE_CONTENT_EXTRACTION,CRAWL_POST_PROCESSING,INFORMATION_SUPPLEMENT,
]PARSE_SUB_REQUIREMENTS_NODE = ActionNode.from_children("ParseSubscriptionReq", NODES)## 解析用户的需求的Action
PARSE_SUB_REQUIREMENT_TEMPLATE = """
### User Requirement
{requirements}
"""class ParseSubRequirement(Action):async def run(self, requirements):requirements = "\n".join(i.content for i in requirements)context = PARSE_SUB_REQUIREMENT_TEMPLATE.format(requirements=requirements)node = await PARSE_SUB_REQUIREMENTS_NODE.fill(context=context, llm=self.llm)return nodeif __name__ == "__main__":from metagpt.schema import Messageasyncio.run(ParseSubRequirement().run([Message("从36kr创投平台https://pitchhub.36kr.com/financing-flash 爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在晚上七点半送给我")]))

2.2 运行结果

在这里插入图片描述

3. 利用大模型写爬虫代码

3.1 对html内容进行精简

一般html元素太多,所以调用大模型时token数非常大,甚至一次对话都无法接收一个完整的html,我们可以先对网页内容做一下简化,因为对元素进行定位一般用css selector就够了,所以我们可以主要提供html的class属性信息,另外可以将html转成css表达式和对应的内容提供给llm,从而减少token的消耗。

def get_outline(page):soup = _get_soup(page.html)outline = []def process_element(element, depth):name = element.nameif not name:returnif name in ["script", "style"]:returnelement_info = {"name": element.name, "depth": depth}if name in ["svg"]:element_info["text"] = Noneoutline.append(element_info)returnelement_info["text"] = element.string# Check if the element has an "id" attributeif "id" in element.attrs:element_info["id"] = element["id"]if "class" in element.attrs:element_info["class"] = element["class"]outline.append(element_info)for child in element.children:process_element(child, depth + 1)for element in soup.body.children:process_element(element, 1)return outline

3.2 利用大模型写爬虫代码

PROMPT_TEMPLATE = """Please complete the web page crawler parse function to achieve the User Requirement. The parse \
function should take a BeautifulSoup object as input, which corresponds to the HTML outline provided in the Context.```python
from bs4 import BeautifulSoup# only complete the parse function
def parse(soup: BeautifulSoup):...# Return the object that the user wants to retrieve, don't use print## User Requirement
{requirement}## ContextThe outline of html page to scrabe is show like below:```tree
{outline}
"""class WriteCrawlerCode(Action):async def run(self, requirement):requirement: Message = requirement[-1] ## 上一步解析完成的用户需求结果data = requirement.instruct_content.dict()urls = data["Crawler URL List"] ## 获取用户想要的网页URLquery = data["Page Content Extraction"] ## 获取用户想关注的网页数据codes = {}for url in urls:codes[url] = await self._write_code(url, query) ## 每个URL的爬虫程序return "\n".join(f"# {url}\n{code}" for url, code in codes.items())async def _write_code(self, url, query):page = await WebBrowserEngine().run(url)outline = get_outline(page)outline = "\n".join(f"{' '*i['depth']}{'.'.join([i['name'], *i.get('class', [])])}: {i['text'] if i['text'] else ''}"for i in outline)code_rsp = await self._aask(PROMPT_TEMPLATE.format(outline=outline, requirement=query))code = CodeParser.parse_code(block="", text=code_rsp)return code

3.3 补充代码,测试本节程序

  • 引入所需包
import asyncio
from metagpt.actions.action import Action
from metagpt.schema import Message
from metagpt.tools.web_browser_engine import WebBrowserEngine
from metagpt.utils.common import CodeParser
from metagpt.utils.parse_html import _get_soup
  • main运行函数
if __name__ == "__main__":from metagpt.actions.action_node import ActionNodecls = ActionNode.create_model_class("ActionModel", {"Cron Expression": (str, ...),"Crawler URL List": (list[str], ...),"Page Content Extraction": (str, ...),"Crawl Post Processing": (str, ...),})data = {"Cron Expression": "0 30 19 * * *","Crawler URL List": ["https://pitchhub.36kr.com/financing-flash"],"Page Content Extraction": "从36kr创投平台爬取所有初创企业融资的信息,获取标题,链接, 时间。","Crawl Post Processing": "总结今天的融资新闻。",}asyncio.run(WriteCrawlerCode().run([Message(instruct_content=cls(**data))]))

3.4 运行结果及踩坑

3.4.1 运行结果

在这里插入图片描述

3.4.2 坑一:No module named ‘playwright’

在这里插入图片描述

  • 解决方法:
pip install playwright

4. 爬虫工程师角色定义:CrawlerEngineer

# 定义爬虫工程师角色
from metagpt.roles import Role
class CrawlerEngineer(Role):name: str = "同学小张的专属爬虫工程师"profile: str = "Crawling Engineer"goal: str = "Write elegant, readable, extensible, efficient code"constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([WriteCrawlerCode])self._watch([ParseSubRequirement])  ## 触发?

爬虫工程师角色的代码中,初始化了一个写爬虫代码的Action,为WriteCrawlerCode。那这个角色什么时候触发运行呢?答案在这一句:self._watch([ParseSubRequirement]),当解析完用户需求之后,会触发该角色的运行,触发写代码的动作。

5. 订阅助手角色定义:SubscriptionAssistant

class SubscriptionAssistant(Role):"""Analyze user subscription requirements."""name: str = "同学小张的订阅助手"profile: str = "Subscription Assistant"goal: str = "analyze user subscription requirements to provide personalized subscription services."constraints: str = "utilize the same language as the User Requirement"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([ParseSubRequirement, RunSubscription]) ## 2. 先解析用户需求,然后运行订阅self._watch([UserRequirement, WriteCrawlerCode])  ## 触发?async def _think(self) -> bool:cause_by = self.rc.history[-1].cause_byif cause_by == any_to_str(UserRequirement):state = 0elif cause_by == any_to_str(WriteCrawlerCode):state = 1if self.rc.state == state:self.rc.todo = Nonereturn Falseself._set_state(state)return True

订阅助手角色定义的代码中,初始化了两个Action,一个是解析用户需求,一个是RunSubscription,运行智能体。
看下它是怎么运作的:
(1)self._watch([UserRequirement, WriteCrawlerCode])
它观察了这两个输入,用户输入和写完爬虫代码。
(2)_think函数
当上一次的信息是来自用户输入,则运行ParseSubRequirement,如果上一次输入是写爬虫代码的Action,则运行RunSubscription。

6. 运行订阅智能体的Action:RunSubscription

6.1 总结信息的Action

有了用户需求,有了爬虫代码,还缺的就是将爬下来的数据总结成用户需求中想要的内容。下面的SubAction就是干这事儿的(类比之前订阅智能体中的AnalysisOSSTrending):

data是从所有Url网页中爬下来的数据,process是之前解析的用户对数据的处理需求。

SUB_ACTION_TEMPLATE = """
## Requirements
Answer the question based on the provided context {process}. If the question cannot be answered, please summarize the context.## context
{data}"
"""class SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))

6.2 运行订阅智能体的Action

# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):from metagpt.roles.role import Rolefrom metagpt.subscription import SubscriptionRunnercode = msgs[-1].content ## 4. 获取爬虫代码req = msgs[-2].instruct_content.dict() ## 5. 获取用户需求urls = req["Crawler URL List"]process = req["Crawl Post Processing"]spec = req["Cron Expression"]SubAction = self.create_sub_action_cls(urls, code, process) ## 6. 创建一个Action,urls网页链接、code爬虫代码、process用户需求的数据SubRole = type("SubRole", (Role,), {}) ## 7. 定时触发的Rolerole = SubRole()role.init_actions([SubAction])runner = SubscriptionRunner()async def callback(msg):print(msg)await runner.subscribe(role, CronTrigger(spec), callback)await runner.run()@staticmethoddef create_sub_action_cls(urls: list[str], code: str, process: str):modules = {}for url in urls[::-1]:code, current = code.rsplit(f"# {url}", maxsplit=1)name = uuid4().hexmodule = type(sys)(name)exec(current, module.__dict__)modules[url] = moduleclass SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))return SubAction

7. 定时器代码和callback代码

这部分代码就可以直接复用原来订阅智能体的代码了,通用的。

7.1 定时器

class CronTrigger:def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None) -> None:self.crontab = crontab(spec, tz=tz)def __aiter__(self):return selfasync def __anext__(self):await self.crontab.next()return Message()

7.2 callback

class WxPusherClient:def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):self.base_url = base_urlself.token = token or os.environ["WXPUSHER_TOKEN"] # 5.1 从环境变量中获取token,所以你需要在环境变量中配置WXPUSHER_TOKEN或在配置文件中设置WXPUSHER_TOKENasync def send_message(self,content,summary: Optional[str] = None,content_type: int = 1,topic_ids: Optional[list[int]] = None,uids: Optional[list[int]] = None,verify: bool = False,url: Optional[str] = None,):payload = {"appToken": self.token,"content": content,"summary": summary,"contentType": content_type,"topicIds": topic_ids or [],# 5.2 从环境变量中获取uids,所以你需要在环境变量中配置WXPUSHER_UIDS# uids是你想推送给哪个微信,必须是关注了你这个订阅号的微信才可以知道uid"uids": uids or os.environ["WXPUSHER_UIDS"].split(","), "verifyPay": verify,"url": url,}url = f"{self.base_url}/api/send/message"return await self._request("POST", url, json=payload)async def _request(self, method, url, **kwargs):async with aiohttp.ClientSession() as session:async with session.request(method, url, **kwargs) as response:response.raise_for_status()return await response.json()# 5.3 微信callback wrapper,使用WxPusherClient给指定微信推送消息
async def wxpusher_callback(msg: Message):client = WxPusherClient()await client.send_message(msg.content, content_type=3)

7.3 集成callback

# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):······ 与前文代码一样callbacks = []callbacks.append(wxpusher_callback)async def callback(msg):print(msg)await asyncio.gather(*(call(msg) for call in callbacks)) # 遍历所有回调函数,触发回调,分发消息······ 与前文代码一样

8. 整体运行

创建了一个Team,其中有SubscriptionAssistant角色和CrawlerEngineer角色。

用户输入信息,然后run_project,SubscriptionAssistant首先观察到用户输入信息,触发ParseSubRequirement动作。

if __name__ == "__main__":import asynciofrom metagpt.team import Teamteam = Team()team.hire([SubscriptionAssistant(), CrawlerEngineer()])team.run_project("从36kr创投平台https://pitchhub.36kr.com/financing-flash爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在早上10:56送给我")asyncio.run(team.run())

运行结果

在这里插入图片描述

9. 可能有的坑

9.1 更新MetaGPT源码后,运行报错:No module named ‘zhipuai.types’

在这里插入图片描述

  • 解决方法
pip install zhipuai

本文完。按照教程跑通了订阅智能体进阶的流程。

这好像是教程中唯一的一个多智能体的例子。所以里面涉及了多智能体之间的交互方式,懵懵懂懂,需要深入了解下(_watch、_observe等)。咱们后面的文章细讲。

这篇关于【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/635800

相关文章

Linux下删除乱码文件和目录的实现方式

《Linux下删除乱码文件和目录的实现方式》:本文主要介绍Linux下删除乱码文件和目录的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下删除乱码文件和目录方法1方法2总结Linux下删除乱码文件和目录方法1使用ls -i命令找到文件或目录

SpringBoot+EasyExcel实现自定义复杂样式导入导出

《SpringBoot+EasyExcel实现自定义复杂样式导入导出》这篇文章主要为大家详细介绍了SpringBoot如何结果EasyExcel实现自定义复杂样式导入导出功能,文中的示例代码讲解详细,... 目录安装处理自定义导出复杂场景1、列不固定,动态列2、动态下拉3、自定义锁定行/列,添加密码4、合并

mybatis执行insert返回id实现详解

《mybatis执行insert返回id实现详解》MyBatis插入操作默认返回受影响行数,需通过useGeneratedKeys+keyProperty或selectKey获取主键ID,确保主键为自... 目录 两种方式获取自增 ID:1. ​​useGeneratedKeys+keyProperty(推

Spring Boot集成Druid实现数据源管理与监控的详细步骤

《SpringBoot集成Druid实现数据源管理与监控的详细步骤》本文介绍如何在SpringBoot项目中集成Druid数据库连接池,包括环境搭建、Maven依赖配置、SpringBoot配置文件... 目录1. 引言1.1 环境准备1.2 Druid介绍2. 配置Druid连接池3. 查看Druid监控

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

Linux在线解压jar包的实现方式

《Linux在线解压jar包的实现方式》:本文主要介绍Linux在线解压jar包的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux在线解压jar包解压 jar包的步骤总结Linux在线解压jar包在 Centos 中解压 jar 包可以使用 u

Java中读取YAML文件配置信息常见问题及解决方法

《Java中读取YAML文件配置信息常见问题及解决方法》:本文主要介绍Java中读取YAML文件配置信息常见问题及解决方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录1 使用Spring Boot的@ConfigurationProperties2. 使用@Valu

Python办公自动化实战之打造智能邮件发送工具

《Python办公自动化实战之打造智能邮件发送工具》在数字化办公场景中,邮件自动化是提升工作效率的关键技能,本文将演示如何使用Python的smtplib和email库构建一个支持图文混排,多附件,多... 目录前言一、基础配置:搭建邮件发送框架1.1 邮箱服务准备1.2 核心库导入1.3 基础发送函数二、

c++ 类成员变量默认初始值的实现

《c++类成员变量默认初始值的实现》本文主要介绍了c++类成员变量默认初始值,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录C++类成员变量初始化c++类的变量的初始化在C++中,如果使用类成员变量时未给定其初始值,那么它将被

Qt使用QSqlDatabase连接MySQL实现增删改查功能

《Qt使用QSqlDatabase连接MySQL实现增删改查功能》这篇文章主要为大家详细介绍了Qt如何使用QSqlDatabase连接MySQL实现增删改查功能,文中的示例代码讲解详细,感兴趣的小伙伴... 目录一、创建数据表二、连接mysql数据库三、封装成一个完整的轻量级 ORM 风格类3.1 表结构