基于MongoDB的AI代理记忆层
记忆是处理复杂用户交互的 AI 代理的关键组件。没有记忆,代理会像一个在对话之间忘记一切的令人沮丧的同事。但记忆不仅仅是回忆文字——它是代理的经验。随着时间的推移,代理可以从交互中学习,优化其响应,并将知识转移到新任务上——就像一个训练有素的员工。
在这篇博客中,我将介绍如何使用PydanticAI 和 MongoDB 实现结构化的记忆系统,强调实际操作方面而不是理论上的记忆概念。
1、概述:AI代理的记忆
在构建基于 AI 的应用程序时,短期和长期记忆使代理能够:
- 回忆对话中的过去交互。
- 存储并检索用户的偏好。
- 通过记住之前的讨论来改善用户体验。
- 维护程序性知识以增强决策能力。
- 随着时间的推移进化并转移其专业知识,就像一个训练有素的员工一样。
我把记忆分为以下组件:
- 用户资料
存储持久性细节(例如姓名、年龄、兴趣、对话偏好)。 - 记忆
从过去的交互中提取的单个数据点,分类为事实、偏好或经验。 - 代理经验
从交互中积累的知识,告知代理的响应策略,并将其传递到新的环境中。 - 任务历史记录
跟踪工作流程进度、用户与代理之间的交互以及中间结果(例如对话记录)。
2、MongoDB 文档模型
以下是这些概念如何映射到我们的 MongoDB 文档:
class Profile(BaseModel):
name: str = ""
age: int | None = None
interests: list[str] = Field(default_factory=list)
home: str = Field(default="", description="描述用户家乡/社区等")
occupation: str = Field(default="", description="用户当前的职业或专业")
conversation_preferences: list[str] = Field(
default_factory=list,
description="用户偏好的对话风格列表,他们想避免的话题等",
)
def __str__(self) -> str:
res = ""
if any(v for v in self.model_dump().values()):
res += f"<user_profile>\n{self.model_dump_json()}\n</user_profile>"
return res.strip()
@classmethod
def user_prompt(cls) -> str:
return "根据现有信息创建一个详细的用户资料。如果存在现有的资料,请包含在 <user_specific_experience> 中。尽量添加新内容而不是覆盖现有内容。除非当然有必要覆盖现有内容。例如,如果用户说他们25岁了,而资料中显示他们20岁了,那么用新信息覆盖资料是有意义的。"
class Memory(BaseModel):
"保存用户与你分享的重要记忆,以便以后回忆。"
id: SkipJsonSchema[UUID4] = Field(default_factory=uuid4)
created_at: SkipJsonSchema[datetime] = Field(default_factory=datetime.now)
context: str = Field(
description="这个记忆可能相关的环境或情况。包括任何限制条件来上下文化记忆。例如,如果用户分享了一个偏好,注意它只在某些情况下适用(例如,'仅限工作场合')。添加任何其他相关‘元’细节,以帮助充分理解何时以及如何使用这个记忆。"
)
category: str = Field(description="记忆类别(例如,'偏好'、'事实'、'经验')")
content: str = Field(description="被记住的具体信息、偏好或事件。")
superseded_ids: list[str] = Field(
default_factory=list, description="这个记忆明确取代的记忆ID"
)
def __str__(self) -> str:
return self.model_dump_json(exclude={"superseded_ids"})
@classmethod
def user_prompt(cls) -> str:
return """
分析对话以识别应在未来互动中记住的重要信息。重点关注:
1. 个人详情与偏好:
- 表达的偏好、喜欢和不喜欢
- 个人信息背景
- 职业或教育细节
- 提到的重要关系
2. 上下文信息:
- 时间敏感信息(即将发生的事件、截止日期)
- 地理位置特定的详细信息
- 当前项目或目标
- 最近分享的经历
3. 交互模式:
- 沟通风格偏好
- 他们喜欢讨论的主题
- 应避免或谨慎处理的主题
- 他们使用的特定术语或行话
4. 之前的承诺:
- 承诺的后续跟进或延续
- 未完成的讨论
- 表达的未来互动意图
对于每个识别的记忆:
- 包括关于何时/如何使用它的相关信息
- 注意任何时间限制或条件
- 考虑它如何影响未来的互动
当创建更新现有信息的记忆时:
- 如果可以访问 <user_specific_experience> 中的以前记忆,检查是否有任何新信息与它们矛盾或更新它们
- 在 `superseded_ids` 字段中包含任何被取代的记忆ID
- 示例:如果用户之前说他们住在纽约(记忆ID: abc-123),但现在提到搬到波士顿,
创建一个新的记忆,其中 superseded_ids=["abc-123"]
- 只生成新的记忆,旧的记忆将基于 `superseded_ids` 字段自动被覆盖。
返回一个结构化的记忆列表,每个记忆都有清晰的上下文、类别和内容。
优先考虑那些有助于跨会话保持对话连续性的信息。
""".strip()
class AgentExperience(BaseModel):
procedural_knowledge: str = Field(
default="", description="在代理领域内处理任务的累积理解"
)
common_scenarios: list[str] = Field(
description="经常遇到的情况及其典型上下文", default_factory=list
)
effective_strategies: list[str] = Field(
description="已被证明有效的方法和策略", default_factory=list
)
known_pitfalls: list[str] = Field(
description="常见的挑战、边缘案例及应对方法", default_factory=list
)
tool_patterns: list[str] = Field(
description="有效使用不同工具的方式,按工具名称组织", default_factory=list
)
heuristics: list[str] = Field(
description="从经验中产生的规则和决策指南",
default_factory=list,
)
user_feedback: list[str] = Field(
description="用户不满意代理响应时的反馈。这有助于改进代理的技术技能和行为。因此,基本的用户回应在这里不适用。",
default_factory=list,
)
improvement_areas: list[str] = Field(
description="确定的优化或改进区域。查看用户反馈也可以帮助识别改进区域。",
default_factory=list,
)
@classmethod
def user_prompt(cls) -> str:
return """
回顾这次互动并更新代理的累积经验,重点关注适用于所有用户和会话的一般模式和学习:
1. 知识演化:
- 收获了哪些普遍受益于所有用户的领域见解?
- 哪些通用模式或反模式出现?
- 有哪些策略在用户上下文中证明有效?
2. 模式识别:
- 这次互动代表了哪些常见场景或用例?
- 哪些工具使用模式普遍有效?
- 哪些决策原则可以广泛适用?
3. 启发式开发:
- 从这次经验中可以得出哪些通用规则?
- 如何提炼现有的启发式方法使其更普遍适用?
- 有哪些一致影响成功的上下文因素?
将此经验整合到现有知识中 <agent_experience>:
- 关注可能在不同用户中重复出现的模式
- 开发用户无关的启发式方法
- 记录在一般场景中有效的工具使用模式
- 识别所有用户都会受益的改进区域
重要提示:
- 排除用户特定的详细信息或偏好
- 专注于普遍适用的技术和程序知识
- 抓住一般原则而不是具体实例
- 通过避免任何可识别的个人信息来维护隐私
重点是建立一个稳健且不断发展的知识库,随着时间的推移提高代理对所有用户的有效性。
记住这是累积的经验——不要覆盖现有知识,而是增强和完善它。
""".strip()
# ---------- MongoDB 文档 ----------
def validate_memories(memories: list[Memory]) -> list[Memory]:
if not memories:
return []
superseded_ids = set()
for memory in memories:
superseded_ids.update(memory.superseded_ids)
memory_dict = {}
for memory in memories:
if str(memory.id) not in superseded_ids:
memory_dict[str(memory.id)] = Memory(**memory.model_dump(exclude={"superseded_ids"}))
return sorted(memory_dict.values(), key=lambda x: x.created_at)
class User(Document):
"""存储用户资料和累积记忆的用户文档"""
profile: Profile = Field(default_factory=Profile)
memories: list[Memory] = Field(default_factory=list)
class Settings:
name = "users"
validate_on_save = True
def __str__(self) -> str:
res = str(self.profile)
if self.memories:
mems = "\n\n".join([str(m) for m in self.memories])
res += f"\n\n<memories>\n{mems}\n</memories>\n\n"
return res.strip()
@field_validator("memories")
@classmethod
def validate_memories(cls, v: list[Memory]) -> list[Memory]:
return validate_memories(v)
def update_from_generated_user(self, generated_user: GeneratedUser) -> None:
self.profile = generated_user.profile or self.profile
self.memories.extend(generated_user.memories)
class Agent(Document):
"""存储配置和累积经验的代理文档"""
name: Annotated[str, Indexed(unique=True)]
model: KnownModelName
description: str = ""
system_prompt: str = ""
experience: AgentExperience = Field(default_factory=AgentExperience)
class Settings:
name = "agents"
validate_on_save = True
class Task(Document):
"""跟踪工作流程进度的任务文档"""
user: Link[User]
agent: Link[Agent]
status: TaskStatus = TaskStatus.CREATED
message_history: list[_messages.ModelMessage | dict[str, Any]] = Field(default_factory=list)
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
class Settings:
name = "tasks"
indexes = ["user._id", "agent.name", "status", "created_at"]
validate_on_save = True
def experience_str(self) -> str:
return (
f"<agent_experience>\n{self.agent.experience.model_dump_json()}\n</agent_experience>\n\n" # type: ignore
f"<user_specific_experience>\n{self.user}\n</user_specific_experience>\n\n"
)
3、记忆更新与流程
以下是完整的流程,包括更新逻辑。每一步都展示了用户和代理数据如何更新,以及如何将所有内容集成到动态 system_prompt
中,以便将来任务使用。
3.1 用户发起对话
在 MongoDB 中创建一个新的 Task
实例,链接用户和代理。我们在此处存储 message_history
作为对话展开的一部分。
3.2 代理检索先前经验
当代理运行时,它加载任何现有的用户详细信息和代理经验。这将被注入到动态 system_prompt
中。
from pydantic_ai import Agent, RunContext
task_assistant = Agent(
name="task_assistant",
model="google-gla:gemini-1.5-flash",
system_prompt="你是一个任务助手。使用你的存储知识来帮助。",
deps_type=Task
)
@task_assistant.system_prompt(dynamic=True)
def system_prompt(ctx: RunContext[Task]) -> str:
# 步骤 #2:检索先前的用户和代理数据
return ctx.deps.experience_str()
3.3 代理交互;message_history 增长
用户消息和代理响应追加到 message_history
。记忆更新被排队但尚未处理。
3.4 记忆代理处理对话
我们有一个独立的代理(通常称为 memory_agent
),其工作是分析对话并生成对用户资料和记忆以及代理的经验的更新。这发生在对话之后。
关键洞察:记忆代理处理完整的 message_history
,其中包括原始任务代理的动态 system_prompt
(来自 experience_str()
)。这意味着:
- 新的记忆是在先前代理经验的背景下创建的
- 资料更新考虑到以前会话中的现有用户数据
- 记忆取代决定理解整个历史背景
async def create_user_experience(memory_agent: Agent, message_history: list):
prepared_messages = prepare_message_history(message_history)
# prepared_messages 包含:
# 1. 动态系统提示与先前经验_str()
# 2. 新对话消息
# 4a. 更新资料(意识到现有资料数据)
profile = await memory_agent.run(
user_prompt=Profile.user_prompt(),
result_type=Profile,
message_history=prepared_messages
)
# 4b. 生成新的记忆对象
memories = await memory_agent.run(
user_prompt=Memory.user_prompt(),
result_type=list[Memory],
message_history=prepared_messages
)
return GeneratedUser(profile=profile.data, memories=memories.data)
async def create_agent_experience(memory_agent: Agent, message_history: list):
prepared_messages = prepare_message_history(message_history)
# 4c. 识别代理级别的启发式或陷阱
return await memory_agent.run(
user_prompt=AgentExperience.user_prompt(),
result_type=AgentExperience,
message_history=prepared_messages
).data
3.5 记忆被验证或取代
请注意,每个新的 Memory
可以引用较旧的记忆ID来覆盖。一旦我们有了最终的新记忆列表,我们就整合它们,丢弃那些被标记为被取代的旧记忆。
3.6 将更新后的经验保存到 MongoDB
最后,我们将新生成的用户信息(资料 + 记忆)和更新后的代理经验合并到数据库中。
async def save_experience(
user: User,
generated_user: GeneratedUser,
agent: Agent,
agent_experience: AgentExperience
):
user.update_from_generated_user(generated_user) # 覆盖现有资料并更新记忆
agent.experience = agent_experience
await user.save()
await agent.save()
3.7 未来的任务看到更新的知识
任何新的 Task
对于相同的用户/代理会自动包含更新的记忆在动态 system_prompt
中(步骤 #2)。即使用户开始全新的对话或上下文,代理也会保留或扩展以前的学习。
4、示例场景
1. 初始推荐
- 用户:“给我推荐一部电影”
- 用户消息添加到
Task.message\_history
- 代理基于空资料建议浪漫喜剧
- 代理消息添加到
Task.message\_history
2. 对话期间的用户反馈
- 用户:“我喜欢动作片”
- 用户消息添加到
Task.message\_history
- 代理继续对话,但没有记忆/经验更新,但带有更新的
message\_history
- 最终消息:“你觉得‘动作大片标题’怎么样?”
- 用户批准,任务标记为
completed
- 完整对话存储在
Task.message\_history
中
3. 对话后处理
记忆代理分析完整历史记录:
- 更新资料:
interests: ["动作电影"]
- 创建记忆:“用户表达了对动作电影的偏好”
- 记录代理经验:“动作推荐得到更好的参与度”
4. 第二天的新任务
system\_prompt
现在包括更新的偏好- 代理基于存储的记忆建议“另一部动作惊悚片”
- 用户无需重新解释偏好就能获得个性化响应
5、结束语
这种结构化的记忆方法通过结合PydanticAI 的验证与MongoDB 的灵活性,使 AI 代理能够通过交互进化。
每次 run
都通过资料细化、上下文感知的记忆和累积经验得到改进。MongoDB 确保原子性和验证更新,并高效地处理嵌套数据结构和时间查询。关键创新在于一种对话后处理流程,它将实时交互与后台记忆分析解耦,保持聊天响应速度的同时实现复杂的更新。
原文链接:Adding a Memory layer to PydanticAI Agents
汇智网翻译整理,转载请标明出处