【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体

本文主要是介绍【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 0. 前置推荐阅读
    • 1. 本文内容
    • 2. 解析用户指令(分析用户需求)
      • 2.1 完整代码及注释
      • 2.2 运行结果
    • 3. 利用大模型写爬虫代码
      • 3.1 对html内容进行精简
      • 3.2 利用大模型写爬虫代码
      • 3.3 补充代码,测试本节程序
      • 3.4 运行结果及踩坑
        • 3.4.1 运行结果
        • 3.4.2 坑一:No module named 'playwright'
    • 4. 爬虫工程师角色定义:CrawlerEngineer
    • 5. 订阅助手角色定义:SubscriptionAssistant
    • 6. 运行订阅智能体的Action:RunSubscription
      • 6.1 总结信息的Action
      • 6.2 运行订阅智能体的Action
    • 7. 定时器代码和callback代码
      • 7.1 定时器
      • 7.2 callback
      • 7.3 集成callback
    • 8. 整体运行
      • 运行结果
    • 9. 可能有的坑
      • 9.1 更新MetaGPT源码后,运行报错:No module named 'zhipuai.types'

0. 前置推荐阅读

  • 订阅智能体实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件
  • ActionNode基础与实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】4. ActionNode从理论到实战

    • 【AI的未来 - AI Agent系列】【MetaGPT】4.1 细说我在ActionNode实战中踩的那些坑

1. 本文内容

跟着《MetaGPT智能体开发入门》教程,对上次实现的订阅智能体进行优化和进阶,也用上ActionNode模块。

上次实现订阅智能体的文章可参考 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件 。在上次这篇文章中我们虽然完成了订阅智能体的功能,但是这个智能体的作用是被局限在某一特定领域的,当我们需要订阅另外一个数据源的分析结果时,我们需要手动再写一个Role,这个智能体不太通用。

如何实现一个更通用的订阅智能体?实现思路如下:

(1)解析用户指令(分析用户需求,我们期望用户用一句自然语言下指令即可得到想要的)
(2)爬取网页
(3)让大模型写从html中提取用户需要的数据的代码
(4)将爬取网页和大模型写的提取用户需要数据的代码结合,得到爬取指定网页信息的Action
(5)将提取后的数据,让大模型进行处理和分析,得到网页数据分析的Action
(6)将这两个Action组合,就可以得到一个特定网页的Watcher Role

下面我们一步一步来实现。

2. 解析用户指令(分析用户需求)

写爬虫代码是需要先知道要爬取的网页URL、网页数据提取需求是什么,最简单的方式就是用户自己填写,但是这就让我们的入参变得复杂。所以,我们首先实现一个解析用户指令的代码,用户只需用一句自然语言输入自己的需求,代码自动解析出网页URL、网页数据提取需求等,即:将用户的输入结构化

2.1 完整代码及注释

# 加载 .env 到环境变量
# from dotenv import load_dotenv, find_dotenv
# _ = load_dotenv(find_dotenv())from metagpt.actions.action_node import ActionNode
from metagpt.actions.action import Action
import asyncio## 分析用户的要求语言
LANGUAGE = ActionNode(key="language",expected_type=str,instruction="Provide the language used in the project, typically matching the user's requirement language.",example="en_us",
)## 分析用户的订阅推送时间
CRON_EXPRESSION = ActionNode(key="Cron Expression",expected_type=str,instruction="If the user requires scheduled triggering, please provide the corresponding 5-field cron expression. ""Otherwise, leave it blank.",example="",
)## 分析用户订阅的网址URL,可以是列表
CRAWLER_URL_LIST = ActionNode(key="Crawler URL List",expected_type=list[str],instruction="List the URLs user want to crawl. Leave it blank if not provided in the User Requirement.",example=["https://example1.com", "https://example2.com"],
)## 分析用户所需要的网站数据
PAGE_CONTENT_EXTRACTION = ActionNode(key="Page Content Extraction",expected_type=str,instruction="Specify the requirements and tips to extract from the crawled web pages based on User Requirement.",example="Retrieve the titles and content of articles published today.",
)## 分析用户所需要的汇总数据的方式
CRAWL_POST_PROCESSING = ActionNode(key="Crawl Post Processing",expected_type=str,instruction="Specify the processing to be applied to the crawled content, such as summarizing today's news.",example="Generate a summary of today's news articles.",
)## 补充说明,如果url或定时器解析为空,则提示用户补充
INFORMATION_SUPPLEMENT = ActionNode(key="Information Supplement",expected_type=str,instruction="If unable to obtain the Cron Expression, prompt the user to provide the time to receive subscription ""messages. If unable to obtain the URL List Crawler, prompt the user to provide the URLs they want to crawl. Keep it ""blank if everything is clear",example="",
)NODES = [LANGUAGE,CRON_EXPRESSION,CRAWLER_URL_LIST,PAGE_CONTENT_EXTRACTION,CRAWL_POST_PROCESSING,INFORMATION_SUPPLEMENT,
]PARSE_SUB_REQUIREMENTS_NODE = ActionNode.from_children("ParseSubscriptionReq", NODES)## 解析用户的需求的Action
PARSE_SUB_REQUIREMENT_TEMPLATE = """
### User Requirement
{requirements}
"""class ParseSubRequirement(Action):async def run(self, requirements):requirements = "\n".join(i.content for i in requirements)context = PARSE_SUB_REQUIREMENT_TEMPLATE.format(requirements=requirements)node = await PARSE_SUB_REQUIREMENTS_NODE.fill(context=context, llm=self.llm)return nodeif __name__ == "__main__":from metagpt.schema import Messageasyncio.run(ParseSubRequirement().run([Message("从36kr创投平台https://pitchhub.36kr.com/financing-flash 爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在晚上七点半送给我")]))

2.2 运行结果

在这里插入图片描述

3. 利用大模型写爬虫代码

3.1 对html内容进行精简

一般html元素太多,所以调用大模型时token数非常大,甚至一次对话都无法接收一个完整的html,我们可以先对网页内容做一下简化,因为对元素进行定位一般用css selector就够了,所以我们可以主要提供html的class属性信息,另外可以将html转成css表达式和对应的内容提供给llm,从而减少token的消耗。

def get_outline(page):soup = _get_soup(page.html)outline = []def process_element(element, depth):name = element.nameif not name:returnif name in ["script", "style"]:returnelement_info = {"name": element.name, "depth": depth}if name in ["svg"]:element_info["text"] = Noneoutline.append(element_info)returnelement_info["text"] = element.string# Check if the element has an "id" attributeif "id" in element.attrs:element_info["id"] = element["id"]if "class" in element.attrs:element_info["class"] = element["class"]outline.append(element_info)for child in element.children:process_element(child, depth + 1)for element in soup.body.children:process_element(element, 1)return outline

3.2 利用大模型写爬虫代码

PROMPT_TEMPLATE = """Please complete the web page crawler parse function to achieve the User Requirement. The parse \
function should take a BeautifulSoup object as input, which corresponds to the HTML outline provided in the Context.```python
from bs4 import BeautifulSoup# only complete the parse function
def parse(soup: BeautifulSoup):...# Return the object that the user wants to retrieve, don't use print## User Requirement
{requirement}## ContextThe outline of html page to scrabe is show like below:```tree
{outline}
"""class WriteCrawlerCode(Action):async def run(self, requirement):requirement: Message = requirement[-1] ## 上一步解析完成的用户需求结果data = requirement.instruct_content.dict()urls = data["Crawler URL List"] ## 获取用户想要的网页URLquery = data["Page Content Extraction"] ## 获取用户想关注的网页数据codes = {}for url in urls:codes[url] = await self._write_code(url, query) ## 每个URL的爬虫程序return "\n".join(f"# {url}\n{code}" for url, code in codes.items())async def _write_code(self, url, query):page = await WebBrowserEngine().run(url)outline = get_outline(page)outline = "\n".join(f"{' '*i['depth']}{'.'.join([i['name'], *i.get('class', [])])}: {i['text'] if i['text'] else ''}"for i in outline)code_rsp = await self._aask(PROMPT_TEMPLATE.format(outline=outline, requirement=query))code = CodeParser.parse_code(block="", text=code_rsp)return code

3.3 补充代码,测试本节程序

  • 引入所需包
import asyncio
from metagpt.actions.action import Action
from metagpt.schema import Message
from metagpt.tools.web_browser_engine import WebBrowserEngine
from metagpt.utils.common import CodeParser
from metagpt.utils.parse_html import _get_soup
  • main运行函数
if __name__ == "__main__":from metagpt.actions.action_node import ActionNodecls = ActionNode.create_model_class("ActionModel", {"Cron Expression": (str, ...),"Crawler URL List": (list[str], ...),"Page Content Extraction": (str, ...),"Crawl Post Processing": (str, ...),})data = {"Cron Expression": "0 30 19 * * *","Crawler URL List": ["https://pitchhub.36kr.com/financing-flash"],"Page Content Extraction": "从36kr创投平台爬取所有初创企业融资的信息,获取标题,链接, 时间。","Crawl Post Processing": "总结今天的融资新闻。",}asyncio.run(WriteCrawlerCode().run([Message(instruct_content=cls(**data))]))

3.4 运行结果及踩坑

3.4.1 运行结果

在这里插入图片描述

3.4.2 坑一:No module named ‘playwright’

在这里插入图片描述

  • 解决方法:
pip install playwright

4. 爬虫工程师角色定义:CrawlerEngineer

# 定义爬虫工程师角色
from metagpt.roles import Role
class CrawlerEngineer(Role):name: str = "同学小张的专属爬虫工程师"profile: str = "Crawling Engineer"goal: str = "Write elegant, readable, extensible, efficient code"constraints: str = "The code should conform to standards like PEP8 and be modular and maintainable"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([WriteCrawlerCode])self._watch([ParseSubRequirement])  ## 触发?

爬虫工程师角色的代码中,初始化了一个写爬虫代码的Action,为WriteCrawlerCode。那这个角色什么时候触发运行呢?答案在这一句:self._watch([ParseSubRequirement]),当解析完用户需求之后,会触发该角色的运行,触发写代码的动作。

5. 订阅助手角色定义:SubscriptionAssistant

class SubscriptionAssistant(Role):"""Analyze user subscription requirements."""name: str = "同学小张的订阅助手"profile: str = "Subscription Assistant"goal: str = "analyze user subscription requirements to provide personalized subscription services."constraints: str = "utilize the same language as the User Requirement"def __init__(self, **kwargs) -> None:super().__init__(**kwargs)self._init_actions([ParseSubRequirement, RunSubscription]) ## 2. 先解析用户需求,然后运行订阅self._watch([UserRequirement, WriteCrawlerCode])  ## 触发?async def _think(self) -> bool:cause_by = self.rc.history[-1].cause_byif cause_by == any_to_str(UserRequirement):state = 0elif cause_by == any_to_str(WriteCrawlerCode):state = 1if self.rc.state == state:self.rc.todo = Nonereturn Falseself._set_state(state)return True

订阅助手角色定义的代码中,初始化了两个Action,一个是解析用户需求,一个是RunSubscription,运行智能体。
看下它是怎么运作的:
(1)self._watch([UserRequirement, WriteCrawlerCode])
它观察了这两个输入,用户输入和写完爬虫代码。
(2)_think函数
当上一次的信息是来自用户输入,则运行ParseSubRequirement,如果上一次输入是写爬虫代码的Action,则运行RunSubscription。

6. 运行订阅智能体的Action:RunSubscription

6.1 总结信息的Action

有了用户需求,有了爬虫代码,还缺的就是将爬下来的数据总结成用户需求中想要的内容。下面的SubAction就是干这事儿的(类比之前订阅智能体中的AnalysisOSSTrending):

data是从所有Url网页中爬下来的数据,process是之前解析的用户对数据的处理需求。

SUB_ACTION_TEMPLATE = """
## Requirements
Answer the question based on the provided context {process}. If the question cannot be answered, please summarize the context.## context
{data}"
"""class SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))

6.2 运行订阅智能体的Action

# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):from metagpt.roles.role import Rolefrom metagpt.subscription import SubscriptionRunnercode = msgs[-1].content ## 4. 获取爬虫代码req = msgs[-2].instruct_content.dict() ## 5. 获取用户需求urls = req["Crawler URL List"]process = req["Crawl Post Processing"]spec = req["Cron Expression"]SubAction = self.create_sub_action_cls(urls, code, process) ## 6. 创建一个Action,urls网页链接、code爬虫代码、process用户需求的数据SubRole = type("SubRole", (Role,), {}) ## 7. 定时触发的Rolerole = SubRole()role.init_actions([SubAction])runner = SubscriptionRunner()async def callback(msg):print(msg)await runner.subscribe(role, CronTrigger(spec), callback)await runner.run()@staticmethoddef create_sub_action_cls(urls: list[str], code: str, process: str):modules = {}for url in urls[::-1]:code, current = code.rsplit(f"# {url}", maxsplit=1)name = uuid4().hexmodule = type(sys)(name)exec(current, module.__dict__)modules[url] = moduleclass SubAction(Action):async def run(self, *args, **kwargs):pages = await WebBrowserEngine().run(*urls)if len(urls) == 1:pages = [pages]data = []for url, page in zip(urls, pages):data.append(getattr(modules[url], "parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结return await self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))return SubAction

7. 定时器代码和callback代码

这部分代码就可以直接复用原来订阅智能体的代码了,通用的。

7.1 定时器

class CronTrigger:def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None) -> None:self.crontab = crontab(spec, tz=tz)def __aiter__(self):return selfasync def __anext__(self):await self.crontab.next()return Message()

7.2 callback

class WxPusherClient:def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):self.base_url = base_urlself.token = token or os.environ["WXPUSHER_TOKEN"] # 5.1 从环境变量中获取token,所以你需要在环境变量中配置WXPUSHER_TOKEN或在配置文件中设置WXPUSHER_TOKENasync def send_message(self,content,summary: Optional[str] = None,content_type: int = 1,topic_ids: Optional[list[int]] = None,uids: Optional[list[int]] = None,verify: bool = False,url: Optional[str] = None,):payload = {"appToken": self.token,"content": content,"summary": summary,"contentType": content_type,"topicIds": topic_ids or [],# 5.2 从环境变量中获取uids,所以你需要在环境变量中配置WXPUSHER_UIDS# uids是你想推送给哪个微信,必须是关注了你这个订阅号的微信才可以知道uid"uids": uids or os.environ["WXPUSHER_UIDS"].split(","), "verifyPay": verify,"url": url,}url = f"{self.base_url}/api/send/message"return await self._request("POST", url, json=payload)async def _request(self, method, url, **kwargs):async with aiohttp.ClientSession() as session:async with session.request(method, url, **kwargs) as response:response.raise_for_status()return await response.json()# 5.3 微信callback wrapper,使用WxPusherClient给指定微信推送消息
async def wxpusher_callback(msg: Message):client = WxPusherClient()await client.send_message(msg.content, content_type=3)

7.3 集成callback

# 运行订阅智能体的Action
class RunSubscription(Action):async def run(self, msgs):······ 与前文代码一样callbacks = []callbacks.append(wxpusher_callback)async def callback(msg):print(msg)await asyncio.gather(*(call(msg) for call in callbacks)) # 遍历所有回调函数,触发回调,分发消息······ 与前文代码一样

8. 整体运行

创建了一个Team,其中有SubscriptionAssistant角色和CrawlerEngineer角色。

用户输入信息,然后run_project,SubscriptionAssistant首先观察到用户输入信息,触发ParseSubRequirement动作。

if __name__ == "__main__":import asynciofrom metagpt.team import Teamteam = Team()team.hire([SubscriptionAssistant(), CrawlerEngineer()])team.run_project("从36kr创投平台https://pitchhub.36kr.com/financing-flash爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在早上10:56送给我")asyncio.run(team.run())

运行结果

在这里插入图片描述

9. 可能有的坑

9.1 更新MetaGPT源码后,运行报错:No module named ‘zhipuai.types’

在这里插入图片描述

  • 解决方法
pip install zhipuai

本文完。按照教程跑通了订阅智能体进阶的流程。

这好像是教程中唯一的一个多智能体的例子。所以里面涉及了多智能体之间的交互方式,懵懵懂懂,需要深入了解下(_watch、_observe等)。咱们后面的文章细讲。

这篇关于【AI Agent系列】【MetaGPT】7. 一句话订阅专属信息 - 订阅智能体进阶,实现一个更通用的订阅智能体的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/635800

相关文章

SpringBoot实现微信小程序支付功能

《SpringBoot实现微信小程序支付功能》小程序支付功能已成为众多应用的核心需求之一,本文主要介绍了SpringBoot实现微信小程序支付功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作... 目录一、引言二、准备工作(一)微信支付商户平台配置(二)Spring Boot项目搭建(三)配置文件

基于Python实现高效PPT转图片工具

《基于Python实现高效PPT转图片工具》在日常工作中,PPT是我们常用的演示工具,但有时候我们需要将PPT的内容提取为图片格式以便于展示或保存,所以本文将用Python实现PPT转PNG工具,希望... 目录1. 概述2. 功能使用2.1 安装依赖2.2 使用步骤2.3 代码实现2.4 GUI界面3.效

MySQL更新某个字段拼接固定字符串的实现

《MySQL更新某个字段拼接固定字符串的实现》在MySQL中,我们经常需要对数据库中的某个字段进行更新操作,本文就来介绍一下MySQL更新某个字段拼接固定字符串的实现,感兴趣的可以了解一下... 目录1. 查看字段当前值2. 更新字段拼接固定字符串3. 验证更新结果mysql更新某个字段拼接固定字符串 -

java实现延迟/超时/定时问题

《java实现延迟/超时/定时问题》:本文主要介绍java实现延迟/超时/定时问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java实现延迟/超时/定时java 每间隔5秒执行一次,一共执行5次然后结束scheduleAtFixedRate 和 schedu

Java Optional避免空指针异常的实现

《JavaOptional避免空指针异常的实现》空指针异常一直是困扰开发者的常见问题之一,本文主要介绍了JavaOptional避免空指针异常的实现,帮助开发者编写更健壮、可读性更高的代码,减少因... 目录一、Optional 概述二、Optional 的创建三、Optional 的常用方法四、Optio

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

Spring Boot项目中结合MyBatis实现MySQL的自动主从切换功能

《SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能》:本文主要介绍SpringBoot项目中结合MyBatis实现MySQL的自动主从切换功能,本文分步骤给大家介绍的... 目录原理解析1. mysql主从复制(Master-Slave Replication)2. 读写分离3.

Redis实现延迟任务的三种方法详解

《Redis实现延迟任务的三种方法详解》延迟任务(DelayedTask)是指在未来的某个时间点,执行相应的任务,本文为大家整理了三种常见的实现方法,感兴趣的小伙伴可以参考一下... 目录1.前言2.Redis如何实现延迟任务3.代码实现3.1. 过期键通知事件实现3.2. 使用ZSet实现延迟任务3.3

基于Python和MoviePy实现照片管理和视频合成工具

《基于Python和MoviePy实现照片管理和视频合成工具》在这篇博客中,我们将详细剖析一个基于Python的图形界面应用程序,该程序使用wxPython构建用户界面,并结合MoviePy、Pill... 目录引言项目概述代码结构分析1. 导入和依赖2. 主类:PhotoManager初始化方法:__in

springboot filter实现请求响应全链路拦截

《springbootfilter实现请求响应全链路拦截》这篇文章主要为大家详细介绍了SpringBoot如何结合Filter同时拦截请求和响应,从而实现​​日志采集自动化,感兴趣的小伙伴可以跟随小... 目录一、为什么你需要这个过滤器?​​​二、核心实现:一个Filter搞定双向数据流​​​​三、完整代码