BabyAGI源码解读(2)-核心agents部分

2024-03-31 07:52

本文主要是介绍BabyAGI源码解读(2)-核心agents部分,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

话不多说,我们直接进入babyAGI的核心部分,也就是task agent部分。

1. 创建任务agent

这一段代码的任务是创建一个任务,这个函数有四个参数

  • objective 目标
  • result 结果,dict类型
  • task_list 任务清单
  • task_descritption 任务描述

将结果存放到out这个dict中返回。

在prompt中,指定了

You are to use the result from an execution agent to create new tasks with the following objective: {objective}.

角色和目标,当然目标不论是人工智能还是人类,都是很重要的一点。值得注意的,角色和能力都在prompt最开始就以指定,作为一个背景的存在。

接着输入上一个完成的任务的结果和任务描述,然后拼接未完成的任务清单。

最后加入任务的一些规范,新任务不得与未完成任务重复和输出prompt的样式规范等。

整体来看,整个prompt的架构按顺序是

  1. 指定角色和任务
  2. 描述详细内容
  3. 指定输出格式和规则

这是一个优秀的prompt案例,大家可以学习一下。

def task_creation_agent(objective: str, result: Dict, task_description: str, task_list: List[str]
):prompt = f"""
You are to use the result from an execution agent to create new tasks with the following objective: {objective}.
The last completed task has the result: \n{result["data"]}
This result was based on this task description: {task_description}.\n"""if task_list:prompt += f"These are incomplete tasks: {', '.join(task_list)}\n"prompt += "Based on the result, return a list of tasks to be completed in order to meet the objective. "if task_list:prompt += "These new tasks must not overlap with incomplete tasks. "prompt += """
Return one task per line in your response. The result must be a numbered list in the format:#. First task
#. Second taskThe number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time."
Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output."""print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n')response = openai_call(prompt, max_tokens=2000)print(f'\n****TASK CREATION AGENT RESPONSE****\n{response}\n')new_tasks = response.split('\n')new_tasks_list = []for task_string in new_tasks:task_parts = task_string.strip().split(".", 1)if len(task_parts) == 2:task_id = ''.join(s for s in task_parts[0] if s.isnumeric())task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()if task_name.strip() and task_id.isnumeric():new_tasks_list.append(task_name)# print('New task created: ' + task_name)out = [{"task_name": task_name} for task_name in new_tasks_list]return out

2. 任务优先级排序agent

这段代码主要是调用openAI对已存储的任务清单进行优先级排序,返回一个新的任务列表。

这段代码可以看一下,prompt的编写,整体和上面的差异不大。

def prioritization_agent():task_names = tasks_storage.get_task_names()bullet_string = '\n'prompt = f"""
You are tasked with prioritizing the following tasks: {bullet_string + bullet_string.join(task_names)}
Consider the ultimate objective of your team: {OBJECTIVE}.
Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
Do not remove any tasks. Return the ranked tasks as a numbered list in the format:#. First task
#. Second taskThe entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period.
Do not include any headers before your ranked list or follow your list with any other output."""print(f'\n****TASK PRIORITIZATION AGENT PROMPT****\n{prompt}\n')response = openai_call(prompt, max_tokens=2000)print(f'\n****TASK PRIORITIZATION AGENT RESPONSE****\n{response}\n')if not response:print('Received empty response from priotritization agent. Keeping task list unchanged.')returnnew_tasks = response.split("\n") if "\n" in response else [response]new_tasks_list = []for task_string in new_tasks:task_parts = task_string.strip().split(".", 1)if len(task_parts) == 2:task_id = ''.join(s for s in task_parts[0] if s.isnumeric())task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()if task_name.strip():new_tasks_list.append({"task_id": task_id, "task_name": task_name})return new_tasks_list

3. 执行任务agent

这两段段代码执行五个基于目标的优先级比较高的任务。

这段代码是从结果存储中,根据查询内容获取top_results_nums个任务。

# Get the top n completed tasks for the objective
def context_agent(query: str, top_results_num: int):"""Retrieves context for a given query from an index of tasks.Args:query (str): The query or objective for retrieving context.top_results_num (int): The number of top results to retrieve.Returns:list: A list of tasks as context for the given query, sorted by relevance."""results = results_storage.query(query=query, top_results_num=top_results_num)# print("****RESULTS****")# print(results)return results

这段代码是通过OpenAI API执行agent,整个prompt的结构同样是

  1. 先说明目标
  2. 拼接已完成的上下文,详细内容
  3. 再表明你的任务
# Execute a task based on the objective and five previous tasks
def execution_agent(objective: str, task: str) -> str:"""Executes a task based on the given objective and previous context.Args:objective (str): The objective or goal for the AI to perform the task.task (str): The task to be executed by the AI.Returns:str: The response generated by the AI for the given task."""context = context_agent(query=objective, top_results_num=5)# print("\n****RELEVANT CONTEXT****\n")# print(context)# print('')prompt = f'Perform one task based on the following objective: {objective}.\n'if context:prompt += 'Take into account these previously completed tasks:' + '\n'.join(context)prompt += f'\nYour task: {task}\nResponse:'return openai_call(prompt, max_tokens=2000)

这两段代码不知道为什么带上注释了,可能是作者买了coplit了吧😂

4. 任务主体

下面就是整体代码的最终部分了,主程序部分

主程序是一个大的死循环,主要分为三步

  • 获取task_storage中第一个未完成的任务,执行execution_agent,传入目标和未完成的任务,获取result
  • 重新包装resultresult_id,并将结果存放到result_storage中,而这个result_storage正式之前配置的向量数据库
  • 创建新任务,并重新排列任务优先级,这里只有设置了cooperative mode才会执行这个,这里我们也可以理解,当有多个线程同时参与时,需要进行优先级排序,防止重复执行任务

整体项目来看的话,就是一个执行任务-分解任务-执行任务的循环,当列表为空时,任务执行完成。

# Add the initial task if starting new objective
if not JOIN_EXISTING_OBJECTIVE:initial_task = {"task_id": tasks_storage.next_task_id(),"task_name": INITIAL_TASK}tasks_storage.append(initial_task)def main():loop = Truewhile loop:# As long as there are tasks in the storage...if not tasks_storage.is_empty():# Print the task listprint("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")for t in tasks_storage.get_task_names():print(" • " + str(t))# Step 1: Pull the first incomplete tasktask = tasks_storage.popleft()print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")print(str(task["task_name"]))# Send to execution function to complete the task based on the contextresult = execution_agent(OBJECTIVE, str(task["task_name"]))print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")print(result)# Step 2: Enrich result and store in the results storage# This is where you should enrich the result if neededenriched_result = {"data": result}# extract the actual result from the dictionary# since we don't do enrichment currently# vector = enriched_result["data"]result_id = f"result_{task['task_id']}"results_storage.add(task, result, result_id)# Step 3: Create new tasks and re-prioritize task list# only the main instance in cooperative mode does thatnew_tasks = task_creation_agent(OBJECTIVE,enriched_result,task["task_name"],tasks_storage.get_task_names(),)print('Adding new tasks to task_storage')for new_task in new_tasks:new_task.update({"task_id": tasks_storage.next_task_id()})print(str(new_task))tasks_storage.append(new_task)if not JOIN_EXISTING_OBJECTIVE:prioritized_tasks = prioritization_agent()if prioritized_tasks:tasks_storage.replace(prioritized_tasks)# Sleep a bit before checking the task list againtime.sleep(5)else:print('Done.')loop = Falseif __name__ == "__main__":main()

这就是整体的项目代码,下一篇,我们整体来看看AGI的原理,做个总结。

这篇关于BabyAGI源码解读(2)-核心agents部分的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/863816

相关文章

解读GC日志中的各项指标用法

《解读GC日志中的各项指标用法》:本文主要介绍GC日志中的各项指标用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、基础 GC 日志格式(以 G1 为例)1. Minor GC 日志2. Full GC 日志二、关键指标解析1. GC 类型与触发原因2. 堆

Java设计模式---迭代器模式(Iterator)解读

《Java设计模式---迭代器模式(Iterator)解读》:本文主要介绍Java设计模式---迭代器模式(Iterator),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录1、迭代器(Iterator)1.1、结构1.2、常用方法1.3、本质1、解耦集合与遍历逻辑2、统一

MySQL之InnoDB存储页的独立表空间解读

《MySQL之InnoDB存储页的独立表空间解读》:本文主要介绍MySQL之InnoDB存储页的独立表空间,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、独立表空间【1】表空间大小【2】区【3】组【4】段【5】区的类型【6】XDES Entry区结构【

MySQL主从复制与读写分离的用法解读

《MySQL主从复制与读写分离的用法解读》:本文主要介绍MySQL主从复制与读写分离的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、主从复制mysql主从复制原理实验案例二、读写分离实验案例安装并配置mycat 软件设置mycat读写分离验证mycat读

Python的端到端测试框架SeleniumBase使用解读

《Python的端到端测试框架SeleniumBase使用解读》:本文主要介绍Python的端到端测试框架SeleniumBase使用,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全... 目录SeleniumBase详细介绍及用法指南什么是 SeleniumBase?SeleniumBase

详解MySQL中DISTINCT去重的核心注意事项

《详解MySQL中DISTINCT去重的核心注意事项》为了实现查询不重复的数据,MySQL提供了DISTINCT关键字,它的主要作用就是对数据表中一个或多个字段重复的数据进行过滤,只返回其中的一条数据... 目录DISTINCT 六大注意事项1. 作用范围:所有 SELECT 字段2. NULL 值的特殊处

Python包管理工具核心指令uvx举例详细解析

《Python包管理工具核心指令uvx举例详细解析》:本文主要介绍Python包管理工具核心指令uvx的相关资料,uvx是uv工具链中用于临时运行Python命令行工具的高效执行器,依托Rust实... 目录一、uvx 的定位与核心功能二、uvx 的典型应用场景三、uvx 与传统工具对比四、uvx 的技术实

java中Optional的核心用法和最佳实践

《java中Optional的核心用法和最佳实践》Java8中Optional用于处理可能为null的值,减少空指针异常,:本文主要介绍java中Optional核心用法和最佳实践的相关资料,文中... 目录前言1. 创建 Optional 对象1.1 常规创建方式2. 访问 Optional 中的值2.1

Nacos注册中心和配置中心的底层原理全面解读

《Nacos注册中心和配置中心的底层原理全面解读》:本文主要介绍Nacos注册中心和配置中心的底层原理的全面解读,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录临时实例和永久实例为什么 Nacos 要将服务实例分为临时实例和永久实例?1.x 版本和2.x版本的区别

C++类和对象之默认成员函数的使用解读

《C++类和对象之默认成员函数的使用解读》:本文主要介绍C++类和对象之默认成员函数的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、默认成员函数有哪些二、各默认成员函数详解默认构造函数析构函数拷贝构造函数拷贝赋值运算符三、默认成员函数的注意事项总结一