BabyAGI源码解读(2)-核心agents部分

2024-03-31 07:52

本文主要是介绍BabyAGI源码解读(2)-核心agents部分,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

话不多说,我们直接进入babyAGI的核心部分,也就是task agent部分。

1. 创建任务agent

这一段代码的任务是创建一个任务,这个函数有四个参数

  • objective 目标
  • result 结果,dict类型
  • task_list 任务清单
  • task_descritption 任务描述

将结果存放到out这个dict中返回。

在prompt中,指定了

You are to use the result from an execution agent to create new tasks with the following objective: {objective}.

角色和目标,当然目标不论是人工智能还是人类,都是很重要的一点。值得注意的,角色和能力都在prompt最开始就以指定,作为一个背景的存在。

接着输入上一个完成的任务的结果和任务描述,然后拼接未完成的任务清单。

最后加入任务的一些规范,新任务不得与未完成任务重复和输出prompt的样式规范等。

整体来看,整个prompt的架构按顺序是

  1. 指定角色和任务
  2. 描述详细内容
  3. 指定输出格式和规则

这是一个优秀的prompt案例,大家可以学习一下。

def task_creation_agent(objective: str, result: Dict, task_description: str, task_list: List[str]
):prompt = f"""
You are to use the result from an execution agent to create new tasks with the following objective: {objective}.
The last completed task has the result: \n{result["data"]}
This result was based on this task description: {task_description}.\n"""if task_list:prompt += f"These are incomplete tasks: {', '.join(task_list)}\n"prompt += "Based on the result, return a list of tasks to be completed in order to meet the objective. "if task_list:prompt += "These new tasks must not overlap with incomplete tasks. "prompt += """
Return one task per line in your response. The result must be a numbered list in the format:#. First task
#. Second taskThe number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time."
Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output."""print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n')response = openai_call(prompt, max_tokens=2000)print(f'\n****TASK CREATION AGENT RESPONSE****\n{response}\n')new_tasks = response.split('\n')new_tasks_list = []for task_string in new_tasks:task_parts = task_string.strip().split(".", 1)if len(task_parts) == 2:task_id = ''.join(s for s in task_parts[0] if s.isnumeric())task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()if task_name.strip() and task_id.isnumeric():new_tasks_list.append(task_name)# print('New task created: ' + task_name)out = [{"task_name": task_name} for task_name in new_tasks_list]return out

2. 任务优先级排序agent

这段代码主要是调用openAI对已存储的任务清单进行优先级排序,返回一个新的任务列表。

这段代码可以看一下,prompt的编写,整体和上面的差异不大。

def prioritization_agent():task_names = tasks_storage.get_task_names()bullet_string = '\n'prompt = f"""
You are tasked with prioritizing the following tasks: {bullet_string + bullet_string.join(task_names)}
Consider the ultimate objective of your team: {OBJECTIVE}.
Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
Do not remove any tasks. Return the ranked tasks as a numbered list in the format:#. First task
#. Second taskThe entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period.
Do not include any headers before your ranked list or follow your list with any other output."""print(f'\n****TASK PRIORITIZATION AGENT PROMPT****\n{prompt}\n')response = openai_call(prompt, max_tokens=2000)print(f'\n****TASK PRIORITIZATION AGENT RESPONSE****\n{response}\n')if not response:print('Received empty response from priotritization agent. Keeping task list unchanged.')returnnew_tasks = response.split("\n") if "\n" in response else [response]new_tasks_list = []for task_string in new_tasks:task_parts = task_string.strip().split(".", 1)if len(task_parts) == 2:task_id = ''.join(s for s in task_parts[0] if s.isnumeric())task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()if task_name.strip():new_tasks_list.append({"task_id": task_id, "task_name": task_name})return new_tasks_list

3. 执行任务agent

这两段段代码执行五个基于目标的优先级比较高的任务。

这段代码是从结果存储中,根据查询内容获取top_results_nums个任务。

# Get the top n completed tasks for the objective
def context_agent(query: str, top_results_num: int):"""Retrieves context for a given query from an index of tasks.Args:query (str): The query or objective for retrieving context.top_results_num (int): The number of top results to retrieve.Returns:list: A list of tasks as context for the given query, sorted by relevance."""results = results_storage.query(query=query, top_results_num=top_results_num)# print("****RESULTS****")# print(results)return results

这段代码是通过OpenAI API执行agent,整个prompt的结构同样是

  1. 先说明目标
  2. 拼接已完成的上下文,详细内容
  3. 再表明你的任务
# Execute a task based on the objective and five previous tasks
def execution_agent(objective: str, task: str) -> str:"""Executes a task based on the given objective and previous context.Args:objective (str): The objective or goal for the AI to perform the task.task (str): The task to be executed by the AI.Returns:str: The response generated by the AI for the given task."""context = context_agent(query=objective, top_results_num=5)# print("\n****RELEVANT CONTEXT****\n")# print(context)# print('')prompt = f'Perform one task based on the following objective: {objective}.\n'if context:prompt += 'Take into account these previously completed tasks:' + '\n'.join(context)prompt += f'\nYour task: {task}\nResponse:'return openai_call(prompt, max_tokens=2000)

这两段代码不知道为什么带上注释了,可能是作者买了coplit了吧😂

4. 任务主体

下面就是整体代码的最终部分了,主程序部分

主程序是一个大的死循环,主要分为三步

  • 获取task_storage中第一个未完成的任务,执行execution_agent,传入目标和未完成的任务,获取result
  • 重新包装resultresult_id,并将结果存放到result_storage中,而这个result_storage正式之前配置的向量数据库
  • 创建新任务,并重新排列任务优先级,这里只有设置了cooperative mode才会执行这个,这里我们也可以理解,当有多个线程同时参与时,需要进行优先级排序,防止重复执行任务

整体项目来看的话,就是一个执行任务-分解任务-执行任务的循环,当列表为空时,任务执行完成。

# Add the initial task if starting new objective
if not JOIN_EXISTING_OBJECTIVE:initial_task = {"task_id": tasks_storage.next_task_id(),"task_name": INITIAL_TASK}tasks_storage.append(initial_task)def main():loop = Truewhile loop:# As long as there are tasks in the storage...if not tasks_storage.is_empty():# Print the task listprint("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")for t in tasks_storage.get_task_names():print(" • " + str(t))# Step 1: Pull the first incomplete tasktask = tasks_storage.popleft()print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")print(str(task["task_name"]))# Send to execution function to complete the task based on the contextresult = execution_agent(OBJECTIVE, str(task["task_name"]))print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")print(result)# Step 2: Enrich result and store in the results storage# This is where you should enrich the result if neededenriched_result = {"data": result}# extract the actual result from the dictionary# since we don't do enrichment currently# vector = enriched_result["data"]result_id = f"result_{task['task_id']}"results_storage.add(task, result, result_id)# Step 3: Create new tasks and re-prioritize task list# only the main instance in cooperative mode does thatnew_tasks = task_creation_agent(OBJECTIVE,enriched_result,task["task_name"],tasks_storage.get_task_names(),)print('Adding new tasks to task_storage')for new_task in new_tasks:new_task.update({"task_id": tasks_storage.next_task_id()})print(str(new_task))tasks_storage.append(new_task)if not JOIN_EXISTING_OBJECTIVE:prioritized_tasks = prioritization_agent()if prioritized_tasks:tasks_storage.replace(prioritized_tasks)# Sleep a bit before checking the task list againtime.sleep(5)else:print('Done.')loop = Falseif __name__ == "__main__":main()

这就是整体的项目代码,下一篇,我们整体来看看AGI的原理,做个总结。

这篇关于BabyAGI源码解读(2)-核心agents部分的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/863816

相关文章

Redis与缓存解读

《Redis与缓存解读》文章介绍了Redis作为缓存层的优势和缺点,并分析了六种缓存更新策略,包括超时剔除、先删缓存再更新数据库、旁路缓存、先更新数据库再删缓存、先更新数据库再更新缓存、读写穿透和异步... 目录缓存缓存优缺点缓存更新策略超时剔除先删缓存再更新数据库旁路缓存(先更新数据库,再删缓存)先更新数

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

C#反射编程之GetConstructor()方法解读

《C#反射编程之GetConstructor()方法解读》C#中Type类的GetConstructor()方法用于获取指定类型的构造函数,该方法有多个重载版本,可以根据不同的参数获取不同特性的构造函... 目录C# GetConstructor()方法有4个重载以GetConstructor(Type[]

Andrej Karpathy最新采访:认知核心模型10亿参数就够了,AI会打破教育不公的僵局

夕小瑶科技说 原创  作者 | 海野 AI圈子的红人,AI大神Andrej Karpathy,曾是OpenAI联合创始人之一,特斯拉AI总监。上一次的动态是官宣创办一家名为 Eureka Labs 的人工智能+教育公司 ,宣布将长期致力于AI原生教育。 近日,Andrej Karpathy接受了No Priors(投资博客)的采访,与硅谷知名投资人 Sara Guo 和 Elad G

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟 开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚 第一站:海量资源,应有尽有 走进“智听

MCU7.keil中build产生的hex文件解读

1.hex文件大致解读 闲来无事,查看了MCU6.用keil新建项目的hex文件 用FlexHex打开 给我的第一印象是:经过软件的解释之后,发现这些数据排列地十分整齐 :02000F0080FE71:03000000020003F8:0C000300787FE4F6D8FD75810702000F3D:00000001FF 把解释后的数据当作十六进制来观察 1.每一行数据

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

poj 2976 分数规划二分贪心(部分对总体的贡献度) poj 3111

poj 2976: 题意: 在n场考试中,每场考试共有b题,答对的题目有a题。 允许去掉k场考试,求能达到的最高正确率是多少。 解析: 假设已知准确率为x,则每场考试对于准确率的贡献值为: a - b * x,将贡献值大的排序排在前面舍弃掉后k个。 然后二分x就行了。 代码: #include <iostream>#include <cstdio>#incl

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、