MetaGPT入门(三)-OSS订阅智能体

2024-01-23 21:44
文章标签 入门 智能 订阅 oss metagpt

本文主要是介绍MetaGPT入门(三)-OSS订阅智能体,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

‍‬‍‌⁡‍⁢⁤​‍​⁡⁡‬⁢‍​​⁡​​‬‬‍⁡​⁢⁡​​⁣⁡⁤​‍‌​​‌​‍⁣⁣​⁤⁢⁡​《MetaGPT智能体开发入门》教程 - 飞书云文档 (feishu.cn)

import asyncio
import os
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, AsyncGenerator, Awaitable, Callable, Optionalimport aiohttp
import discord
import pandas as pd
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfofrom metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message# 订阅模块,可以from metagpt.subscription import SubscriptionRunner导入,这里贴上代码供参考
class SubscriptionRunner(BaseModel):"""A simple wrapper to manage subscription tasks for different roles using asyncio.Example:>>> import asyncio>>> from metagpt.subscription import SubscriptionRunner>>> from metagpt.roles import Searcher>>> from metagpt.schema import Message>>> async def trigger():...     while True:...         yield Message("the latest news about OpenAI")...         await asyncio.sleep(3600 * 24)>>> async def callback(msg: Message):...     print(msg.content)>>> async def main():...     pb = SubscriptionRunner()...     await pb.subscribe(Searcher(), trigger(), callback)...     await pb.run()>>> asyncio.run(main())"""tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)class Config:arbitrary_types_allowed = Trueasync def subscribe(self,role: Role,trigger: AsyncGenerator[Message, None],callback: Callable[[Message,],Awaitable[None],],):"""Subscribes a role to a trigger and sets up a callback to be called with the role's response.Args:role: The role to subscribe.trigger: An asynchronous generator that yields Messages to be processed by the role.callback: An asynchronous function to be called with the response from the role."""loop = asyncio.get_running_loop()async def _start_role():async for msg in trigger:resp = await role.run(msg)await callback(resp)self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")async def unsubscribe(self, role: Role):"""Unsubscribes a role from its trigger and cancels the associated task.Args:role: The role to unsubscribe."""task = self.tasks.pop(role)task.cancel()async def run(self, raise_exception: bool = True):"""Runs all subscribed tasks and handles their completion or exception.Args:raise_exception: _description_. Defaults to True.Raises:task.exception: _description_"""while True:for role, task in self.tasks.items():if task.done():if task.exception():if raise_exception:raise task.exception()logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")else:logger.warning(f"Task {task.get_name()} has completed. ""If this is unexpected behavior, please check the trigger function.")self.tasks.pop(role)breakelse:await asyncio.sleep(1)# Actions 的实现
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example```
# [Title]## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.## The Trends Categories
1. Generative AI- [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]- [Project2](https://github/xx/project2): ...
...## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
```---
# Github Trending
{trending}
"""class CrawlOSSTrending(Action):async def run(self, url: str = "https://github.com/trending"):async with aiohttp.ClientSession() as client:async with client.get(url, proxy=CONFIG.global_proxy) as response:response.raise_for_status()html = await response.text()soup = BeautifulSoup(html, 'html.parser')repositories = []for article in soup.select('article.Box-row'):repo_info = {}repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()# Descriptiondescription_element = article.select_one('p')repo_info['description'] = description_element.text.strip() if description_element else None# Languagelanguage_element = article.select_one('span[itemprop="programmingLanguage"]')repo_info['language'] = language_element.text.strip() if language_element else None# Stars and Forksstars_element = article.select('a.Link--muted')[0]forks_element = article.select('a.Link--muted')[1]repo_info['stars'] = stars_element.text.strip()repo_info['forks'] = forks_element.text.strip()# Today's Starstoday_stars_element = article.select_one('span.d-inline-block.float-sm-right')repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else Nonerepositories.append(repo_info)return repositoriesclass AnalysisOSSTrending(Action):async def run(self,trending: Any):return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))# Role实现
class OssWatcher(Role):def __init__(self,name="Codey",profile="OssWatcher",goal="Generate an insightful GitHub Trending analysis report.",constraints="Only analyze based on the provided GitHub Trending data.",):super().__init__(name, profile, goal, constraints)self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])self._set_react_mode(react_mode="by_order")async def _act(self) -> Message:logger.info(f"{self._setting}: ready to {self._rc.todo}")# By choosing the Action by order under the hood# todo will be first SimpleWriteCode() then SimpleRunCode()todo = self._rc.todomsg = self.get_memories(k=1)[0]  # find the most k recent messagesresult = await todo.run(msg.content)msg = Message(content=str(result), role=self.profile, cause_by=type(todo))self._rc.memory.add(msg)return msg# Trigger
class OssInfo(BaseModel):url: strtimestamp: float = Field(default_factory=time.time)class GithubTrendingCronTrigger():def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:self.crontab = crontab(spec, tz=tz)self.url = urldef __aiter__(self):return selfasync def __anext__(self):await self.crontab.next()return Message(self.url, OssInfo(url=self.url))# callback
async def discord_callback(msg: Message):intents = discord.Intents.default()intents.message_content = Trueclient = discord.Client(intents=intents, proxy=CONFIG.global_proxy)token = os.environ["DISCORD_TOKEN"]channel_id = int(os.environ["DISCORD_CHANNEL_ID"])async with client:await client.login(token)channel = await client.fetch_channel(channel_id)lines = []for i in msg.content.splitlines():if i.startswith(("# ", "## ", "### ")):if lines:await channel.send("\n".join(lines))lines = []lines.append(i)if lines:await channel.send("\n".join(lines))class WxPusherClient:def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):self.base_url = base_urlself.token = token or os.environ["WXPUSHER_TOKEN"]async def send_message(self,content,summary: Optional[str] = None,content_type: int = 1,topic_ids: Optional[list[int]] = None,uids: Optional[list[int]] = None,verify: bool = False,url: Optional[str] = None,):payload = {"appToken": self.token,"content": content,"summary": summary,"contentType": content_type,"topicIds": topic_ids or [],"uids": uids or os.environ["WXPUSHER_UIDS"].split(","),"verifyPay": verify,"url": url,}url = f"{self.base_url}/api/send/message"return await self._request("POST", url, json=payload)async def _request(self, method, url, **kwargs):async with aiohttp.ClientSession() as session:async with session.request(method, url, **kwargs) as response:response.raise_for_status()return await response.json()async def wxpusher_callback(msg: Message):client = WxPusherClient()await client.send_message(msg.content, content_type=3)async def send_via_email(msg:Message):# 设置邮件信息sender = '123455@qq.com'receiver = '123455@qq.com'subject = 'Github Trending'body = '<br>'.join(msg.content)# 创建邮件对象msg = MIMEMultipart()msg['From'] = sendermsg['To'] = receivermsg['Subject'] = subject# 添加邮件正文msg.attach(MIMEText(body, 'html'))# 发送邮件try:server = smtplib.SMTP('smtp.qq.com', 587)server.starttls()#注意这个地方的密码不是邮箱密码,是授权码,不知道咋获取的搜索一下吧server.login(sender, 'authenticCODE')server.sendmail(sender, receiver, msg.as_string())server.quit()print('邮件发送成功')except Exception as e:print('邮件发送失败', e)# 运行入口,cron 表达式遵循特定的格式,通常是:分钟、小时、日、月、星期几。测试使用,1分钟一次,用几下就关掉程序啊,不然。。。
async def main(spec: str = "1 * * * *", discord: bool = False, wxpusher: bool = False, email:bool = True):callbacks = []if discord:callbacks.append(discord_callback)if wxpusher:callbacks.append(wxpusher_callback)if email:callbacks.append(send_via_email)if not callbacks:async def _print(msg: Message):print(msg.content)callbacks.append(_print)async def callback(msg):await asyncio.gather(*(call(msg) for call in callbacks))runner = SubscriptionRunner()await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)await runner.run()if __name__ == "__main__":import firefire.Fire(main)

照抄的教程啊,增加了个发送邮件的功能,发送邮件功能测试过了,可以发送成功,但是整个的没运行成功,应该是没科学上网,打不开trending页面的事,时间紧张,有空再搞通或者爬个国内的网页。

这篇关于MetaGPT入门(三)-OSS订阅智能体的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/637625

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题:

Java 创建图形用户界面(GUI)入门指南(Swing库 JFrame 类)概述

概述 基本概念 Java Swing 的架构 Java Swing 是一个为 Java 设计的 GUI 工具包,是 JAVA 基础类的一部分,基于 Java AWT 构建,提供了一系列轻量级、可定制的图形用户界面(GUI)组件。 与 AWT 相比,Swing 提供了许多比 AWT 更好的屏幕显示元素,更加灵活和可定制,具有更好的跨平台性能。 组件和容器 Java Swing 提供了许多

【IPV6从入门到起飞】5-1 IPV6+Home Assistant(搭建基本环境)

【IPV6从入门到起飞】5-1 IPV6+Home Assistant #搭建基本环境 1 背景2 docker下载 hass3 创建容器4 浏览器访问 hass5 手机APP远程访问hass6 更多玩法 1 背景 既然电脑可以IPV6入站,手机流量可以访问IPV6网络的服务,为什么不在电脑搭建Home Assistant(hass),来控制你的设备呢?@智能家居 @万物互联

poj 2104 and hdu 2665 划分树模板入门题

题意: 给一个数组n(1e5)个数,给一个范围(fr, to, k),求这个范围中第k大的数。 解析: 划分树入门。 bing神的模板。 坑爹的地方是把-l 看成了-1........ 一直re。 代码: poj 2104: #include <iostream>#include <cstdio>#include <cstdlib>#include <al

智能交通(二)——Spinger特刊推荐

特刊征稿 01  期刊名称: Autonomous Intelligent Systems  特刊名称: Understanding the Policy Shift  with the Digital Twins in Smart  Transportation and Mobility 截止时间: 开放提交:2024年1月20日 提交截止日

MySQL-CRUD入门1

文章目录 认识配置文件client节点mysql节点mysqld节点 数据的添加(Create)添加一行数据添加多行数据两种添加数据的效率对比 数据的查询(Retrieve)全列查询指定列查询查询中带有表达式关于字面量关于as重命名 临时表引入distinct去重order by 排序关于NULL 认识配置文件 在我们的MySQL服务安装好了之后, 会有一个配置文件, 也就

基于 YOLOv5 的积水检测系统:打造高效智能的智慧城市应用

在城市发展中,积水问题日益严重,特别是在大雨过后,积水往往会影响交通甚至威胁人们的安全。通过现代计算机视觉技术,我们能够智能化地检测和识别积水区域,减少潜在危险。本文将介绍如何使用 YOLOv5 和 PyQt5 搭建一个积水检测系统,结合深度学习和直观的图形界面,为用户提供高效的解决方案。 源码地址: PyQt5+YoloV5 实现积水检测系统 预览: 项目背景