
2024-03-31 07:52


话不多说,我们直接进入babyAGI的核心部分,也就是task agent部分。

1. 创建任务agent


  • objective 目标
  • result 结果,dict类型
  • task_list 任务清单
  • task_descritption 任务描述



You are to use the result from an execution agent to create new tasks with the following objective: {objective}.





  1. 指定角色和任务
  2. 描述详细内容
  3. 指定输出格式和规则


def task_creation_agent(objective: str, result: Dict, task_description: str, task_list: List[str]
):prompt = f"""
You are to use the result from an execution agent to create new tasks with the following objective: {objective}.
The last completed task has the result: \n{result["data"]}
This result was based on this task description: {task_description}.\n"""if task_list:prompt += f"These are incomplete tasks: {', '.join(task_list)}\n"prompt += "Based on the result, return a list of tasks to be completed in order to meet the objective. "if task_list:prompt += "These new tasks must not overlap with incomplete tasks. "prompt += """
Return one task per line in your response. The result must be a numbered list in the format:#. First task
#. Second taskThe number of each entry must be followed by a period. If your list is empty, write "There are no tasks to add at this time."
Unless your list is empty, do not include any headers before your numbered list or follow your numbered list with any other output."""print(f'\n*****TASK CREATION AGENT PROMPT****\n{prompt}\n')response = openai_call(prompt, max_tokens=2000)print(f'\n****TASK CREATION AGENT RESPONSE****\n{response}\n')new_tasks = response.split('\n')new_tasks_list = []for task_string in new_tasks:task_parts = task_string.strip().split(".", 1)if len(task_parts) == 2:task_id = ''.join(s for s in task_parts[0] if s.isnumeric())task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()if task_name.strip() and task_id.isnumeric():new_tasks_list.append(task_name)# print('New task created: ' + task_name)out = [{"task_name": task_name} for task_name in new_tasks_list]return out

2. 任务优先级排序agent



def prioritization_agent():task_names = tasks_storage.get_task_names()bullet_string = '\n'prompt = f"""
You are tasked with prioritizing the following tasks: {bullet_string + bullet_string.join(task_names)}
Consider the ultimate objective of your team: {OBJECTIVE}.
Tasks should be sorted from highest to lowest priority, where higher-priority tasks are those that act as pre-requisites or are more essential for meeting the objective.
Do not remove any tasks. Return the ranked tasks as a numbered list in the format:#. First task
#. Second taskThe entries must be consecutively numbered, starting with 1. The number of each entry must be followed by a period.
Do not include any headers before your ranked list or follow your list with any other output."""print(f'\n****TASK PRIORITIZATION AGENT PROMPT****\n{prompt}\n')response = openai_call(prompt, max_tokens=2000)print(f'\n****TASK PRIORITIZATION AGENT RESPONSE****\n{response}\n')if not response:print('Received empty response from priotritization agent. Keeping task list unchanged.')returnnew_tasks = response.split("\n") if "\n" in response else [response]new_tasks_list = []for task_string in new_tasks:task_parts = task_string.strip().split(".", 1)if len(task_parts) == 2:task_id = ''.join(s for s in task_parts[0] if s.isnumeric())task_name = re.sub(r'[^\w\s_]+', '', task_parts[1]).strip()if task_name.strip():new_tasks_list.append({"task_id": task_id, "task_name": task_name})return new_tasks_list

3. 执行任务agent



# Get the top n completed tasks for the objective
def context_agent(query: str, top_results_num: int):"""Retrieves context for a given query from an index of tasks.Args:query (str): The query or objective for retrieving context.top_results_num (int): The number of top results to retrieve.Returns:list: A list of tasks as context for the given query, sorted by relevance."""results = results_storage.query(query=query, top_results_num=top_results_num)# print("****RESULTS****")# print(results)return results

这段代码是通过OpenAI API执行agent,整个prompt的结构同样是

  1. 先说明目标
  2. 拼接已完成的上下文,详细内容
  3. 再表明你的任务
# Execute a task based on the objective and five previous tasks
def execution_agent(objective: str, task: str) -> str:"""Executes a task based on the given objective and previous context.Args:objective (str): The objective or goal for the AI to perform the task.task (str): The task to be executed by the AI.Returns:str: The response generated by the AI for the given task."""context = context_agent(query=objective, top_results_num=5)# print("\n****RELEVANT CONTEXT****\n")# print(context)# print('')prompt = f'Perform one task based on the following objective: {objective}.\n'if context:prompt += 'Take into account these previously completed tasks:' + '\n'.join(context)prompt += f'\nYour task: {task}\nResponse:'return openai_call(prompt, max_tokens=2000)


4. 任务主体



  • 获取task_storage中第一个未完成的任务,执行execution_agent,传入目标和未完成的任务,获取result
  • 重新包装resultresult_id,并将结果存放到result_storage中,而这个result_storage正式之前配置的向量数据库
  • 创建新任务,并重新排列任务优先级,这里只有设置了cooperative mode才会执行这个,这里我们也可以理解,当有多个线程同时参与时,需要进行优先级排序,防止重复执行任务


# Add the initial task if starting new objective
if not JOIN_EXISTING_OBJECTIVE:initial_task = {"task_id": tasks_storage.next_task_id(),"task_name": INITIAL_TASK}tasks_storage.append(initial_task)def main():loop = Truewhile loop:# As long as there are tasks in the storage...if not tasks_storage.is_empty():# Print the task listprint("\033[95m\033[1m" + "\n*****TASK LIST*****\n" + "\033[0m\033[0m")for t in tasks_storage.get_task_names():print(" • " + str(t))# Step 1: Pull the first incomplete tasktask = tasks_storage.popleft()print("\033[92m\033[1m" + "\n*****NEXT TASK*****\n" + "\033[0m\033[0m")print(str(task["task_name"]))# Send to execution function to complete the task based on the contextresult = execution_agent(OBJECTIVE, str(task["task_name"]))print("\033[93m\033[1m" + "\n*****TASK RESULT*****\n" + "\033[0m\033[0m")print(result)# Step 2: Enrich result and store in the results storage# This is where you should enrich the result if neededenriched_result = {"data": result}# extract the actual result from the dictionary# since we don't do enrichment currently# vector = enriched_result["data"]result_id = f"result_{task['task_id']}"results_storage.add(task, result, result_id)# Step 3: Create new tasks and re-prioritize task list# only the main instance in cooperative mode does thatnew_tasks = task_creation_agent(OBJECTIVE,enriched_result,task["task_name"],tasks_storage.get_task_names(),)print('Adding new tasks to task_storage')for new_task in new_tasks:new_task.update({"task_id": tasks_storage.next_task_id()})print(str(new_task))tasks_storage.append(new_task)if not JOIN_EXISTING_OBJECTIVE:prioritized_tasks = prioritization_agent()if prioritized_tasks:tasks_storage.replace(prioritized_tasks)# Sleep a bit before checking the task list againtime.sleep(5)else:print('Done.')loop = Falseif __name__ == "__main__":main()





