本文主要是介绍【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 0. 前置推荐阅读
- 1. 本文内容
- 2. 解析用户指令(分析用户需求)
- 2.1 完整代码及注释
- 2.2 运行结果
- 3. 利用大模型写爬虫代码
- 3.1 对html内容进行精简
- 3.2 利用大模型写爬虫代码
- 3.3 补充代码,测试本节程序
- 3.4 运行结果及踩坑
- 3.4.1 运行结果
- 3.4.2 坑一:No module named 'playwright'
- 4. 爬虫工程师角色定义:CrawlerEngineer
- 5. 订阅助手角色定义:SubscriptionAssistant
- 6. 运行订阅智能体的Action:RunSubscription
- 6.1 总结信息的Action
- 6.2 运行订阅智能体的Action
- 7. 定时器代码和callback代码
- 7.1 定时器
- 7.2 callback
- 7.3 集成callback
- 8. 整体运行
- 运行结果
- 9. 可能有的坑
- 9.1 更新MetaGPT源码后,运行报错:No module named 'zhipuai.types'
0. 前置推荐阅读
-
订阅智能体实战
- 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件
-
ActionNode基础与实战
-
【AI的未来 - AI Agent系列】【MetaGPT】4. ActionNode从理论到实战
-
【AI的未来 - AI Agent系列】【MetaGPT】4.1 细说我在ActionNode实战中踩的那些坑
-
1. 本文内容
跟着《MetaGPT智能体开发入门》教程,对上次实现的订阅智能体进行优化和进阶,也用上ActionNode模块。
上次实现订阅智能体的文章可参考 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件 。在上次这篇文章中我们虽然完成了订阅智能体的功能,但是这个智能体的作用是被局限在某一特定领域的,当我们需要订阅另外一个数据源的分析结果时,我们需要手动再写一个Role,这个智能体不太通用。
如何实现一个更通用的订阅智能体?实现思路如下:
(1)解析用户指令(分析用户需求,我们期望用户用一句自然语言下指令即可得到想要的)
(2)爬取网页
(3)让大模型写从html中提取用户需要的数据的代码
(4)将爬取网页和大模型写的提取用户需要数据的代码结合,得到爬取指定网页信息的Action
(5)将提取后的数据,让大模型进行处理和分析,得到网页数据分析的Action
(6)将这两个Action组合,就可以得到一个特定网页的Watcher Role
下面我们一步一步来实现。
2. 解析用户指令(分析用户需求)
写爬虫代码是需要先知道要爬取的网页URL、网页数据提取需求是什么,最简单的方式就是用户自己填写,但是这就让我们的入参变得复杂。所以,我们首先实现一个解析用户指令的代码,用户只需用一句自然语言输入自己的需求,代码自动解析出网页URL、网页数据提取需求等,即:将用户的输入结构化。
2.1 完整代码及注释
# 加载 .env 到环境变量
# from dotenv import load_dotenv, find_dotenv
# _ = load_dotenv(find_dotenv())from metagpt.actions.action_node import ActionNode
from metagpt.actions.action import Action
import asyncio## 分析用户的要求语言
LANGUAGE = ActionNode(key="language",expected_type=str,instruction="Provide the language used in the project, typically matching the user's requirement language.",example="en_us",
)## 分析用户的订阅推送时间
CRON_EXPRESSION = ActionNode(key="Cron Expression",expected_type=str,instruction="If the user requires scheduled triggering, please provide the corresponding 5-field cron expression. ""Otherwise, leave it blank.",example="",
)## 分析用户订阅的网址URL,可以是列表
CRAWLER_URL_LIST = ActionNode(key="Crawler URL List",expected_type=list[str],instruction="List the URLs user want to crawl. Leave it blank if not provided in the User Requirement.",example=["https://example1.com", "https://example2.com"],
)## 分析用户所需要的网站数据
PAGE_CONTENT_EXTRACTION = ActionNode(key="Page Content Extraction",expected_type=str,instruction="Specify the requirements and tips to extract from the crawled web pages based on User Requirement.",example="Retrieve the titles and content of articles published today.",
)## 分析用户所需要的汇总数据的方式
CRAWL_POST_PROCESSING = ActionNode(key="Crawl Post Processing",expected_type=str,instruction="Specify the processing to be applied to the crawled content, such as summarizing today's news.",example="Generate a summary of today's news articles.",
)## 补充说明,如果url或定时器解析为空,则提示用户补充
INFORMATION_SUPPLEMENT = ActionNode(key="Information Supplement",expected_type=str,instruction="If unable to obtain the Cron Expression, prompt the user to provide the time to receive subscription ""messages. If unable to obtain the URL List Crawler, prompt the user to provide the URLs they want to crawl. Keep it ""blank if everything is clear",example="",
)NODES = [LANGUAGE,CRON_EXPRESSION,CRAWLER_URL_LIST,PAGE_CONTENT_EXTRACTION,CRAWL_POST_PROCESSING,INFORMATION_SUPPLEMENT,
]PARSE_SUB_REQUIREMENTS_NODE = ActionNode.from_children("ParseSubscriptionReq", NODES)## 解析用户的需求的Action
PARSE_SUB_REQUIREMENT_TEMPLATE = """
### User Requirement
{requirements}
"""class ParseSubRequirement(Action):async def run(self, requirements):requirements = "\n".join(i.content for i in requirements)context = PARSE_SUB_REQUIREMENT_TEMPLATE.format(requirements=requirements)node = await PARSE_SUB_REQUIREMENTS_NODE.fill(context=context, llm=self.llm)return nodeif __name__ == "__main__":from metagpt.schema import Messageasyncio.run(ParseSubRequirement().run([Message("从36kr创投平台https://pitchhub.36kr.com/financing-flash 爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在晚上七点半送给我")]))
2.2 运行结果
3. 利用大模型写爬虫代码
3.1 对html内容进行精简
一般html元素太多,所以调用大模型时token数非常大,甚至一次对话都无法接收一个完整的html,我们可以先对网页内容做一下简化,因为对元素进行定位一般用css selector就够了,所以我们可以主要提供html的class属性信息,另外可以将html转成css表达式和对应的内容提供给llm,从而减少token的消耗。
def get_outline(page):soup = _get_soup(page.html)outline = []def process_element(element, depth):name = element.nameif not name:returnif name in ["script", "style"]:returnelement_info = {"name": element.name, "depth": depth}if name in ["svg"]:element_info["text"] = Noneoutline.append(element_info)returnelement_info["text"] = element.string# Check if the element has an "id" attributeif "id" in element.attrs:element_info["id"] = element["id"]if "class" in element.attrs:element_info["class"] = element["class"]outline.append(element_info)for child in element.children:process_element(child, depth + 1)for element in soup.body.children:process_element(element, 1)return outline
3.2 利用大模型写爬虫代码
PROMPT_TEMPLATE = """Please complete the web page crawler parse function to achieve the User Requirement. The parse \
function should take a BeautifulSoup object as input, which corresponds to the HTML outline provided in the Context.```python
from bs4 import BeautifulSoup# only complete the parse function
def parse(soup: BeautifulSoup):...# Return the object that the user wants to retrieve, don't use print## User Requirement
{requirement}## ContextThe outline of html page to scrabe is show like below:```tree
{outline}
"""class WriteCrawlerCode(Action):async def run(self, requirement):requirement: Message = requirement[-1] ## 上一步解析完成的用户需求结果data = requirement.instruct_content.dict()urls = data["Crawler URL List"] ## 获取用户想要的网页URLquery = data["Page Content Extraction"] ## 获取用户想关注的网页数据codes = {}for url in urls:codes[url] = await self._write_code(url, query) ## 每个URL的爬虫程序return "\n".join(f"# {url}\n{code}" for url, code in codes.items())async def _write_code(self, url, query):page = await WebBrowserEngine().run(url)outline = get_outline(page)outline = "\n".join(f"{' '*i['depth']}{'.'.join([i['name'], *i.get('class', [])])}: {i['text'] if i['text'] else ''}"for i in outline)code_rsp = await self._aask(PROMPT_TEMPLATE.format(outline=outline, requirement=query))code = CodeParser.parse_code(block="", text=code_rsp)return code
3.3 补充代码,测试本节程序
- 引入所需包
import asyncio
from metagpt.actions.action import Action
from metagpt.schema import Message
from metagpt.tools.web_browser_engine import WebBrowserEngine
from metagpt.utils.common import CodeParser
from metagpt.utils.parse_html import _get_soup
- main运行函数
if __name__ == "__main__":from metagpt.actions.action_node import ActionNodecls = ActionNode.create_model_class("ActionModel", {"Cron Expression": (str, ...),"Crawler URL List": (list[str], ...),"Page Content Extraction": (str, ...),"Crawl Post Processing": (str, ...),})data = {"Cron Expression": "0 30 19 * * *","Crawler URL List": ["https://pitchhub.36kr.com/financing-flash"],"Page Content Extraction": "从36kr创投平台爬取所有初创企业融资的信息,获取标题,链接, 时间。","Crawl Post Processing": "总结今天的融资新闻。",}asyncio.run(WriteCrawlerCode().run([Message(instruct_content=cls(**data))]))
3.4 运行结果及踩坑
3.4.1 运行结果
3.4.2 坑一:No module named ‘playwright’
- 解决方法:
pip install playwright
4. 爬虫工程师角色定义:CrawlerEngineer
# 定义爬虫工程师角色
from metagpt.roles import Role
class CrawlerEngineer(Role):name: str = "同学小张的专属爬虫工程师"profile: str = "Crawling Engineer"goal: str = "Write elegant, readable, extensible, efficient code"constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([WriteCrawlerCode])self._watch([ParseSubRequirement]) ## 触发?
爬虫工程师角色的代码中,初始化了一个写爬虫代码的Action,为WriteCrawlerCode。那这个角色什么时候触发运行呢?答案在这一句:self._watch([ParseSubRequirement])
,当解析完用户需求之后,会触发该角色的运行,触发写代码的动作。
5. 订阅助手角色定义:SubscriptionAssistant
class SubscriptionAssistant(Role):"""Analyze user subscription requirements."""name: str = "同学小张的订阅助手"profile: str = "Subscription Assistant"goal: str = "analyze user subscription requirements to provide personalized subscription services."constraints: str = "utilize the same language as the User Requirement"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([ParseSubRequirement, RunSubscription]) ## 2. 先解析用户需求,然后运行订阅self._watch([UserRequirement, WriteCrawlerCode]) ## 触发?async def _think(self) -> bool:cause_by = self.rc.history[-1].cause_byif cause_by == any_to_str(UserRequirement):state = 0elif cause_by == any_to_str(WriteCrawlerCode):state = 1if self.rc.state == state:self.rc.todo = Nonereturn Falseself._set_state(state)return True
订阅助手角色定义的代码中,初始化了两个Action,一个是解析用户需求,一个是RunSubscription,运行智能体。
看下它是怎么运作的:
(1)self._watch([UserRequirement, WriteCrawlerCode])
它观察了这两个输入,用户输入和写完爬虫代码。
(2)_think函数
当上一次的信息是来自用户输入,则运行ParseSubRequirement,如果上一次输入是写爬虫代码的Action,则运行RunSubscription。
6. 运行订阅智能体的Action:RunSubscription
6.1 总结信息的Action
有了用户需求,有了爬虫代码,还缺的就是将爬下来的数据总结成用户需求中想要的内容。下面的SubAction就是干这事儿的(类比之前订阅智能体中的AnalysisOSSTrending
):
data是从所有Url网页中爬下来的数据,process是之前解析的用户对数据的处理需求。
SUB_ACTION_TEMPLATE = """
## Requirements
Answer the question based on the provided context {process}. If the question cannot be answered, please summarize the context.## context
{data}"
"""class SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))
6.2 运行订阅智能体的Action
# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):from metagpt.roles.role import Rolefrom metagpt.subscription import SubscriptionRunnercode = msgs[-1].content ## 4. 获取爬虫代码req = msgs[-2].instruct_content.dict() ## 5. 获取用户需求urls = req["Crawler URL List"]process = req["Crawl Post Processing"]spec = req["Cron Expression"]SubAction = self.create_sub_action_cls(urls, code, process) ## 6. 创建一个Action,urls网页链接、code爬虫代码、process用户需求的数据SubRole = type("SubRole", (Role,), {}) ## 7. 定时触发的Rolerole = SubRole()role.init_actions([SubAction])runner = SubscriptionRunner()async def callback(msg):print(msg)await runner.subscribe(role, CronTrigger(spec), callback)await runner.run()@staticmethoddef create_sub_action_cls(urls: list[str], code: str, process: str):modules = {}for url in urls[::-1]:code, current = code.rsplit(f"# {url}", maxsplit=1)name = uuid4().hexmodule = type(sys)(name)exec(current, module.__dict__)modules[url] = moduleclass SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))return SubAction
7. 定时器代码和callback代码
这部分代码就可以直接复用原来订阅智能体的代码了,通用的。
7.1 定时器
class CronTrigger:def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None) -> None:self.crontab = crontab(spec, tz=tz)def __aiter__(self):return selfasync def __anext__(self):await self.crontab.next()return Message()
7.2 callback
class WxPusherClient:def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):self.base_url = base_urlself.token = token or os.environ["WXPUSHER_TOKEN"] # 5.1 从环境变量中获取token,所以你需要在环境变量中配置WXPUSHER_TOKEN或在配置文件中设置WXPUSHER_TOKENasync def send_message(self,content,summary: Optional[str] = None,content_type: int = 1,topic_ids: Optional[list[int]] = None,uids: Optional[list[int]] = None,verify: bool = False,url: Optional[str] = None,):payload = {"appToken": self.token,"content": content,"summary": summary,"contentType": content_type,"topicIds": topic_ids or [],# 5.2 从环境变量中获取uids,所以你需要在环境变量中配置WXPUSHER_UIDS# uids是你想推送给哪个微信,必须是关注了你这个订阅号的微信才可以知道uid"uids": uids or os.environ["WXPUSHER_UIDS"].split(","), "verifyPay": verify,"url": url,}url = f"{self.base_url}/api/send/message"return await self._request("POST", url, json=payload)async def _request(self, method, url, **kwargs):async with aiohttp.ClientSession() as session:async with session.request(method, url, **kwargs) as response:response.raise_for_status()return await response.json()# 5.3 微信callback wrapper,使用WxPusherClient给指定微信推送消息
async def wxpusher_callback(msg: Message):client = WxPusherClient()await client.send_message(msg.content, content_type=3)
7.3 集成callback
# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):······ 与前文代码一样callbacks = []callbacks.append(wxpusher_callback)async def callback(msg):print(msg)await asyncio.gather(*(call(msg) for call in callbacks)) # 遍历所有回调函数,触发回调,分发消息······ 与前文代码一样
8. 整体运行
创建了一个Team,其中有SubscriptionAssistant角色和CrawlerEngineer角色。
用户输入信息,然后run_project
,SubscriptionAssistant首先观察到用户输入信息,触发ParseSubRequirement动作。
if __name__ == "__main__":import asynciofrom metagpt.team import Teamteam = Team()team.hire([SubscriptionAssistant(), CrawlerEngineer()])team.run_project("从36kr创投平台https://pitchhub.36kr.com/financing-flash爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在早上10:56送给我")asyncio.run(team.run())
运行结果
9. 可能有的坑
9.1 更新MetaGPT源码后,运行报错:No module named ‘zhipuai.types’
- 解决方法
pip install zhipuai
本文完。按照教程跑通了订阅智能体进阶的流程。
这好像是教程中唯一的一个多智能体的例子。所以里面涉及了多智能体之间的交互方式,懵懵懂懂,需要深入了解下(_watch、_observe等)。咱们后面的文章细讲。
这篇关于【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!