MetaGPT入门(三)-OSS订阅智能体

2024-01-23 21:44
文章标签 入门 智能 订阅 oss metagpt

本文主要是介绍MetaGPT入门(三)-OSS订阅智能体,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

‍‬‍‌⁡‍⁢⁤​‍​⁡⁡‬⁢‍​​⁡​​‬‬‍⁡​⁢⁡​​⁣⁡⁤​‍‌​​‌​‍⁣⁣​⁤⁢⁡​《MetaGPT智能体开发入门》教程 - 飞书云文档 (feishu.cn)

import asyncio
import os
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, AsyncGenerator, Awaitable, Callable, Optionalimport aiohttp
import discord
import pandas as pd
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfofrom metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message# 订阅模块,可以from metagpt.subscription import SubscriptionRunner导入,这里贴上代码供参考
class SubscriptionRunner(BaseModel):"""A simple wrapper to manage subscription tasks for different roles using asyncio.Example:>>> import asyncio>>> from metagpt.subscription import SubscriptionRunner>>> from metagpt.roles import Searcher>>> from metagpt.schema import Message>>> async def trigger():...     while True:...         yield Message("the latest news about OpenAI")...         await asyncio.sleep(3600 * 24)>>> async def callback(msg: Message):...     print(msg.content)>>> async def main():...     pb = SubscriptionRunner()...     await pb.subscribe(Searcher(), trigger(), callback)...     await pb.run()>>> asyncio.run(main())"""tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)class Config:arbitrary_types_allowed = Trueasync def subscribe(self,role: Role,trigger: AsyncGenerator[Message, None],callback: Callable[[Message,],Awaitable[None],],):"""Subscribes a role to a trigger and sets up a callback to be called with the role's response.Args:role: The role to subscribe.trigger: An asynchronous generator that yields Messages to be processed by the role.callback: An asynchronous function to be called with the response from the role."""loop = asyncio.get_running_loop()async def _start_role():async for msg in trigger:resp = await role.run(msg)await callback(resp)self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")async def unsubscribe(self, role: Role):"""Unsubscribes a role from its trigger and cancels the associated task.Args:role: The role to unsubscribe."""task = self.tasks.pop(role)task.cancel()async def run(self, raise_exception: bool = True):"""Runs all subscribed tasks and handles their completion or exception.Args:raise_exception: _description_. Defaults to True.Raises:task.exception: _description_"""while True:for role, task in self.tasks.items():if task.done():if task.exception():if raise_exception:raise task.exception()logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")else:logger.warning(f"Task {task.get_name()} has completed. ""If this is unexpected behavior, please check the trigger function.")self.tasks.pop(role)breakelse:await asyncio.sleep(1)# Actions 的实现
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example```
# [Title]## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.## The Trends Categories
1. Generative AI- [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]- [Project2](https://github/xx/project2): ...
...## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
```---
# Github Trending
{trending}
"""class CrawlOSSTrending(Action):async def run(self, url: str = "https://github.com/trending"):async with aiohttp.ClientSession() as client:async with client.get(url, proxy=CONFIG.global_proxy) as response:response.raise_for_status()html = await response.text()soup = BeautifulSoup(html, 'html.parser')repositories = []for article in soup.select('article.Box-row'):repo_info = {}repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()# Descriptiondescription_element = article.select_one('p')repo_info['description'] = description_element.text.strip() if description_element else None# Languagelanguage_element = article.select_one('span[itemprop="programmingLanguage"]')repo_info['language'] = language_element.text.strip() if language_element else None# Stars and Forksstars_element = article.select('a.Link--muted')[0]forks_element = article.select('a.Link--muted')[1]repo_info['stars'] = stars_element.text.strip()repo_info['forks'] = forks_element.text.strip()# Today's Starstoday_stars_element = article.select_one('span.d-inline-block.float-sm-right')repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else Nonerepositories.append(repo_info)return repositoriesclass AnalysisOSSTrending(Action):async def run(self,trending: Any):return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))# Role实现
class OssWatcher(Role):def __init__(self,name="Codey",profile="OssWatcher",goal="Generate an insightful GitHub Trending analysis report.",constraints="Only analyze based on the provided GitHub Trending data.",):super().__init__(name, profile, goal, constraints)self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])self._set_react_mode(react_mode="by_order")async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self._rc.todo}")# By choosing the Action by order under the hood# todo will be first SimpleWriteCode() then SimpleRunCode()todo = self._rc.todomsg = self.get_memories(k=1)[0]  # find the most k recent messagesresult = await todo.run(msg.content)msg = Message(content=str(result), role=self.profile, cause_by=type(todo))self._rc.memory.add(msg)return msg# Trigger
class OssInfo(BaseModel):url: strtimestamp: float = Field(default_factory=time.time)class GithubTrendingCronTrigger():def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:self.crontab = crontab(spec, tz=tz)self.url = urldef __aiter__(self):return selfasync def __anext__(self):await self.crontab.next()return Message(self.url, OssInfo(url=self.url))# callback
async def discord_callback(msg: Message):intents = discord.Intents.default()intents.message_content = Trueclient = discord.Client(intents=intents, proxy=CONFIG.global_proxy)token = os.environ["DISCORD_TOKEN"]channel_id = int(os.environ["DISCORD_CHANNEL_ID"])async with client:await client.login(token)channel = await client.fetch_channel(channel_id)lines = []for i in msg.content.splitlines():if i.startswith(("# ", "## ", "### ")):if lines:await channel.send("\n".join(lines))lines = []lines.append(i)if lines:await channel.send("\n".join(lines))class WxPusherClient:def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):self.base_url = base_urlself.token = token or os.environ["WXPUSHER_TOKEN"]async def send_message(self,content,summary: Optional[str] = None,content_type: int = 1,topic_ids: Optional[list[int]] = None,uids: Optional[list[int]] = None,verify: bool = False,url: Optional[str] = None,):payload = {"appToken": self.token,"content": content,"summary": summary,"contentType": content_type,"topicIds": topic_ids or [],"uids": uids or os.environ["WXPUSHER_UIDS"].split(","),"verifyPay": verify,"url": url,}url = f"{self.base_url}/api/send/message"return await self._request("POST", url, json=payload)async def _request(self, method, url, **kwargs):async with aiohttp.ClientSession() as session:async with session.request(method, url, **kwargs) as response:response.raise_for_status()return await response.json()async def wxpusher_callback(msg: Message):client = WxPusherClient()await client.send_message(msg.content, content_type=3)async def send_via_email(msg:Message):# 设置邮件信息sender = '123455@qq.com'receiver = '123455@qq.com'subject = 'Github Trending'body = '<br>'.join(msg.content)# 创建邮件对象msg = MIMEMultipart()msg['From'] = sendermsg['To'] = receivermsg['Subject'] = subject# 添加邮件正文msg.attach(MIMEText(body, 'html'))# 发送邮件try:server = smtplib.SMTP('smtp.qq.com', 587)server.starttls()#注意这个地方的密码不是邮箱密码,是授权码,不知道咋获取的搜索一下吧server.login(sender, 'authenticCODE')server.sendmail(sender, receiver, msg.as_string())server.quit()print('邮件发送成功')except Exception as e:print('邮件发送失败', e)# 运行入口,cron 表达式遵循特定的格式,通常是:分钟、小时、日、月、星期几。测试使用,1分钟一次,用几下就关掉程序啊,不然。。。
async def main(spec: str = "1 * * * *", discord: bool = False, wxpusher: bool = False, email:bool = True):callbacks = []if discord:callbacks.append(discord_callback)if wxpusher:callbacks.append(wxpusher_callback)if email:callbacks.append(send_via_email)if not callbacks:async def _print(msg: Message):print(msg.content)callbacks.append(_print)async def callback(msg):await asyncio.gather(*(call(msg) for call in callbacks))runner = SubscriptionRunner()await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)await runner.run()if __name__ == "__main__":import firefire.Fire(main)

照抄的教程啊,增加了个发送邮件的功能,发送邮件功能测试过了,可以发送成功,但是整个的没运行成功,应该是没科学上网,打不开trending页面的事,时间紧张,有空再搞通或者爬个国内的网页。

这篇关于MetaGPT入门(三)-OSS订阅智能体的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/637625

相关文章

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

Python实现对阿里云OSS对象存储的操作详解

《Python实现对阿里云OSS对象存储的操作详解》这篇文章主要为大家详细介绍了Python实现对阿里云OSS对象存储的操作相关知识,包括连接,上传,下载,列举等功能,感兴趣的小伙伴可以了解下... 目录一、直接使用代码二、详细使用1. 环境准备2. 初始化配置3. bucket配置创建4. 文件上传到os

从入门到精通C++11 <chrono> 库特性

《从入门到精通C++11<chrono>库特性》chrono库是C++11中一个非常强大和实用的库,它为时间处理提供了丰富的功能和类型安全的接口,通过本文的介绍,我们了解了chrono库的基本概念... 目录一、引言1.1 为什么需要<chrono>库1.2<chrono>库的基本概念二、时间段(Durat

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

从入门到精通MySQL 数据库索引(实战案例)

《从入门到精通MySQL数据库索引(实战案例)》索引是数据库的目录,提升查询速度,主要类型包括BTree、Hash、全文、空间索引,需根据场景选择,建议用于高频查询、关联字段、排序等,避免重复率高或... 目录一、索引是什么?能干嘛?核心作用:二、索引的 4 种主要类型(附通俗例子)1. BTree 索引(

Redis 配置文件使用建议redis.conf 从入门到实战

《Redis配置文件使用建议redis.conf从入门到实战》Redis配置方式包括配置文件、命令行参数、运行时CONFIG命令,支持动态修改参数及持久化,常用项涉及端口、绑定、内存策略等,版本8... 目录一、Redis.conf 是什么?二、命令行方式传参(适用于测试)三、运行时动态修改配置(不重启服务

MySQL DQL从入门到精通

《MySQLDQL从入门到精通》通过DQL,我们可以从数据库中检索出所需的数据,进行各种复杂的数据分析和处理,本文将深入探讨MySQLDQL的各个方面,帮助你全面掌握这一重要技能,感兴趣的朋友跟随小... 目录一、DQL 基础:SELECT 语句入门二、数据过滤:WHERE 子句的使用三、结果排序:ORDE

Python中OpenCV与Matplotlib的图像操作入门指南

《Python中OpenCV与Matplotlib的图像操作入门指南》:本文主要介绍Python中OpenCV与Matplotlib的图像操作指南,本文通过实例代码给大家介绍的非常详细,对大家的学... 目录一、环境准备二、图像的基本操作1. 图像读取、显示与保存 使用OpenCV操作2. 像素级操作3.

基于Python实现智能天气提醒助手

《基于Python实现智能天气提醒助手》这篇文章主要来和大家分享一个实用的Python天气提醒助手开发方案,这个工具可以方便地集成到青龙面板或其他调度框架中使用,有需要的小伙伴可以参考一下... 目录项目概述核心功能技术实现1. 天气API集成2. AI建议生成3. 消息推送环境配置使用方法完整代码项目特点

JavaScript实战:智能密码生成器开发指南

本文通过JavaScript实战开发智能密码生成器,详解如何运用crypto.getRandomValues实现加密级随机密码生成,包含多字符组合、安全强度可视化、易混淆字符排除等企业级功能。学习密码强度检测算法与信息熵计算原理,获取可直接嵌入项目的完整代码,提升Web应用的安全开发能力 目录