MetaGPT部分源码解读--Role

2024-03-31 20:12

本文主要是介绍MetaGPT部分源码解读--Role,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RoleReactMode

class RoleReactMode(str, Enum):REACT = "react"BY_ORDER = "by_order"PLAN_AND_ACT = "plan_and_act"@classmethoddef values(cls):return [item.value for item in cls]
  1. REACT:表示标准的反应模式,即在思考和行动之间交替,通过 LLN 选择动作。
  2. BY_ORDER:表示按顺序选择动作的模式,即每次执行一个动作,然后进行下一个动作的选择。
  3. PLAN_AND_ACT:表示首先进行规划,然后执行动作序列的模式。

通过定义 values() 方法,可以轻松地获取枚举类中所有常量的值。这个方法会返回一个列表,包含枚举类中所有常量的字符串值。

RoleContext

    model_config = ConfigDict(arbitrary_types_allowed=True)# # env exclude=True to avoid `RecursionError: maximum recursion depth exceeded in comparison`env: "Environment" = Field(default=None, exclude=True)  # # avoid circular import# TODO judge if ser&desermsg_buffer: MessageQueue = Field(default_factory=MessageQueue, exclude=True)  # Message Buffer with Asynchronous Updatesmemory: Memory = Field(default_factory=Memory)# long_term_memory: LongTermMemory = Field(default_factory=LongTermMemory)state: int = Field(default=-1)  # -1 indicates initial or termination state where todo is Nonetodo: Action = Field(default=None, exclude=True)watch: set[str] = Field(default_factory=set)news: list[Type[Message]] = Field(default=[], exclude=True)  # TODO not usedreact_mode: RoleReactMode = (RoleReactMode.REACT)  # see `Role._set_react_mode` for definitions of the following two attributesmax_react_loop: int = 1
  1. model_config: 用于配置模型的参数字典,允许其中包含任意类型的数据。

  2. env: 表示环境的字段,类型为"Environment",默认值为None。由于可能出现循环导入的问题,因此在序列化时需要排除该字段。

  3. msg_buffer: 表示消息缓冲区的字段,类型为MessageQueue,默认工厂函数为MessageQueue,用于异步更新消息。

  4. memory: 表示记忆的字段,类型为Memory,默认工厂函数为Memory,用于存储角色的记忆信息。

  5. state: 表示状态的字段,类型为int,默认值为-1,表示初始或终止状态,其中-1表示没有要执行的操作。

  6. todo: 表示要执行的操作的字段,类型为Action,默认值为None,用于存储要执行的下一步操作。

  7. watch: 表示观察的字段,类型为set[str],默认工厂函数为set,用于存储角色感兴趣的操作。

  8. news: 表示新闻的字段,类型为list[Type[Message]],默认值为空列表,用于存储角色观察到的新消息。

  9. react_mode: 表示反应模式的字段,类型为RoleReactMode,默认值为RoleReactMode.REACT,用于定义角色在反应观察到的消息时的策略。

  10. max_react_loop: 表示最大反应循环次数的字段,类型为int,默认值为1,用于控制角色在反应模式为RoleReactMode.REACT时的最大循环次数。

    def check(self, role_id: str):# if hasattr(CONFIG, "long_term_memory") and CONFIG.long_term_memory:#     self.long_term_memory.recover_memory(role_id, self)#     self.memory = self.long_term_memory  # use memory to act as long_term_memory for unify operationpass@propertydef important_memory(self) -> list[Message]:"""Retrieve information corresponding to the attention action."""return self.memory.get_by_actions(self.watch)@propertydef history(self) -> list[Message]:return self.memory.get()

important_memory

    def get_by_actions(self, actions: Set) -> list[Message]:"""Return all messages triggered by specified Actions"""rsp = []indices = any_to_str_set(actions)for action in indices:if action not in self.index:continuersp += self.index[action]return rsp
  1. 接收一个参数 actions,类型为集合(Set),其中包含了要查询的动作集合。

  2. 将传入的动作集合转换为字符串类型的集合,这是因为内部索引是以字符串形式存储的。

  3. 遍历转换后的动作集合,对每个动作进行处理:

    • 如果该动作不在内部索引中,则跳过该动作。
    • 如果该动作在内部索引中,则将与该动作相关联的消息添加到结果列表中。
  4. 返回包含了所有相关消息的结果列表。

history

    def get(self, k=0) -> list[Message]:"""Return the most recent k memories, return all when k=0"""return self.storage[-k:]

返回storage的最近k个消息

Role

    model_config = ConfigDict(arbitrary_types_allowed=True, exclude=["llm"])name: str = ""profile: str = ""goal: str = ""constraints: str = ""desc: str = ""is_human: bool = Falsellm: BaseLLM = Field(default_factory=LLM, exclude=True)  # Each role has its own LLM, use different system messagerole_id: str = ""states: list[str] = []actions: list[SerializeAsAny[Action]] = Field(default=[], validate_default=True)rc: RoleContext = Field(default_factory=RoleContext)subscription: set[str] = set()# builtin variablesrecovered: bool = False  # to tag if a recovered rolelatest_observed_msg: Optional[Message] = None  # record the latest observed message when interrupted__hash__ = object.__hash__  # support Role as hashable type in `Environment.members`
  1. model_config: 这是一个 ConfigDict 对象,用于配置模型的参数。arbitrary_types_allowed=True 表示允许任意类型的值,exclude=["llm"] 表示在序列化时排除 llm 属性。

  2. name, profile, goal, constraints, desc, is_human: 这些是类的属性,分别表示角色的名称、概要、目标、约束、描述和是否为人类。它们都有默认值,类型分别为字符串和布尔值。

  3. llm: 这是一个属性,表示每个角色具有自己的 BaseLLM 对象。默认使用 LLM 的默认函数创建,并在序列化时排除。

  4. role_id, states, actions, rc, subscription, recovered, latest_observed_msg: 这些也是类的属性,分别表示角色的ID、状态列表、动作列表、角色上下文、订阅集合、是否已恢复、最新观察到的消息等。它们都有默认值,类型不同。

  5. __hash__: 这是类的特殊属性,用于定义对象的哈希值,以支持 Role 作为可哈希类型在环境的成员中使用。Role 对象可以作为集合的元素或字典的键,而不会引发错误或产生意外结果

@model_validator(mode="after")def check_subscription(self):if not self.subscription:self.subscription = {any_to_str(self), self.name} if self.name else {any_to_str(self)}return self

检查角色对象的订阅内容是否为空,如果为空,则设置默认的订阅内容。如果角色对象的名称不为空,则将角色对象的名称作为订阅内容之一,否则将角色对象本身转换为字符串后作为订阅内容之一。

__init__

    def __init__(self, **data: Any):# --- avoid PydanticUndefinedAnnotation name 'Environment' is not defined #from metagpt.environment import EnvironmentEnvironment# ------Role.model_rebuild()super().__init__(**data)self.llm.system_prompt = self._get_prefix()self._watch(data.get("watch") or [UserRequirement])
@classmethoddef model_rebuild(cls,*,force: bool = False,raise_errors: bool = True,_parent_namespace_depth: int = 2,_types_namespace: dict[str, Any] | None = None,) -> bool | None:"""Try to rebuild the pydantic-core schema for the model.This may be necessary when one of the annotations is a ForwardRef which could not be resolved duringthe initial attempt to build the schema, and automatic rebuilding fails.Args:force: Whether to force the rebuilding of the model schema, defaults to `False`.raise_errors: Whether to raise errors, defaults to `True`._parent_namespace_depth: The depth level of the parent namespace, defaults to 2._types_namespace: The types namespace, defaults to `None`.Returns:Returns `None` if the schema is already "complete" and rebuilding was not required.If rebuilding _was_ required, returns `True` if rebuilding was successful, otherwise `False`."""if not force and cls.__pydantic_complete__:return Noneelse:if '__pydantic_core_schema__' in cls.__dict__:delattr(cls, '__pydantic_core_schema__')  # delete cached value to ensure full rebuild happensif _types_namespace is not None:types_namespace: dict[str, Any] | None = _types_namespace.copy()else:if _parent_namespace_depth > 0:frame_parent_ns = _typing_extra.parent_frame_namespace(parent_depth=_parent_namespace_depth) or {}cls_parent_ns = (_model_construction.unpack_lenient_weakvaluedict(cls.__pydantic_parent_namespace__) or {})types_namespace = {**cls_parent_ns, **frame_parent_ns}cls.__pydantic_parent_namespace__ = _model_construction.build_lenient_weakvaluedict(types_namespace)else:types_namespace = _model_construction.unpack_lenient_weakvaluedict(cls.__pydantic_parent_namespace__)types_namespace = _typing_extra.get_cls_types_namespace(cls, types_namespace)# manually override defer_build so complete_model_class doesn't skip building the model againconfig = {**cls.model_config, 'defer_build': False}return _model_construction.complete_model_class(cls,cls.__name__,_config.ConfigWrapper(config, check=False),raise_errors=raise_errors,types_namespace=types_namespace,)
  • force: 这个参数是一个布尔值,默认为 False。如果设置为 True,则强制重新构建模型模式,即使模式已经是完整的也会重新构建。

  • raise_errors: 这个参数也是一个布尔值,默认为 True。如果设置为 True,则在出现错误时会抛出异常;如果设置为 False,则在出现错误时会返回错误信息但不会抛出异常。

  • _parent_namespace_depth: 这是父命名空间的深度级别,默认为 2。这个参数用于构建类型命名空间。

  • _types_namespace: 这是一个字典类型,用于存储类型命名空间。默认为 None,表示没有传入类型命名空间。

    def _get_prefix(self):"""Get the role prefix"""if self.desc:return self.descprefix = PREFIX_TEMPLATE.format(**{"profile": self.profile, "name": self.name, "goal": self.goal})if self.constraints:prefix += CONSTRAINT_TEMPLATE.format(**{"constraints": self.constraints})if self.rc.env and self.rc.env.desc:other_role_names = ", ".join(self.rc.env.role_names())env_desc = f"You are in {self.rc.env.desc} with roles({other_role_names})."prefix += env_descreturn prefix
  1. 如果角色的描述 (desc) 不为空,则直接返回描述。
  2. 否则,根据一定的模板格式化角色的配置、名称和目标信息,生成前缀字符串。
  3. 如果角色有约束 (constraints),则将约束信息也添加到前缀字符串中。
  4. 如果角色的上下文环境 (rc.env) 不为空并且环境描述 (env.desc) 也不为空,则获取环境描述和环境中其他角色的名称,并添加到前缀字符串中。
  5. 最后返回生成的前缀字符串。

    def serialize(self, stg_path: Path = None):stg_path = (SERDESER_PATH.joinpath(f"team/environment/roles/{self.__class__.__name__}_{self.name}")if stg_path is Noneelse stg_path)role_info = self.model_dump(exclude={"rc": {"memory": True, "msg_buffer": True}, "llm": True})role_info.update({"role_class": self.__class__.__name__, "module_name": self.__module__})role_info_path = stg_path.joinpath("role_info.json")write_json_file(role_info_path, role_info)self.rc.memory.serialize(stg_path)  # serialize role's memory alone@classmethoddef deserialize(cls, stg_path: Path) -> "Role":"""stg_path = ./storage/team/environment/roles/{role_class}_{role_name}"""role_info_path = stg_path.joinpath("role_info.json")role_info = read_json_file(role_info_path)role_class_str = role_info.pop("role_class")module_name = role_info.pop("module_name")role_class = import_class(class_name=role_class_str, module_name=module_name)role = role_class(**role_info)  # initiate particular Rolerole.set_recovered(True)  # set True to make a tagrole_memory = Memory.deserialize(stg_path)role.set_memory(role_memory)return role

将角色对象序列化到文件中,以及从文件中反序列化出角色对象

将角色对象序列化到文件中用于实现持久化数据与不同平台之间共享

_init_actions

    def _init_actions(self, actions):self._reset()for idx, action in enumerate(actions):if not isinstance(action, Action):## 默认初始化i = action(name="", llm=self.llm)else:if self.is_human and not isinstance(action.llm, HumanProvider):logger.warning(f"is_human attribute does not take effect, "f"as Role's {str(action)} was initialized using LLM, "f"try passing in Action classes instead of initialized instances")i = actionself._init_action_system_message(i)self.actions.append(i)self.states.append(f"{idx}. {action}")

对于每个动作,它首先检查其是否已经是一个 Action 类的实例。如果不是,则默认以空字符串和当前角色的 llm(长期记忆)实例化一个新的动作对象

将初始化的动作对象添加到角色对象的动作列表中

将动作的描述添加到角色对象的状态列表中,格式为动作索引加动作描述

    def _set_react_mode(self, react_mode: str, max_react_loop: int = 1):"""Set strategy of the Role reacting to observed Message. Variation lies in howthis Role elects action to perform during the _think stage, especially if it is capable of multiple Actions.Args:react_mode (str): Mode for choosing action during the _think stage, can be one of:"react": standard think-act loop in the ReAct paper, alternating thinking and acting to solve the task, i.e. _think -> _act -> _think -> _act -> ...Use llm to select actions in _think dynamically;"by_order": switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ...;"plan_and_act": first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ...Use llm to come up with the plan dynamically.Defaults to "react".max_react_loop (int): Maximum react cycles to execute, used to prevent the agent from reacting forever.Take effect only when react_mode is react, in which we use llm to choose actions, including termination.Defaults to 1, i.e. _think -> _act (-> return result and end)"""assert react_mode in RoleReactMode.values(), f"react_mode must be one of {RoleReactMode.values()}"self.rc.react_mode = react_modeif react_mode == RoleReactMode.REACT:self.rc.max_react_loop = max_react_loop

用于设置角色对观察到的消息做出反应的策略。它允许根据不同的反应模式选择角色应该执行的动作,这些模式如下:

  • react: 在思考阶段交替思考和执行动作,以解决任务。使用长期记忆(llm)动态选择思考阶段的动作。每个循环中包含一个思考和一个执行动作的步骤。max_react_loop 参数用于指定最大的反应循环次数,以防止角色永远执行。默认情况下,设置为1,即只执行一次思考和执行动作的循环,然后返回结果并结束。

  • by_order: 按照初始化动作列表中定义的顺序每次切换一个动作。在每个循环中,按照动作列表中的顺序依次执行动作。

  • plan_and_act: 先计划,然后执行一个动作序列。在思考阶段,角色制定一个计划,然后执行该计划中的动作序列。使用长期记忆(llm)动态生成计划。

_watch

    def _watch(self, actions: Iterable[Type[Action]] | Iterable[Action]):"""Watch Actions of interest. Role will select Messages caused by these Actions from its personal messagebuffer during _observe."""self.rc.watch = {any_to_str(t) for t in actions}# check RoleContext after adding watch actionsself.rc.check(self.role_id)

用于观察感兴趣的动作。在这个方法中,角色将选择由这些动作引起的消息,并在观察阶段从其个人消息缓冲区中选择这些消息。

在添加观察动作后,方法会调用 self.rc.check(self.role_id) 来检查角色上下文,以确保角色的一些设置已经完整


    def is_watch(self, caused_by: str):return caused_by in self.rc.watchdef subscribe(self, tags: Set[str]):"""Used to receive Messages with certain tags from the environment. Message will be put into personal messagebuffer to be further processed in _observe. By default, a Role subscribes Messages with a tag of its own nameor profile."""self.subscription = tagsif self.rc.env:  # According to the routing feature plan in Chapter 2.2.3.2 of RFC 113self.rc.env.set_subscription(self, self.subscription)

is_watch 方法用于检查某个消息是否是由角色感兴趣的动作引起的

subscribe 方法用于订阅特定标签的消息,这些消息将来自环境。消息将被放置到个人消息缓冲区中,以便在观察阶段进一步处理。默认情况下,角色会订阅具有自己名称或配置文件的标签的消息

_set_state

    def _set_state(self, state: int):"""Update the current state."""self.rc.state = statelogger.debug(f"actions={self.actions}, state={state}")self.rc.todo = self.actions[self.rc.state] if state >= 0 else None

state大于0时将动作设为待办操作


    def set_env(self, env: "Environment"):"""Set the environment in which the role works. The role can talk to the environment and can also receivemessages by observing."""self.rc.env = envif env:env.set_subscription(self, self.subscription)self.refresh_system_message()  # add env message to system message@propertydef action_count(self):"""Return number of action"""return len(self.actions)

set_env 将角色上下文中的环境属性 self.rc.env 更新为指定的环境对象。如果环境对象不为空,则会调用环境对象的 set_subscription 方法,将角色订阅的消息传递给环境。接着,它调用 refresh_system_message 方法,将环境消息添加到系统消息中

action_count返回角色的动作数量

_think

    async def _think(self) -> bool:"""Consider what to do and decide on the next course of action. Return false if nothing can be done."""if len(self.actions) == 1:# If there is only one action, then only this one can be performedself._set_state(0)return Trueif self.recovered and self.rc.state >= 0:self._set_state(self.rc.state)  # action to run from recovered stateself.set_recovered(False)  # avoid max_react_loop out of workreturn Trueprompt = self._get_prefix()prompt += STATE_TEMPLATE.format(history=self.rc.history,states="\n".join(self.states),n_states=len(self.states) - 1,previous_state=self.rc.state,)next_state = await self.llm.aask(prompt)next_state = extract_state_value_from_output(next_state)logger.debug(f"{prompt=}")if (not next_state.isdigit() and next_state != "-1") or int(next_state) not in range(-1, len(self.states)):logger.warning(f"Invalid answer of state, {next_state=}, will be set to -1")next_state = -1else:next_state = int(next_state)if next_state == -1:logger.info(f"End actions with {next_state=}")self._set_state(next_state)return True

先检查角色是否只有一个动作,如果是,则直接设置状态为 0,并返回 True

如果角色处于恢复状态且状态值大于等于 0,则将状态设置为恢复状态,并取消角色的恢复状态,以避免超出最大反应循环次数

(恢复状态通常意味着角色在某种情况下中断了之前的操作,可能是由于意外中断、系统故障或者其他原因导致的。当角色处于恢复状态时,它可能希望在继续执行操作之前回到之前的状态或者继续执行之前的操作)

然后,构建提示信息 prompt,包括角色的前缀、历史状态、当前可用状态等信息,并通过 llm.aask 方法向长期记忆模型(LLM)询问下一个状态。如果提取的状态值不是有效的数字或不在合法范围内,则将其设置为 -1

最后,根据得到的下一个状态值,调用 _set_state 方法设置角色的状态,并返回 True 表示已经完成思考和决策

_act

    async def _act(self) -> Message:logger.info(f"{self._setting}: to do {self.rc.todo}({self.rc.todo.name})")response = await self.rc.todo.run(self.rc.history)if isinstance(response, (ActionOutput, ActionNode)):msg = Message(content=response.content,instruct_content=response.instruct_content,role=self._setting,cause_by=self.rc.todo,sent_from=self,)elif isinstance(response, Message):msg = responseelse:msg = Message(content=response, role=self.profile, cause_by=self.rc.todo, sent_from=self)self.rc.memory.add(msg)return msg

执行动作,并将信息添加到角色的记忆中,返回Message

_observe

    async def _observe(self, ignore_memory=False) -> int:"""Prepare new messages for processing from the message buffer and other sources."""# Read unprocessed messages from the msg buffer.news = []if self.recovered:news = [self.latest_observed_msg] if self.latest_observed_msg else []if not news:news = self.rc.msg_buffer.pop_all()# Store the read messages in your own memory to prevent duplicate processing.old_messages = [] if ignore_memory else self.rc.memory.get()self.rc.memory.add_batch(news)# Filter out messages of interest.self.rc.news = [n for n in news if (n.cause_by in self.rc.watch or self.name in n.send_to) and n not in old_messages]self.latest_observed_msg = self.rc.news[-1] if self.rc.news else None  # record the latest observed msg# Design Rules:# If you need to further categorize Message objects, you can do so using the Message.set_meta function.# msg_buffer is a receiving buffer, avoid adding message data and operations to msg_buffer.news_text = [f"{i.role}: {i.content[:20]}..." for i in self.rc.news]if news_text:logger.debug(f"{self._setting} observed: {news_text}")return len(self.rc.news)

首先检查是否处于恢复状态,如果是,则将最新观察到的消息添加到要处理的消息列表中。如果不处于恢复状态,则从消息缓冲区中获取所有未处理的消息

然后,将读取的消息添加到角色的内存中,以防止重复处理

接下来,根据角色关注的动作或者消息接收者的名称,筛选出感兴趣的消息,并将其存储在 self.rc.news 属性中

最后,记录最新观察到的消息,并返回观察到的新消息数量


    def publish_message(self, msg):"""If the role belongs to env, then the role's messages will be broadcast to env"""if not msg:returnif not self.rc.env:# If env does not exist, do not publish the messagereturnself.rc.env.publish_message(msg)def put_message(self, message):"""Place the message into the Role object's private message buffer."""if not message:returnself.rc.msg_buffer.push(message)

_react

    async def _react(self) -> Message:"""Think first, then act, until the Role _think it is time to stop and requires no more todo.This is the standard think-act loop in the ReAct paper, which alternates thinking and acting in task solving, i.e. _think -> _act -> _think -> _act -> ...Use llm to select actions in _think dynamically"""actions_taken = 0rsp = Message(content="No actions taken yet", cause_by=Action)  # will be overwritten after Role _actwhile actions_taken < self.rc.max_react_loop:# thinkawait self._think()if self.rc.todo is None:break# actlogger.debug(f"{self._setting}: {self.rc.state=}, will do {self.rc.todo}")rsp = await self._act()  # 这个rsp是否需要publish_message?actions_taken += 1return rsp  # return output from the last action

_think -> _act -> _think -> _act -> ...

在每个循环中,角色会先调用 _think 方法来考虑下一步的行动。如果角色确定有待执行的行动(self.rc.todo 不为 None),则会执行行动(调用 _act 方法)。在执行完毕后,会更新响应消息(rsp),并累加已执行的行动次数

至于是否需要在 _act 方法中调用 publish_message,取决于具体的应用场景和需求。通常情况下,如果执行的行动会产生对环境或其他角色的影响,并且需要将这些影响通知给其他角色或者环境,那么就需要在 _act 方法中调用 publish_message 方法来发布消息

_act_by_order

    async def _act_by_order(self) -> Message:"""switch action each time by order defined in _init_actions, i.e. _act (Action1) -> _act (Action2) -> ..."""start_idx = self.rc.state if self.rc.state >= 0 else 0  # action to run from recovered statersp = Message(content="No actions taken yet")  # return default message if actions=[]for i in range(start_idx, len(self.states)):self._set_state(i)rsp = await self._act()return rsp  # return output from the last action

角色按照在 _init_actions 方法中定义的顺序执行行动。如果角色处于恢复状态(self.rc.state >= 0),则从恢复的状态开始执行行动,否则从第一个行动开始执行

_plan_and_act

    async def _plan_and_act(self) -> Message:"""first plan, then execute an action sequence, i.e. _think (of a plan) -> _act -> _act -> ... Use llm to come up with the plan dynamically."""# TODO: to be implementedreturn Message(content="")

接口,等用户自己写

react

    async def react(self) -> Message:"""Entry to one of three strategies by which Role reacts to the observed Message"""if self.rc.react_mode == RoleReactMode.REACT:rsp = await self._react()elif self.rc.react_mode == RoleReactMode.BY_ORDER:rsp = await self._act_by_order()elif self.rc.react_mode == RoleReactMode.PLAN_AND_ACT:rsp = await self._plan_and_act()self._set_state(state=-1)  # current reaction is complete, reset state to -1 and todo back to Nonereturn rsp

按照策略执行不同的反应模式

get_memories

    def get_memories(self, k=0) -> list[Message]:"""A wrapper to return the most recent k memories of this role, return all when k=0"""return self.rc.memory.get(k=k)

run

    async def run(self, with_message=None) -> Message | None:"""Observe, and think and act based on the results of the observation"""if with_message:msg = Noneif isinstance(with_message, str):msg = Message(content=with_message)elif isinstance(with_message, Message):msg = with_messageelif isinstance(with_message, list):msg = Message(content="\n".join(with_message))if not msg.cause_by:msg.cause_by = UserRequirementself.put_message(msg)if not await self._observe():# If there is no new information, suspend and waitlogger.debug(f"{self._setting}: no news. waiting.")returnrsp = await self.react()# Reset the next action to be taken.self.rc.todo = None# Send the response message to the Environment object to have it relay the message to the subscribers.self.publish_message(rsp)return rsp

角色首先观察当前环境的消息。如果传入了 with_message 参数,则将其转换为消息对象并放入角色的消息缓冲区中。接着,角色观察环境中的新消息,并根据观察结果进行反应,即调用 react 方法

如果没有观察到新的信息,角色将进入等待状态,暂停执行。如果观察到了新的信息,则执行反应,并将得到的响应消息发送给环境对象,让环境对象将消息传递给订阅者


    @propertydef is_idle(self) -> bool:"""If true, all actions have been executed."""return not self.rc.news and not self.rc.todo and self.rc.msg_buffer.empty()async def think(self) -> Action:"""The exported `think` function"""await self._observe()await self._think()return self.rc.todoasync def act(self) -> ActionOutput:"""The exported `act` function"""msg = await self._act()return ActionOutput(content=msg.content, instruct_content=msg.instruct_content)

todo

    @propertydef todo(self) -> str:"""AgentStore uses this attribute to display to the user what actions the current role should take."""if self.rc.todo:if self.rc.todo.desc:return self.rc.todo.descreturn any_to_name(self.rc.todo)if self.actions:return any_to_name(self.actions[0])return ""

如果 rc.todo 不为空,则返回该动作的描述(如果有的话),否则返回第一个操作的名称。如果没有任何操作可执行,则返回空字符串

这篇关于MetaGPT部分源码解读--Role的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/865231

相关文章

MySQL中时区参数time_zone解读

《MySQL中时区参数time_zone解读》MySQL时区参数time_zone用于控制系统函数和字段的DEFAULTCURRENT_TIMESTAMP属性,修改时区可能会影响timestamp类型... 目录前言1.时区参数影响2.如何设置3.字段类型选择总结前言mysql 时区参数 time_zon

MySQL中的锁和MVCC机制解读

《MySQL中的锁和MVCC机制解读》MySQL事务、锁和MVCC机制是确保数据库操作原子性、一致性和隔离性的关键,事务必须遵循ACID原则,锁的类型包括表级锁、行级锁和意向锁,MVCC通过非锁定读和... 目录mysql的锁和MVCC机制事务的概念与ACID特性锁的类型及其工作机制锁的粒度与性能影响多版本

Redis过期键删除策略解读

《Redis过期键删除策略解读》Redis通过惰性删除策略和定期删除策略来管理过期键,惰性删除策略在键被访问时检查是否过期并删除,节省CPU开销但可能导致过期键滞留,定期删除策略定期扫描并删除过期键,... 目录1.Redis使用两种不同的策略来删除过期键,分别是惰性删除策略和定期删除策略1.1惰性删除策略

Redis与缓存解读

《Redis与缓存解读》文章介绍了Redis作为缓存层的优势和缺点,并分析了六种缓存更新策略,包括超时剔除、先删缓存再更新数据库、旁路缓存、先更新数据库再删缓存、先更新数据库再更新缓存、读写穿透和异步... 目录缓存缓存优缺点缓存更新策略超时剔除先删缓存再更新数据库旁路缓存(先更新数据库,再删缓存)先更新数

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

C#反射编程之GetConstructor()方法解读

《C#反射编程之GetConstructor()方法解读》C#中Type类的GetConstructor()方法用于获取指定类型的构造函数,该方法有多个重载版本,可以根据不同的参数获取不同特性的构造函... 目录C# GetConstructor()方法有4个重载以GetConstructor(Type[]

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

MCU7.keil中build产生的hex文件解读

1.hex文件大致解读 闲来无事,查看了MCU6.用keil新建项目的hex文件 用FlexHex打开 给我的第一印象是:经过软件的解释之后,发现这些数据排列地十分整齐 :02000F0080FE71:03000000020003F8:0C000300787FE4F6D8FD75810702000F3D:00000001FF 把解释后的数据当作十六进制来观察 1.每一行数据

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get