MetaGPT部分源码解读--Role

2024-03-31 20:12

本文主要是介绍MetaGPT部分源码解读--Role,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RoleReactMode

class RoleReactMode(str, Enum):REACT = "react"BY_ORDER = "by_order"PLAN_AND_ACT = "plan_and_act"@classmethoddef values(cls):return [item.value for item in cls]
  1. REACT:表示标准的反应模式,即在思考和行动之间交替,通过 LLN 选择动作。
  2. BY_ORDER:表示按顺序选择动作的模式,即每次执行一个动作,然后进行下一个动作的选择。
  3. PLAN_AND_ACT:表示首先进行规划,然后执行动作序列的模式。

通过定义 values() 方法,可以轻松地获取枚举类中所有常量的值。这个方法会返回一个列表,包含枚举类中所有常量的字符串值。

RoleContext

    model_config = ConfigDict(arbitrary_types_allowed=True)# # env exclude=True to avoid `RecursionError: maximum recursion depth exceeded in comparison`env: "Environment" = Field(default=None, exclude=True)  # # avoid circular import# TODO judge if ser&desermsg_buffer: MessageQueue = Field(default_factory=MessageQueue, exclude=True)  # Message Buffer with Asynchronous Updatesmemory: Memory = Field(default_factory=Memory)# long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)state: int = Field(default=-1)  # -1 indicates initial or termination state where todo is Nonetodo: Action = Field(default=None, exclude=True)watch: set[str] = Field(default_factory=set)news: list[Type[Message]] = Field(default=[], exclude=True)  # TODO not usedreact_mode: RoleReactMode = (RoleReactMode.REACT)  # see `Role._set_react_mode` for definitions of the following two attributesmax_react_loop: int = 1
  1. model_config: 用于配置模型的参数字典,允许其中包含任意类型的数据。

  2. env: 表示环境的字段,类型为"Environment",默认值为None。由于可能出现循环导入的问题,因此在序列化时需要排除该字段。

  3. msg_buffer: 表示消息缓冲区的字段,类型为MessageQueue,默认工厂函数为MessageQueue,用于异步更新消息。

  4. memory: 表示记忆的字段,类型为Memory,默认工厂函数为Memory,用于存储角色的记忆信息。

  5. state: 表示状态的字段,类型为int,默认值为-1,表示初始或终止状态,其中-1表示没有要执行的操作。

  6. todo: 表示要执行的操作的字段,类型为Action,默认值为None,用于存储要执行的下一步操作。

  7. watch: 表示观察的字段,类型为set[str],默认工厂函数为set,用于存储角色感兴趣的操作。

  8. news: 表示新闻的字段,类型为list[Type[Message]],默认值为空列表,用于存储角色观察到的新消息。

  9. react_mode: 表示反应模式的字段,类型为RoleReactMode,默认值为RoleReactMode.REACT,用于定义角色在反应观察到的消息时的策略。

  10. max_react_loop: 表示最大反应循环次数的字段,类型为int,默认值为1,用于控制角色在反应模式为RoleReactMode.REACT时的最大循环次数。

    def check(self, role_id: str):# if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:#     self.long_term_memory.recover_memory(role_id, self)#     self.memory = self.long_term_memory  # use memory to act as long_term_memory for unify operationpass@propertydef important_memory(self) -> list[Message]:"""Retrieve information corresponding to the attention action."""return self.memory.get_by_actions(self.watch)@propertydef history(self) -> list[Message]:return self.memory.get()

important_memory

    def get_by_actions(self, actions: Set) -> list[Message]:"""Return all messages triggered by specified Actions"""rsp = []indices = any_to_str_set(actions)for action in indices:if action not in self.index:continuersp += self.index[action]return rsp
  1. 接收一个参数 actions,类型为集合(Set),其中包含了要查询的动作集合。

  2. 将传入的动作集合转换为字符串类型的集合,这是因为内部索引是以字符串形式存储的。

  3. 遍历转换后的动作集合,对每个动作进行处理:

    • 如果该动作不在内部索引中,则跳过该动作。
    • 如果该动作在内部索引中,则将与该动作相关联的消息添加到结果列表中。
  4. 返回包含了所有相关消息的结果列表。

history

    def get(self, k=0) -> list[Message]:"""Return the most recent k memories, return all when k=0"""return self.storage[-k:]

返回storage的最近k个消息

Role

    model_config = ConfigDict(arbitrary_types_allowed=True, exclude=["llm"])name: str = ""profile: str = ""goal: str = ""constraints: str = ""desc: str = ""is_human: bool = Falsellm: BaseLLM = Field(default_factory=LLM, exclude=True)  # Each role has its own LLM, use different system messagerole_id: str = ""states: list[str] = []actions: list[SerializeAsAny[Action]] = Field(default=[], validate_default=True)rc: RoleContext = Field(default_factory=RoleContext)subscription: set[str] = set()# builtin variablesrecovered: bool = False  # to tag if a recovered rolelatest_observed_msg: Optional[Message] = None  # record the latest observed message when interrupted__hash__ = object.__hash__  # support Role as hashable type in `Environment.members`
  1. model_config: 这是一个 ConfigDict 对象,用于配置模型的参数。arbitrary_types_allowed=True 表示允许任意类型的值,exclude=["llm"] 表示在序列化时排除 llm 属性。

  2. name, profile, goal, constraints, desc, is_human: 这些是类的属性,分别表示角色的名称、概要、目标、约束、描述和是否为人类。它们都有默认值,类型分别为字符串和布尔值。

  3. llm: 这是一个属性,表示每个角色具有自己的 BaseLLM 对象。默认使用 LLM 的默认函数创建,并在序列化时排除。

  4. role_id, states, actions, rc, subscription, recovered, latest_observed_msg: 这些也是类的属性,分别表示角色的ID、状态列表、动作列表、角色上下文、订阅集合、是否已恢复、最新观察到的消息等。它们都有默认值,类型不同。

  5. __hash__: 这是类的特殊属性,用于定义对象的哈希值,以支持 Role 作为可哈希类型在环境的成员中使用。Role 对象可以作为集合的元素或字典的键,而不会引发错误或产生意外结果

@model_validator(mode="after")def check_subscription(self):if not self.subscription:self.subscription = {any_to_str(self), self.name} if self.name else {any_to_str(self)}return self

检查角色对象的订阅内容是否为空,如果为空,则设置默认的订阅内容。如果角色对象的名称不为空,则将角色对象的名称作为订阅内容之一,否则将角色对象本身转换为字符串后作为订阅内容之一。

__init__

    def __init__(self, **data: Any):# --- avoid PydanticUndefinedAnnotation name 'Environment' is not defined #from metagpt.environment import EnvironmentEnvironment# ------Role.model_rebuild()super().__init__(**data)self.llm.system_prompt = self._get_prefix()self._watch(data.get("watch") or [UserRequirement])
@classmethoddef model_rebuild(cls,*,force: bool = False,raise_errors: bool = True,_parent_namespace_depth: int = 2,_types_namespace: dict[str, Any] | None = None,) -> bool | None:"""Try to rebuild the pydantic-core schema for the model.This may be necessary when one of the annotations is a ForwardRef which could not be resolved duringthe initial attempt to build the schema, and automatic rebuilding fails.Args:force: Whether to force the rebuilding of the model schema, defaults to `False`.raise_errors: Whether to raise errors, defaults to `True`._parent_namespace_depth: The depth level of the parent namespace, defaults to 2._types_namespace: The types namespace, defaults to `None`.Returns:Returns `None` if the schema is already "complete" and rebuilding was not required.If rebuilding _was_ required, returns `True` if rebuilding was successful, otherwise `False`."""if not force and cls.__pydantic_complete__:return Noneelse:if '__pydantic_core_schema__' in cls.__dict__:delattr(cls, '__pydantic_core_schema__')  # delete cached value to ensure full rebuild happensif _types_namespace is not None:types_namespace: dict[str, Any] | None = _types_namespace.copy()else:if _parent_namespace_depth > 0:frame_parent_ns = _typing_extra.parent_frame_namespace(parent_depth=_parent_namespace_depth) or {}cls_parent_ns = (_model_construction.unpack_lenient_weakvaluedict(cls.__pydantic_parent_namespace__) or {})types_namespace = {**cls_parent_ns, **frame_parent_ns}cls.__pydantic_parent_namespace__ = _model_construction.build_lenient_weakvaluedict(types_namespace)else:types_namespace = _model_construction.unpack_lenient_weakvaluedict(cls.__pydantic_parent_namespace__)types_namespace = _typing_extra.get_cls_types_namespace(cls, types_namespace)# manually override defer_build so complete_model_class doesn't skip building the model againconfig = {**cls.model_config, 'defer_build': False}return _model_construction.complete_model_class(cls,cls.__name__,_config.ConfigWrapper(config, check=False),raise_errors=raise_errors,types_namespace=types_namespace,)
  • force: 这个参数是一个布尔值,默认为 False。如果设置为 True,则强制重新构建模型模式,即使模式已经是完整的也会重新构建。

  • raise_errors: 这个参数也是一个布尔值,默认为 True。如果设置为 True,则在出现错误时会抛出异常;如果设置为 False,则在出现错误时会返回错误信息但不会抛出异常。

  • _parent_namespace_depth: 这是父命名空间的深度级别,默认为 2。这个参数用于构建类型命名空间。

  • _types_namespace: 这是一个字典类型,用于存储类型命名空间。默认为 None,表示没有传入类型命名空间。

    def _get_prefix(self):"""Get the role prefix"""if self.desc:return self.descprefix = PREFIX_TEMPLATE.format(**{"profile": self.profile, "name": self.name, "goal": self.goal})if self.constraints:prefix += CONSTRAINT_TEMPLATE.format(**{"constraints": self.constraints})if self.rc.env and self.rc.env.desc:other_role_names = ", ".join(self.rc.env.role_names())env_desc = f"You are in {self.rc.env.desc} with roles({other_role_names})."prefix += env_descreturn prefix
  1. 如果角色的描述 (desc) 不为空,则直接返回描述。
  2. 否则,根据一定的模板格式化角色的配置、名称和目标信息,生成前缀字符串。
  3. 如果角色有约束 (constraints),则将约束信息也添加到前缀字符串中。
  4. 如果角色的上下文环境 (rc.env) 不为空并且环境描述 (env.desc) 也不为空,则获取环境描述和环境中其他角色的名称,并添加到前缀字符串中。
  5. 最后返回生成的前缀字符串。

    def serialize(self, stg_path: Path = None):stg_path = (SERDESER_PATH.joinpath(f"team/environment/roles/{self.__class__.__name__}_{self.name}")if stg_path is Noneelse stg_path)role_info = self.model_dump(exclude={"rc": {"memory": True, "msg_buffer": True}, "llm": True})role_info.update({"role_class": self.__class__.__name__, "module_name": self.__module__})role_info_path = stg_path.joinpath("role_info.json")write_json_file(role_info_path, role_info)self.rc.memory.serialize(stg_path)  # serialize role's memory alone@classmethoddef deserialize(cls, stg_path: Path) -> "Role":"""stg_path = ./storage/team/environment/roles/{role_class}_{role_name}"""role_info_path = stg_path.joinpath("role_info.json")role_info = read_json_file(role_info_path)role_class_str = role_info.pop("role_class")module_name = role_info.pop("module_name")role_class = import_class(class_name=role_class_str, module_name=module_name)role = role_class(**role_info)  # initiate particular Rolerole.set_recovered(True)  # set True to make a tagrole_memory = Memory.deserialize(stg_path)role.set_memory(role_memory)return role

将角色对象序列化到文件中,以及从文件中反序列化出角色对象

将角色对象序列化到文件中用于实现持久化数据与不同平台之间共享

_init_actions

    def _init_actions(self, actions):self._reset()for idx, action in enumerate(actions):if not isinstance(action, Action):## 默认初始化i = action(name="", llm=self.llm)else:if self.is_human and not isinstance(action.llm, HumanProvider):logger.warning(f"is_human attribute does not take effect, "f"as Role's {str(action)} was initialized using LLM, "f"try passing in Action classes instead of initialized instances")i = actionself._init_action_system_message(i)self.actions.append(i)self.states.append(f"{idx}. {action}")

对于每个动作,它首先检查其是否已经是一个 Action 类的实例。如果不是,则默认以空字符串和当前角色的 llm(长期记忆)实例化一个新的动作对象

将初始化的动作对象添加到角色对象的动作列表中

将动作的描述添加到角色对象的状态列表中,格式为动作索引加动作描述

    def _set_react_mode(self, react_mode: str, max_react_loop: int = 1):"""Set strategy of the Role reacting to observed Message. Variation lies in howthis Role elects action to perform during the _think stage, especially if it is capable of multiple Actions.Args:react_mode (str): Mode for choosing action during the _think stage, can be one of:"react": standard think-act loop in the ReAct paper, alternating thinking and acting to solve the task, i.e. _think -> _act -> _think -> _act -> ...Use llm to select actions in _think dynamically;"by_order": switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ...;"plan_and_act": first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ...Use llm to come up with the plan dynamically.Defaults to "react".max_react_loop (int): Maximum react cycles to execute, used to prevent the agent from reacting forever.Take effect only when react_mode is react, in which we use llm to choose actions, including termination.Defaults to 1, i.e. _think -> _act (-> return result and end)"""assert react_mode in RoleReactMode.values(), f"react_mode must be one of {RoleReactMode.values()}"self.rc.react_mode = react_modeif react_mode == RoleReactMode.REACT:self.rc.max_react_loop = max_react_loop

用于设置角色对观察到的消息做出反应的策略。它允许根据不同的反应模式选择角色应该执行的动作,这些模式如下:

  • react: 在思考阶段交替思考和执行动作,以解决任务。使用长期记忆(llm)动态选择思考阶段的动作。每个循环中包含一个思考和一个执行动作的步骤。max_react_loop 参数用于指定最大的反应循环次数,以防止角色永远执行。默认情况下,设置为1,即只执行一次思考和执行动作的循环,然后返回结果并结束。

  • by_order: 按照初始化动作列表中定义的顺序每次切换一个动作。在每个循环中,按照动作列表中的顺序依次执行动作。

  • plan_and_act: 先计划,然后执行一个动作序列。在思考阶段,角色制定一个计划,然后执行该计划中的动作序列。使用长期记忆(llm)动态生成计划。

_watch

    def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]):"""Watch Actions of interest. Role will select Messages caused by these Actions from its personal messagebuffer during _observe."""self.rc.watch = {any_to_str(t) for t in actions}# check RoleContext after adding watch actionsself.rc.check(self.role_id)

用于观察感兴趣的动作。在这个方法中,角色将选择由这些动作引起的消息,并在观察阶段从其个人消息缓冲区中选择这些消息。

在添加观察动作后,方法会调用 self.rc.check(self.role_id) 来检查角色上下文,以确保角色的一些设置已经完整


    def is_watch(self, caused_by: str):return caused_by in self.rc.watchdef subscribe(self, tags: Set[str]):"""Used to receive Messages with certain tags from the environment. Message will be put into personal messagebuffer to be further processed in _observe. By default, a Role subscribes Messages with a tag of its own nameor profile."""self.subscription = tagsif self.rc.env:  # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113self.rc.env.set_subscription(self, self.subscription)

is_watch 方法用于检查某个消息是否是由角色感兴趣的动作引起的

subscribe 方法用于订阅特定标签的消息,这些消息将来自环境。消息将被放置到个人消息缓冲区中,以便在观察阶段进一步处理。默认情况下,角色会订阅具有自己名称或配置文件的标签的消息

_set_state

    def _set_state(self, state: int):"""Update the current state."""self.rc.state = statelogger.debug(f"actions={self.actions}, state={state}")self.rc.todo = self.actions[self.rc.state] if state >= 0 else None

state大于0时将动作设为待办操作


    def set_env(self, env: "Environment"):"""Set the environment in which the role works. The role can talk to the environment and can also receivemessages by observing."""self.rc.env = envif env:env.set_subscription(self, self.subscription)self.refresh_system_message()  # add env message to system message@propertydef action_count(self):"""Return number of action"""return len(self.actions)

set_env 将角色上下文中的环境属性 self.rc.env 更新为指定的环境对象。如果环境对象不为空,则会调用环境对象的 set_subscription 方法,将角色订阅的消息传递给环境。接着,它调用 refresh_system_message 方法,将环境消息添加到系统消息中

action_count返回角色的动作数量

_think

    async def _think(self) -> bool:"""Consider what to do and decide on the next course of action. Return false if nothing can be done."""if len(self.actions) == 1:# If there is only one action, then only this one can be performedself._set_state(0)return Trueif self.recovered and self.rc.state >= 0:self._set_state(self.rc.state)  # action to run from recovered stateself.set_recovered(False)  # avoid max_react_loop out of workreturn Trueprompt = self._get_prefix()prompt += STATE_TEMPLATE.format(history=self.rc.history,states="\n".join(self.states),n_states=len(self.states) - 1,previous_state=self.rc.state,)next_state = await self.llm.aask(prompt)next_state = extract_state_value_from_output(next_state)logger.debug(f"{prompt=}")if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self.states)):logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")next_state = -1else:next_state = int(next_state)if next_state == -1:logger.info(f"End actions with {next_state=}")self._set_state(next_state)return True

先检查角色是否只有一个动作,如果是,则直接设置状态为 0,并返回 True

如果角色处于恢复状态且状态值大于等于 0,则将状态设置为恢复状态,并取消角色的恢复状态,以避免超出最大反应循环次数

(恢复状态通常意味着角色在某种情况下中断了之前的操作,可能是由于意外中断、系统故障或者其他原因导致的。当角色处于恢复状态时,它可能希望在继续执行操作之前回到之前的状态或者继续执行之前的操作)

然后,构建提示信息 prompt,包括角色的前缀、历史状态、当前可用状态等信息,并通过 llm.aask 方法向长期记忆模型(LLM)询问下一个状态。如果提取的状态值不是有效的数字或不在合法范围内,则将其设置为 -1

最后,根据得到的下一个状态值,调用 _set_state 方法设置角色的状态,并返回 True 表示已经完成思考和决策

_act

    async def _act(self) -> Message:logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")response = await self.rc.todo.run(self.rc.history)if isinstance(response, (ActionOutput, ActionNode)):msg = Message(content=response.content,instruct_content=response.instruct_content,role=self._setting,cause_by=self.rc.todo,sent_from=self,)elif isinstance(response, Message):msg = responseelse:msg = Message(content=response, role=self.profile, cause_by=self.rc.todo, sent_from=self)self.rc.memory.add(msg)return msg

执行动作,并将信息添加到角色的记忆中,返回Message

_observe

    async def _observe(self, ignore_memory=False) -> int:"""Prepare new messages for processing from the message buffer and other sources."""# Read unprocessed messages from the msg buffer.news = []if self.recovered:news = [self.latest_observed_msg] if self.latest_observed_msg else []if not news:news = self.rc.msg_buffer.pop_all()# Store the read messages in your own memory to prevent duplicate processing.old_messages = [] if ignore_memory else self.rc.memory.get()self.rc.memory.add_batch(news)# Filter out messages of interest.self.rc.news = [n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages]self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg# Design Rules:# If you need to further categorize Message objects, you can do so using the Message.set_meta function.# msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]if news_text:logger.debug(f"{self._setting} observed: {news_text}")return len(self.rc.news)

首先检查是否处于恢复状态,如果是,则将最新观察到的消息添加到要处理的消息列表中。如果不处于恢复状态,则从消息缓冲区中获取所有未处理的消息

然后,将读取的消息添加到角色的内存中,以防止重复处理

接下来,根据角色关注的动作或者消息接收者的名称,筛选出感兴趣的消息,并将其存储在 self.rc.news 属性中

最后,记录最新观察到的消息,并返回观察到的新消息数量


    def publish_message(self, msg):"""If the role belongs to env, then the role's messages will be broadcast to env"""if not msg:returnif not self.rc.env:# If env does not exist, do not publish the messagereturnself.rc.env.publish_message(msg)def put_message(self, message):"""Place the message into the Role object's private message buffer."""if not message:returnself.rc.msg_buffer.push(message)

_react

    async def _react(self) -> Message:"""Think first, then act, until the Role _think it is time to stop and requires no more todo.This is the standard think-act loop in the ReAct paper, which alternates thinking and acting in task solving, i.e. _think -> _act -> _think -> _act -> ...Use llm to select actions in _think dynamically"""actions_taken = 0rsp = Message(content="No actions taken yet", cause_by=Action)  # will be overwritten after Role _actwhile actions_taken < self.rc.max_react_loop:# thinkawait self._think()if self.rc.todo is None:break# actlogger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")rsp = await self._act()  # 这个rsp是否需要publish_message?actions_taken += 1return rsp  # return output from the last action

_think -> _act -> _think -> _act -> ...

在每个循环中,角色会先调用 _think 方法来考虑下一步的行动。如果角色确定有待执行的行动(self.rc.todo 不为 None),则会执行行动(调用 _act 方法)。在执行完毕后,会更新响应消息(rsp),并累加已执行的行动次数

至于是否需要在 _act 方法中调用 publish_message,取决于具体的应用场景和需求。通常情况下,如果执行的行动会产生对环境或其他角色的影响,并且需要将这些影响通知给其他角色或者环境,那么就需要在 _act 方法中调用 publish_message 方法来发布消息

_act_by_order

    async def _act_by_order(self) -> Message:"""switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ..."""start_idx = self.rc.state if self.rc.state >= 0 else 0  # action to run from recovered statersp = Message(content="No actions taken yet")  # return default message if actions=[]for i in range(start_idx, len(self.states)):self._set_state(i)rsp = await self._act()return rsp  # return output from the last action

角色按照在 _init_actions 方法中定义的顺序执行行动。如果角色处于恢复状态(self.rc.state >= 0),则从恢复的状态开始执行行动,否则从第一个行动开始执行

_plan_and_act

    async def _plan_and_act(self) -> Message:"""first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""# TODO: to be implementedreturn Message(content="")

接口,等用户自己写

react

    async def react(self) -> Message:"""Entry to one of three strategies by which Role reacts to the observed Message"""if self.rc.react_mode == RoleReactMode.REACT:rsp = await self._react()elif self.rc.react_mode == RoleReactMode.BY_ORDER:rsp = await self._act_by_order()elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:rsp = await self._plan_and_act()self._set_state(state=-1)  # current reaction is complete, reset state to -1 and todo back to Nonereturn rsp

按照策略执行不同的反应模式

get_memories

    def get_memories(self, k=0) -> list[Message]:"""A wrapper to return the most recent k memories of this role, return all when k=0"""return self.rc.memory.get(k=k)

run

    async def run(self, with_message=None) -> Message | None:"""Observe, and think and act based on the results of the observation"""if with_message:msg = Noneif isinstance(with_message, str):msg = Message(content=with_message)elif isinstance(with_message, Message):msg = with_messageelif isinstance(with_message, list):msg = Message(content="\n".join(with_message))if not msg.cause_by:msg.cause_by = UserRequirementself.put_message(msg)if not await self._observe():# If there is no new information, suspend and waitlogger.debug(f"{self._setting}: no news. waiting.")returnrsp = await self.react()# Reset the next action to be taken.self.rc.todo = None# Send the response message to the Environment object to have it relay the message to the subscribers.self.publish_message(rsp)return rsp

角色首先观察当前环境的消息。如果传入了 with_message 参数,则将其转换为消息对象并放入角色的消息缓冲区中。接着,角色观察环境中的新消息,并根据观察结果进行反应,即调用 react 方法

如果没有观察到新的信息,角色将进入等待状态,暂停执行。如果观察到了新的信息,则执行反应,并将得到的响应消息发送给环境对象,让环境对象将消息传递给订阅者


    @propertydef is_idle(self) -> bool:"""If true, all actions have been executed."""return not self.rc.news and not self.rc.todo and self.rc.msg_buffer.empty()async def think(self) -> Action:"""The exported `think` function"""await self._observe()await self._think()return self.rc.todoasync def act(self) -> ActionOutput:"""The exported `act` function"""msg = await self._act()return ActionOutput(content=msg.content, instruct_content=msg.instruct_content)

todo

    @propertydef todo(self) -> str:"""AgentStore uses this attribute to display to the user what actions the current role should take."""if self.rc.todo:if self.rc.todo.desc:return self.rc.todo.descreturn any_to_name(self.rc.todo)if self.actions:return any_to_name(self.actions[0])return ""

如果 rc.todo 不为空,则返回该动作的描述(如果有的话),否则返回第一个操作的名称。如果没有任何操作可执行,则返回空字符串

这篇关于MetaGPT部分源码解读--Role的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/865231

相关文章

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

MCU7.keil中build产生的hex文件解读

1.hex文件大致解读 闲来无事,查看了MCU6.用keil新建项目的hex文件 用FlexHex打开 给我的第一印象是:经过软件的解释之后,发现这些数据排列地十分整齐 :02000F0080FE71:03000000020003F8:0C000300787FE4F6D8FD75810702000F3D:00000001FF 把解释后的数据当作十六进制来观察 1.每一行数据

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

poj 2976 分数规划二分贪心(部分对总体的贡献度) poj 3111

poj 2976: 题意: 在n场考试中,每场考试共有b题,答对的题目有a题。 允许去掉k场考试,求能达到的最高正确率是多少。 解析: 假设已知准确率为x,则每场考试对于准确率的贡献值为: a - b * x,将贡献值大的排序排在前面舍弃掉后k个。 然后二分x就行了。 代码: #include <iostream>#include <cstdio>#incl

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

GPT系列之:GPT-1,GPT-2,GPT-3详细解读

一、GPT1 论文:Improving Language Understanding by Generative Pre-Training 链接:https://cdn.openai.com/research-covers/languageunsupervised/language_understanding_paper.pdf 启发点:生成loss和微调loss同时作用,让下游任务来适应预训