2026-03-08 16:07:08
随着 AI Agent 技术演进,从目前来看 AI Agent 架构大概被划分越来越清晰,我们参考《The Rise and Potential of Large Language Model Based Agents: A Survey》这篇论文里面的 Agent 架构定义,大概划分为以下几个部分:

那么我们本篇文章讨论的“记忆”其实是更广泛的存储区这块功能。对于 AI Agent 记忆来说,记忆其实就有点像脑容量,其核心必要性体现在以下三点:
上下文一致性 (Contextual Consistency):
Agent 需要记住之前的对话内容,才能理解当前的指令。例如,如果你先说“帮我订一张去上海的机票”,接着说“改到明天”,Agent 必须记得“去上海的机票”这个前提。
长期偏好学习 (Personalization):
通过记忆,Agent 可以学习用户的习惯(如:你偏好 Python 而不是 Java,或者你习惯在周五下午复盘)。
复杂任务拆解 (Task Decomposition & Planning):
在执行多步任务(如:写代码 -> 测试 -> 找 Bug -> 修复)时,Agent 需要记录每一步的状态,确保不会陷入循环或丢失进度。
在 AI Agent 领域记忆通常效仿人类的认知结构,分为以下层次:
短期记忆 (Short-term Memory):利用大模型的 Context Window(上下文窗口),将最近的几轮对话记录直接放入 Prompt 中发送给模型,抑或是工具调用结果、中间推理状态、任务临时变量,但是受限于模型能够处理的最大 Token 数量,一旦对话过长,旧的信息就会被“挤出”。
这部份数据我们可以存储在内存中,配合TTL(Time To Live)机制进行自动清理。这种设计的优势在于访问速度极快,但也意味着工作记忆的内容在系统重启后会丢失。这种特性正好符合工作记忆的定位,存储临时的、易变的信息。
长期记忆 (Long-term Memory):这相当于人类的“经验仓库”,可以存储海量信息并在需要时检索,可以通过各种数据库进行存储,一般来说可以做如下分类:

所以我这里借用一下 langchain 官方的一张图,agent memory 的存储其实就是选用合适存储的过程,针对不同数据类型将自然语言查询转化为特定数据库查询的方法。
记忆的核心操作其实就两个:
对于存储关于世界和 Agent 自身的事实性知识,我们通过 RAG(检索增强生成)所调用的外部知识库来实现,这部份我们单独拿出来说。这里我们先说说记录 Agent 过去的经历和日志的情境记忆 (Episodic Memory)。
我们将这种Episodic Memory分为三部份来进行存储:
存储的核心主要包含两个关键阶段:
提取阶段(Extraction Phase):
系统从当前的对话消息和历史背景中,动态地提取出“显著信息”(Salient Information)。它不是简单地存储对话记录,而是将其转化为简练、事实性的“记忆片断”。
更新与整合阶段(Update Phase):
当新的记忆提取出来后,将其与现有的相似记忆进行对比:
做提取核心原因是把“原始对话”压缩成“可检索、可更新、可复用”的结构化记忆。
我们这里使用 LLM 抽取,让模型按固定格式输出{"facts":[...]},灵活、效果好,是现在最常见方案。主要是分成这么几步来实现:
user:/assistant:/system ;{"facts":[...]}
我们来看个例子具体怎么提取的:
对话输入:
user 会提取出:
{
"facts": [
"名字是小王",
"在北京做后端开发",
"乳糖不耐受",
"平时喜欢跑步"
]
}
assistant 会提取出:
{
"facts": [
"擅长 Python 和系统设计",
"回答风格尽量简洁",
"偏好用表格总结"
]
}
再看一个“无可提取信息”的例子,对话输入:
{"facts": []}
这一阶段会用第一阶段提取出来的 facts 来进行记忆的更新与整合。主要是分成这么几步来实现:
我们在让 LLM 做更新决策的时候需要根据 4 个明确模块,降低 LLM 自由发挥空间:
放入“操作规则与判定标准”
在我们给定的 UPDATE_MEMORY_PROMPT 里面需要定义了 ADD/UPDATE/DELETE/NONE 的语义和多个 few-shot 示例,让 LLM 具体了解到更新规则;
放入“当前记忆状态”
如果有旧记忆,就把旧记忆数组包在代码块里;否则明确写 Current memory is empty。这样 LLM 是在“当前状态机”上做增删改,而不是凭空生成。比如这样拼接 prompt:
if retrieved_old_memory_dict:
current_memory_part = f"""
Below is the current content of my memory which I have collected till now. You have to update it in the following format only:
{retrieved_old_memory_dict}
else:
current_memory_part = """Current memory is empty. """
放入“新 facts 输入”
把新抽取的 facts 明确告诉模型:你只需要判断这些新事实对当前记忆该怎么处理。比如这样:
The new retrieved facts are mentioned in the triple backticks. You have to analyze the new retrieved facts and determine whether these facts should be added, updated, or deleted in the memory.
最后强约束输出格式
函数把目标输出 schema 写死为:{"memory":[{"id","text","event","old_memory"}]},并加“Do not return anything except JSON format”这能显著提高可解析性,方便后续程序按 event 执行。
我们举个完整例子:
假设旧记忆是:
[
{"id": "0", "text": "喜欢奶酪披萨"},
{"id": "1", "text": "是后端工程师"}
]
新 facts 是:
["喜欢鸡肉披萨", "在准备转管理岗"]
然后 LLM 可能返回:
{
"memory": [
{
"id": "0",
"text": "喜欢奶酪和鸡肉披萨",
"event": "UPDATE",
"old_memory": "喜欢奶酪披萨"
},
{
"id": "1",
"text": "是后端工程师",
"event": "NONE"
},
{
"id": "2",
"text": "在准备转管理岗",
"event": "ADD"
}
]
}
后续程序按 event 执行真正落库(新增/更新/删除)。
再来将一下Graph存储怎么做。Graph核心优势在于它不再是零散的“事实点”,而是形成了“知识网”。在处理复杂逻辑、跨时空关联和深度偏好挖掘时,这种方式比简单的纯文本记忆要强大得多,并且不像向量数据库只能进行相似度进行检索,而是可以沿着已知的节点和边,像找地图一样寻找关联。
我们来举例几个场景:
复杂的人际关系网(社交/CRM 场景)
如果一个 AI 助理只记录纯文本,它可能记得“王总喜欢红酒”和“李女士是王总的太太”。但当你要策划一场晚宴时,基于图的记忆能迅速通过“配偶”关系推导出两者的关联,AI 就可以根据提问信息进行实体和关系的抽取:
王总、李女士、红酒
[王总] --(配偶)--> [李女士],[王总] --(偏好)--> [红酒]
跨 session 的逻辑排产与项目追踪
在长期的项目管理中,任务之间存在前置、后置和依赖关系。比如根据我们的文档 AI 可以抽离出下面实体和关系:
模块 A 设计、前端开发、后端 API、张工
[前端开发] --(依赖于)--> [后端 API],[张工] --(负责)--> [后端 API]
如果张工今天请假了,基于图的记忆能立刻感知到:这不仅会耽误“后端 API”,还会连锁反应导致“前端开发”停滞。
个性化推荐中的“归因”与“反转”
传统的向量检索(Vector Search)有时会因为语义接近而产生误导,但图结构可以做到精准的时间戳与状态管理。比如用户在 2023 年说“我最讨厌吃香菜”,但在 2024 年说“我尝试了香菜拌牛肉,竟然觉得不错”,那么可以抽取出:
时间、态度、物品
[用户] --(2023 态度: 厌恶)--> [香菜],[用户] --(2024 态度: 接受)--> [香菜]
图结构可以带标签(如时间、强度)。当 AI 决定今天点餐建议时,它能通过有向边的“时间戳”属性,识别出最新的态度已经覆盖了旧的态度,从而避免因为检索到旧文本而一直提醒你“别放香菜”。
同样的我们也需要分几步通过约束和关系的抽取让我们产生的结果更加可控:
LLM 抽实体+类型
这一步主要是做主体的提取相应实体和类型,规范输出结果,主要用于后续入库时给节点打 label/type(以及默认类型回退),比如输入文本:
我叫小王,在字节跳动做后端开发,住在北京。
得到结果大致会变成:
{
"name": "extract_entities",
"arguments": {
"entities": [
{"entity": "小王", "entity_type": "person"},
{"entity": "字节跳动", "entity_type": "organization"},
{"entity": "后端开发", "entity_type": "profession"},
{"entity": "北京", "entity_type": "location"}
]
}
}
LLM 抽关系三元组
这一步是为了把上一步抽取的实体和类型让 LLM输出 source/relationship/destination,比如上面的例子这里会生成:
{
"name": "establish_relationships",
"arguments": {
"entities": [
{"source": "小王", "relationship": "works_at", "destination": "字节跳动"},
{"source": "小王", "relationship": "has_profession", "destination": "后端开发"},
{"source": "小王", "relationship": "lives_in", "destination": "北京"}
]
}
}
用实体 embedding 在图里查相近旧节点/关系,再用 LLM 判定要删哪些旧关系,再执行 ADD / UPDATE / DELETE
这里我举例说明一下,比如用户先后两次输入:
我在字节跳动做后端,住在北京。(首次输入)
抽到关系后入图:
我现在在字节工作,搬到北京市朝阳区了。(过了一段时间后)
新实体可能是:小王 / 字节 / 北京市朝阳区
接下来就会检索和新旧关系的判定
查 字节 最相近旧节点
查 北京市朝阳区 最相近旧节点
拿这些相近节点的旧关系给 LLM 看
最终图可能变成:
对记忆的提取也是分两块进行提取:
这里就是常规逻辑。
上面我们有提到过,当需要关于世界和 Agent 自身的事实性知识,它不依赖于具体的Agent经历。例如,“北京是中国的首都”或者用户的基本偏好,在技术实现上,这通常对应于 RAG(检索增强生成)所调用的外部知识库。
RAG 核心思想是:在生成回答之前,先从外部知识库中检索相关信息,然后将检索到的信息作为上下文提供给大语言模型,从而生成更准确、更可靠的回答。
一个完整的 RAG (Retrieval-Augmented Generation,检索增强生成) 应用流程可以分为两个核心阶段:离线数据处理 (Ingestion) 和 在线检索生成 (Inference)。
离线阶段:数据准备与索引 (Data Ingestion)
这是 RAG 的“地基”,目的是将非结构化的知识变成 AI 能够理解和检索的格式。
文档加载 (Loading): 从 PDF、Word、Markdown 或数据库中提取文本。
文本分割 (Chunking): 将长文章切分为较小的、语义完整的段落(Chunks)。
为什么? 因为 LLM 有上下文窗口限制,且过长的信息会稀释检索精度。
向量化 (Embedding): 调用 Embedding 模型(如 OpenAI text-embedding-3 或本地的 BGE),将文本转换为高维向量。
向量存储 (Vector Storage): 将这些向量连同原始文本存储在向量数据库中(如 Pinecone, Milvus, Chroma)。
在线阶段:检索 (Retrieval)
当用户提出问题时,系统开始“翻书”。
通过我们上面的简单介绍,应该可以知道写入流程是这样:
任意格式文档 → MarkItDown转换 → Markdown文本 → 智能分块 → 向量化 → 存储检索
下面我们简单的讨论一些细节。
MarkItDown 是微软(Microsoft)开源的一款非常实用的工具。它主要的目的是用来处理多模态的数据,无论是 PDF, Word (docx), PowerPoint (pptx), Excel (xlsx) 还是图片、音频内容,将各种格式的非结构化数据,一键转换为干净、标准的 Markdown 格式。
对于图片数据,它会调用多模态模型通常配置指向一个多模态大模型(如 GPT-4o 或 Claude 3.5 Sonnet),模型会分析图片中的场景、物体、文字(OCR)以及图表趋势,将生成的描述文字。比如 PDF 里面有一张图片,那么会抽取成:

对于音频内容,MarkItDown 一般会结合 OpenAI Whisper 等语音识别模型将音频中的对话或旁白完整转录为文本,转录后的文本会作为该音频文件的“代表内容”存入 Markdown 结果中,使其可以被向量化并检索。
在 RAG 应用中,分块(Chunking) 是决定检索质量的生死线。如果分块太小,会丢失上下文;如果分块太大,会引入过多噪音并导致 LLM 无法处理。
目前市面上主流的几种分块策略有:
# 标题、## 子标题)进行切分。 识别 Markdown 或 HTML 的标题标签,将属于同一标题的内容聚合成一个块;["\n\n", "\n", " ", ""]),首先尝试按段落(\n\n)切,如果某一段还是太长,再按句子(\n)切,依然太长,就按空格切;其实上面智能程度和计算成本是成反比的,越只能的策略通常来说也越贵。
| 策略 | 智能程度 | 计算成本 | 适用场景 |
|---|---|---|---|
| 固定字符 | 低 | 极低 | 性能要求极高的基准测试 |
| 递归结构 | 中 | 低 | 通用场景(推荐首选) |
| 语义相似度 | 高 | 中 | 缺乏明显格式的非结构化论文/报告 |
| Agentic/LLM | 极高 | 高 | 高价值、高准确度要求的核心文档 |
RAG系统将数据存好之后,核心的竞争力还是在检索。RAG 的基本思路是根据用户输入检索出最相关的内容,但是用户输入是不可控的,可能存在冗余、模糊或歧义等情况,如果直接拿着用户输入去检索,效果可能不理想。所以我们可以通过一些策略来优化查询效果。
查询扩展(Query Expansion) 就是把用户的原始提问“整容”或“分身”,变成更多、更丰富的表达方式。它的存在是为了解决 RAG 系统中的一个顽疾:词项不匹配(Term Mismatch)。比如用户搜“番茄”,但文档里写的是“西红柿”,基础检索可能就会完美错过。
查询扩展有多种不同的实现,比如:
多查询(Multi-Query)
这是最常见的扩展方式。让 LLM 站在不同角度,把你的问题重写成 3-5 个意思相近的问题。比如提问:“如何让猫爱上喝水?”,可以被扩展成:
“猫咪饮水习惯的诱导方法有哪些?”
“增加宠物猫饮水量的实用技巧。”
“哪些因素会影响猫对水源的偏好?”
后退提示 (Step-back Prompting)
它是 Google DeepMind 团队在论文 Take a Step Back: Evoking Reasoning via Abstraction in Large Language Models 中提出的一种新的提示技术。
基本原理简单来说就是,如果你的问题太细节,检索效果往往不好。查询扩展会先退一步,问一个更宏观的原理。比如提问:“为什么我的 2023 款 MacBook Pro 跑 Python 特别烫?”,后退一步可能是:
帮助系统先检索到大框架知识,辅助回答具体细分问题。
假设文档 (HyDE)
HyDE 是 Luyu Gao 在 Precise Zero-Shot Dense Retrieval without Relevance Labels ,它的核心思想是"用答案找答案"。传统的检索方法是用问题去匹配文档,但问题和答案在语义空间中的分布往往存在差异——问题通常是疑问句,而文档内容是陈述句。HyDE 与其用一个“问题”去搜“答案”,不如先编一个“假答案”,然后用“假答案”去搜“真答案”
比如提问:“那个两个粒子互相感应的物理现象叫什么?”,检索效果差往往是因为 Query(问题) 和 Document(文档) 处于不同的语义空间,因为文档通常很长且是陈述句: “当两个或多个粒子以特定的方式结合在一起时,它们的状态就变得不可分割。即使你把这两个粒子分别放在宇宙的两端,它们依然保持着这种奇……”。
所以,我们可以让LLM 生成假答案: “这种现象通常指量子纠缠,即两个粒子在空间上分离但状态紧密关联……”,带着这段话去搜。因为假答案里包含了“量子纠缠”、“空间分离”、“状态关联”等学术词汇,它能精准地在论文中找到对应的章节。
RAG Fusion
最后还需要提一下 RAG Fusion,这是它的论文地址 https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf 。比如当你用查询扩展生出了 5 个问题,去检索得到了 5 份不同的答案排名,这时候就会出现矛盾:文档 A 在问题 1 里排第一,在问题 2 里排第十。RAG Fusion 就是那个负责“打分合并”的裁判。它利用 RRF(倒数排名融合) 算法进行打分。
比如现在有个原始问题: “如何在北京申请居住证?”,然后我们扩展成:
分身 1: 北京居住证办理流程是什么?
分身 2: 北京居住证申请需要什么材料?
分身 3: 外地人在北京办居住证的条件。
然后我们得到检索结果,文档 A(《北京人口管理条例》):在分身 1 搜到排第 3,分身 2 搜到排第 2,分身 3 搜到排第 5。文档 B(一篇非官方博客):在分身 1 搜到排第 1,但在其他两个搜索里都没出现。
经过 RRF 计算,文档 A 虽然没有拿过第一,但因为它在三个维度都被认定为高度相关,最终总分会反超文档 B。这样就过滤掉了偶然性极高的干扰信息。
Xinbei Ma 等人在论文Query Rewriting for Retrieval-Augmented Large Language Models提出了一种 Rewrite-Retrieve-Read 的方法,对用户的输入进行改写,以改善检索效果。在传统的 RAG(检索 -> 阅读)流程中,用户的原始输入往往不是“搜索引擎友好”的,比如包含大量的冗余、代词或模糊表达等。
查询重写主要思想就是使用一个专门的“重写器”(Rewriter)将原始查询转化为一个或多个更适合搜索引擎的检索词(Search Queries),然后使用这些优化后的词去数据库中捞取知识。
总而言之,AI Agent 的记忆系统是其迈向高度智能的核心支柱,通过构建包含短期工作记忆与长期经验库的多层架构,结合基于大模型的事实提取、动态更新机制及知识图谱技术,并配合深度优化的 RAG 检索流程,Agent 能够实现精准的上下文维持与知识内化,从而在复杂场景中提供更具一致性、个性化且可靠的智能支持。
https://github.com/mem0ai/mem0
https://www.anthropic.com/engineering/effective-context-engineering-for-ai-agents https://blog.langchain.com/how-we-built-agent-builders-memory-system/
https://arxiv.org/abs/2309.02427
https://arxiv.org/abs/2504.19413
https://www.youtube.com/watch?v=cHQyugatz6M
https://www.aneasystone.com/archives/2024/06/advanced-rag-notes.html
2026-02-20 13:33:01
我们先以一个全局的视角看看 redis 的数据是怎么存放的:
redisDb (数据库)
└── dict (全局字典)
└── ht[0] (哈希表数组)
└── [Bucket] ──> dictEntry (节点)
├── key: [ SDS ("mykey") ]
└── val: [ redisObject ]
├── type: REDIS_STRING
├── encoding: EMBSTR (或 RAW)
└── ptr ──> [ SDS ("hello") ]
Redis 的顶层存储核心是用全局字典(Global Dict,也叫 Keyspace)来管理所有的数据,Dict 采用的是双哈希表结构来保存数据主要是用来做渐进式 rehash,双哈希表结构用ht[0] 和 ht[1]来表示,通常数据只在 ht[0] 中,当哈希表需要扩容或缩容时,Redis 会一边处理请求,一边分批将数据从 ht[0] 迁移到 ht[1]。
哈希表其实就是一张大 bucket 数组,每个 bucket 是 dictEntry,由 dictht 数据结构来进行管理:
typedef struct dictht {
// 哈希表的槽
dictEntry **table;
// 哈希表槽个数,是2的整数次幂
unsigned long size;
// size-1,计算出一个key的hash后,直接 hash & sizemask即可算出所属的槽
unsigned long sizemask;
// 已使用大小
unsigned long used;
} dictht;
在全局字典中,每一个键值对都被封装在一个 dictEntry 结构体中:
redisObject 结构体(或其指针)。redisObject 就像一个通用容器,它封装了所有 Redis 数据类型(String, List, Hash 等)。在 Redis 中使用 redisObject 统一来管理底层的数据结构,无论底层是SDS、ziplist 或 dict统一用 redisObject 来进行封装,然后通过 type 来进行标识。
在 Redis 的 C 语言源码中,它的定义如下(以 64 位系统为例):
| 字段名 | 占用空间 | 作用说明 |
|---|---|---|
| type | 4 bits | 逻辑类型:标识它是 String、List、Hash、Set 还是 ZSet。 |
| encoding | 4 bits | 物理编码:标识底层具体是用什么实现的(如 ziplist、skiplist、int 等)。 |
| lru / lfu | 24 bits | 对象热度:记录最后一次被访问的时间(LRU)或访问频率(LFU),用于内存淘汰。 |
| refcount | 4 bytes | 引用计数:记录有多少地方引用了这个对象。为 0 时对象被销毁。 |
| ptr | 8 bytes | 数据指针:指向底层真实数据的内存地址。 |
合计算下来,一个 redisObject 固定占用 16 字节。
这样做就是统一了接口,当你执行 DEL 命令时,Redis 不需要关心你删的是 String 还是 List,它只需要操作 redisObject 这个通用结构。
除此之外它有三大作用:
类型检查与多态
当你输入 LPOP key 时,Redis 会先检查这个 redisObject 的 type 是不是 REDIS_LIST。如果不是,直接返回错误。如果是,它会根据 encoding 字段去调用对应的函数(比如是从 linkedlist 弹出还是从 listpack 弹出)。
内存管理与共享
通过 refcount 的引用计数来控制内存的释放,当引用计数归零,Redis 才会真正释放内存。
内存淘汰(LRU/LFU 算法)
LRU 模式就会通过时间戳来看该对象是否应该被淘汰。LFU 模式它根据数据被访问的频率来决定淘汰对象,高 16 位存时间,低 8 位存访问计数。 如果这个字段很久没更新,当 Redis 内存不足时,它就会优先被“踢出”内存。
虽然在全局字典看来,所有的 Value 都是一个 redisObject,但 redisObject 内部通过 type 和 ptr 指向了完全不同的底层世界:
| 命令示例 | redisObject -> type | redisObject -> ptr 指向的内容 |
|---|---|---|
SET key "val" |
REDIS_STRING | 指向一个 SDS(可能是 int, embstr 或 raw) |
HSET user:1 name "A" |
REDIS_HASH | 指向一个 Dict 或 listpack/ziplist |
LPUSH list "item" |
REDIS_LIST | 指向一个 quicklist(由多个 listpack 组成的双端链表) |
SADD tags "java" |
REDIS_SET | 指向一个 Dict (value 为 NULL) 或 intset |
ZADD rank 100 "A" |
REDIS_ZSET | 指向一个 zset 结构(内含 Skiplist + Dict) |
Redis 设计了简单动态字符串(Simple Dynamic String,SDS)的结构,用来表示字符串,动态字符串结构如下图所示:

SDS 大致由两部分构成:header以及 数据段,其中 header 还包含3个字段 len、alloc、flags。len 表示数据长度,alloc 表示分配的内存长度,flags 表示了 sds 的数据类型。
在以前的版本中,sds 的header其实占用内存是固定8字节大小的,所以如果在redis中存放的都是小字符串,那么 sds 的 header 将会占用很多的内存空间。
但是随着 sds 的版本变迁,其实在内存占用方面还是做了一些优化:
__attribute__ 修饰,这里主要是防止编译器自动进行内存对齐,这样可以减少编译器因为内存对齐而引起的 padding 的数量所占用的内存。目前的版本中共定义了五种类型的 sds header,其中 sdshdr5 是没用的,所以没画:

当执行 SET key value 时,对于 key 来说存放方式就是:
DictEntry
│
└── key (指针)
│
▼
┌──────────────────────────────────────────────────────────────┐
│ [ SDS Header ] [ SDS Body (buf) ] [ \0 ] │
└──────────────────────────────────────────────────────────────┘
▲ ▲ ▲
│ │ │
│ │ └── 结尾 (1 byte)
│ │
│ └── 你的 1MB 甚至 512MB 的数据
│
└── 这里的元数据结构会根据大小变化
(sdshdr8 -> sdshdr16 -> ... -> sdshdr64)
对于 value 来说,Redis 会根据 value 的情况选择以下三者之一:
int 编码redisObject 的 ptr 指针位置(指针 8 字节,正好存下一个 long)。embstr 编码redisObject 结构体与 SDS 结构体在内存中是连续的一块空间。raw 编码redisObject 和 SDS 是两块独立的内存区域,通过指针连接。redisObject。
所以我们可以看到 key 和 value 其实是分两部分存储:
Value (值):可能会因为 RAW 编码 而导致 redisObject 和 SDS 分离(不挨着)。
Key (键):永远没有 redisObject 包装,它直接就是一个 SDS。所以 Key 的 Header 和数据永远是连在一起的,没有任何例外。
在估算容量之前,我们来看看 redis 使用的 jemalloc 是怎么做内存分配的。
jemalloc 预先定义了一系列固定的内存块大小(称为 Size Class)。当 Redis 请求分配 N 字节时,jemalloc 会查找第一个大于等于 N 的规格(Size Class)内存块进行分配。
为了减少浪费,jemalloc 的规格设计得很科学,并不是单纯的 2 的幂次方(2, 4, 8, 16…),而是更加细密:
| 规格区间 | 具体的 Size Class (字节) |
|---|---|
| 8B – 128B | 8, 16, 32, 48, 64, 80, 96, 112, 128 |
| 128B – … | 160, 192, 224, 256, 320 … |
假设你在 Redis 里存一个简单的字符串,算上 SDS 头部等开销,Redis 向系统申请了 20 字节。
结果:
虽然看起来浪费了一点点空间(内部碎片),但对整个系统来说,收益巨大:
所以根据我们上面的介绍,应该知道一个 String 键值对的总内存占用主要由三部分组成:

全局字典节点 (dictEntry):固定 24 字节
键 (Key):SDS 结构
值 (Value):取决于编码方式,上面我们有介绍,就不细说了 int、embstr、raw 编码;
| 编码方式 | 计算公式 | 说明 |
|---|---|---|
| INT | 16 字节 | 只有 redisObject,数值直接存在指针里。 |
| EMBSTR | $malloc(16 + 3 + len(Val) + 1)$ |
redisObject 与 SDS 连续分配,整体向上取整。 |
| RAW | $16 + malloc(3 + len(Val) + 1)$ |
redisObject 与 SDS 分开分配,各自取整后再求和。 |
SET "key" "value"
我们来算一下这个极小键值对实际占了多少地儿:
所以我们可以看到个有趣的事实,存储 8 字节的原始数据,Redis 实际需要 64 字节,膨胀率高达 8 倍。
不要试图用数学公式去死算每一个字节(因为 jemalloc 和 struct padding 很难完全算准),而是采用 “小规模采样 + 线性推演”。
我们可以启动一个空的 Redis 实例,记录初始内存 used_memory(通常在 1MB 左右,是 Redis 自身的启动开销)。编写脚本,写入 10,000 个 具有代表性的 Key-Value 数据(长度和类型要符合你的生产场景)。
然后计算初始内存使用 和 最终内存使用的差值,然后计算出单挑数据消耗,将单条数据消耗 X 预计总数据量就可以得到最终的预估结果。
如果你没法做测试,只能盲算,必须根据 Key/Value 的平均大小 来应用不同的膨胀系数。
小对象场景(最容易翻车)
场景:Key = 10 字节,Value = 10 字节。
原始数据:20 字节。
Redis 实际占用:约 64 ~ 80 字节。
膨胀系数:3倍 ~ 4倍。
dictEntry (24B) + redisObject (16B) 即使什么都不存就已经 40B 了。加上 jemalloc 的 8B/16B/32B 对齐,开销巨大。中等对象场景
大对象场景
场景:Key = 50 字节,Value = 10 KB。
膨胀系数:接近 1.05倍。
2026-01-25 15:31:36
本文章的实践代码提交在:https://github.com/luozhiyun993/skill-workflow
本文将深度解析 Agent Skill 的模块化设计:从 Skill 间的层级调用、工具脚本的自动化执行,到 Subagent 的专业化分工。我们将通过“小红书爆款生产线”这一实战案例,展示如何利用文件传递、状态追踪与清单模式,解决复杂任务中上下文过载与输出不可控的痛点。告别臃肿的单一 Prompt,让你的 Agent Workflow 变得可验证、可断点续传且高度精准。
有时候任务比较复杂,我们就可以抽取出不同的 skill,通过 skill 之间的调用来简化单个 skill 的复杂度,或者可以把一些公用到的 skill 抽取出来,变成单一的 skill。
比如我们每次在开发完之后都需要:运行测试,本地合并到基础分支、推送并创建 Pull Request,那么我们就可以创建一个 finishing-a-development-branch skill,然后在其他的 skill 里面指定调用:
### Step 5: Complete Development
After all tasks complete and verified:
- Announce: "I'm using the finishing-a-development-branch skill to complete this work."
- **REQUIRED SUB-SKILL:** Use finishing-a-development-branch skill
- Follow that skill to verify tests, present options, execute choice
比如我们可以在 skill 里面指定使用方法,运行脚本,以及输出结果是什么,让 agent 自动执行:
## 使用方法
这是一个基于 TypeScript 的脚本 Skill。
### 运行脚本
# 在项目根目录下运行
npx ts-node .claude/skills/demo.ts
### 输出结果
脚本运行后,会在 workflow-agent/outputs/demo/ 目录下生成两个文件:
1. demo_[timestamp].json: 原始数据。
2. tdemo_analysis_[timestamp].md: Claude 生成的分析报告。
当 Claude 执行复杂、开放式的任务时,它可能会出错。假设你让克劳德根据电子表格更新 PDF 中的 50 个表单字段,我们就可以通过添加一个中间的 changes.json 文件,在应用更改之前对其进行验证。工作流程变为:分析 → 创建文件 → 验证 → 执行 → 验证。
这一步特别重要:所有中间结果都保存成本地文件。
三个好处:
比如我们可以这样在 SKILL 里面指定文件的存放目录以及存放格式:
## Instructions
When this skill is invoked:
1. Create the `./input` directory if it doesn't exist
2. Get the user's input message (passed as arguments or prompt for it)
3. Generate a timestamp-based filename (format: `YYYY-MM-DD_HH-MM-SS.txt`)
4. Save the input to `./input/<timestamp>.txt`
5. Confirm the file has been saved with the full path
skill 里面是可以调用 subagent 的,subagent 有几个优势是:context 独立,可以并发执行,并且是可以进行专业化分工的,那么我们就可以在 skill 在有需要的时候调用 subagent,提升执行效率,比如下面我创建了一个 go-file-author-attribution agent,那么在 skill 里面就可以指明调用:
**Batch Process Files**
- For each eligible file, use the Task tool to invoke the `go-file-author-attribution` agent
- Pass the author name and file path to the agent
- Process files sequentially to avoid conflicts
但是如果这样简单的调用,有时候会把一大段内容直接塞给 subagent,上下文窗口很快就撑满了。但如果只传路径,subagent 自己去读文件,上下文就干净很多。
Subagent 之间只传文件路径,不传内容,这条规则很重要。
比如可以设置一个 writer-agent 启动时只需要三个参数:source 文件路径、analysis 文件路径、outline 文件路径。它自己读取内容,写完保存到指定路径,返回输出文件路径。
这样做还有个好处:可以并行启动多个 subagent。三个 writer-agent 同时跑,各自处理一个提纲方案,互不干扰。
在 skill 里面通常来说,不建议把所有的信息都平铺到 SKILL.md 里面,因为上下文太长会浪费很多不必要的 token,并且让 agent 不够聚焦,那么我们可以使用 reference 的方式提供外部的文档提供:
## References
See `references/` folder for detailed documentation:
- `bdi-ontology-core.md` - Core ontology patterns and class definitions
- `rdf-examples.md` - Complete RDF/Turtle examples
- `sparql-competency.md` - Full competency question SPARQL queries
- `framework-integration.md` - SEMAS, JADE, LAG integration patterns
将复杂的操作分解成清晰的、循序渐进的步骤。对于特别复杂的流程,提供一份清单 checklist,这样可以让 agent 逐步勾选完成,如下所示:

## Research synthesis workflow
Copy this checklist and track your progress:
Research Progress:
- [ ] Step 1: Read all source documents
- [ ] Step 2: Identify key themes
- [ ] Step 3: Cross-reference claims
- [ ] Step 4: Create structured summary
- [ ] Step 5: Verify citations
**Step 1: Read all source documents**
Review each document in the sources/ directory. Note the main arguments and supporting evidence.
**Step 2: Identify key themes**
Look for patterns across sources. What themes appear repeatedly? Where do sources agree or disagree?
**Step 3: Cross-reference claims**
For each major claim, verify it appears in the source material. Note which source supports each point.
**Step 4: Create structured summary**
Organize findings by theme. Include:
- Main claim
- Supporting evidence from sources
- Conflicting viewpoints (if any)
**Step 5: Verify citations**
Check that every claim references the correct source document. If citations are incomplete, return to Step 3.
除此之外,也可以让 claude 在 workflow 里面去执行代码,比如把代码放入到 scripts 中,我们可以看一下 claude pdf skill 的目录结构:
.
├── forms.md
├── LICENSE.txt
├── reference.md
├── scripts
│ ├── check_bounding_boxes_test.py
│ ├── check_bounding_boxes.py
│ ├── check_fillable_fields.py
│ ├── convert_pdf_to_images.py
│ ├── create_validation_image.py
│ ├── extract_form_field_info.py
│ ├── fill_fillable_fields.py
│ └── fill_pdf_form_with_annotations.py
└── SKILL.md
在 SKILL.md 里面直接指明什么时候去调用脚本: `python scripts/check_fillable_fields <file.pdf>。
下面提供一个demo:
## PDF form filling workflow
Copy this checklist and check off items as you complete them:
Task Progress:
- [ ] Step 1: Analyze the form (run analyze_form.py)
- [ ] Step 2: Create field mapping (edit fields.json)
- [ ] Step 3: Validate mapping (run validate_fields.py)
- [ ] Step 4: Fill the form (run fill_form.py)
- [ ] Step 5: Verify output (run verify_output.py)
**Step 1: Analyze the form**
Run: python scripts/analyze_form.py input.pdf
This extracts form fields and their locations, saving to fields.json.
**Step 2: Create field mapping**
Edit fields.json to add values for each field.
**Step 3: Validate mapping**
Run: python scripts/validate_fields.py fields.json
Fix any validation errors before continuing.
**Step 4: Fill the form**
Run: python scripts/fill_form.py input.pdf fields.json output.pdf
**Step 5: Verify output**
Run: python scripts/verify_output.py output.pdf
If verification fails, return to Step 2.
通过 Run validator → fix errors → repeat 这种循环模式来不断提升输出的质量

## Content review process
1. Draft your content following the guidelines in STYLE_GUIDE.md
2. Review against the checklist:
- Check terminology consistency
- Verify examples follow the standard format
- Confirm all required sections are present
3. If issues found:
- Note each issue with specific section reference
- Revise the content
- Review the checklist again
4. Only proceed when all requirements are met
5. Finalize and save the document
比如上面的例子中,使用 STYLE_GUIDE.md 作为验证器,agent 通过通过读取和比较来执行检查,不通过则循环修改之后再进行验证。
我们可以在 md 里面引导 agent 做出条件选择,运行符合条件的 workflow :

## Document modification workflow
1. Determine the modification type:
**Creating new content?** → Follow "Creation workflow" below
**Editing existing content?** → Follow "Editing workflow" below
2. Creation workflow:
- Use docx-js library
- Build document from scratch
- Export to .docx format
3. Editing workflow:
- Unpack existing document
- Modify XML directly
- Validate after each change
- Repack when complete
我们可以在 skill 里面提供示例以提升 agent 的能力,最好可以明确 input/output 这样更明确,如下所示:
## Commit message format
Generate commit messages following these examples:
**Example 1:**
Input: Added user authentication with JWT tokens
Output:
feat(auth): implement JWT-based authentication
Add login endpoint and token validation middleware
**Example 2:**
Input: Fixed bug where dates displayed incorrectly in reports
Output:
fix(reports): correct date formatting in timezone conversion
Use UTC timestamps consistently across report generation
比如我们现在输出的结果就是需要按照一定要求输出,那么我们可以在 skill 提供模版,让 agent 按照模版输出:
## Report structure
ALWAYS use this exact template structure:
# [Analysis Title]
## Executive summary
[One-paragraph overview of key findings]
## Key findings
- Finding 1 with supporting data
- Finding 2 with supporting data
- Finding 3 with supporting data
## Recommendations
1. Specific actionable recommendation
2. Specific actionable recommendation
一般的情况,我们用 传统workflow的做法(比如在dify里),需要这么做:
但是如果用 skill 就完全不需要这样,比如可以简单的用我上面讲的 pattern 就足够实现一套比较复杂的 workflow了。
比如目前要搭建一个一个小红书热门爆款写作的workflow,首先是从热门网站爬取,然后分析爆款热点,再来写作,最后输出到小红书,那么整个 workflow 的编排任务也可以通过 skill 来完成。
那么我们可以这样编排 workflow:
.claude/
├── skills/
│ ├── workflow-runner/ # 核心编排引擎
│ │ ├── SKILL.md # 解析 YAML 并调度任务的指令
│ │ └── workflow_schema.json # 约束 workflow.yaml 的格式
│ ├── web-scraper/ # 基础采集工具
│ │ ├── SKILL.md # 爬虫调用指令
│ │ └── scripts/ # 存放 Python/Playwright 爬虫脚本
│ └── xhs-utils/ # 小红书专用工具箱
│ ├── SKILL.md # 包含格式化、Emoji 注入、标签生成逻辑
│ └── templates/ # 爆款文案模板库
├── agents/ # 专门化的 Sub-agents 定义
│ ├── crawl-agent.md # 负责从乱码网页中清洗出有效信息的 Agent
│ ├── trend-analyst-agent.md # 负责拆解爆款逻辑、提炼“钩子”的 Agent
│ └── xhs-writer-agent.md # 负责不同人格化写作的文案 Agent
└── workspace/ # 运行时的中转站 (执行过程中动态生成)
└── xhs-factory/ # 存放 raw_data, analysis, drafts 等中间文件
我上面这套 workflow 可以利用到 skill 和 subagent 相互协调来实现。skill 主要用来运行脚本和润色;subagent 因为有单独的context,所以将拆分的任务并发执行,提升处理效率。

第一步:执行编排 workflow-runner (编排器) ,它会通过读取配置,我把它定义为 xhs_vlog.yaml,它里面规定了执行步骤,以及输出到什么文件夹:
name: "小红书爆款文案生产线"
version: "1.0"
workspace: "workspace/xhs-factory"
steps:
# 步骤 1:爬取小红书热门内容
- id: scraping_stage
type: skill
skill: web-scraper
params:
target: "xiaohongshu_trending" # 爬取小红书首页热门
limit: 20 # 爬取20篇热门笔记
output_dir: "{{workspace}}/raw_data"
# 步骤 2:清洗数据
- id: cleaning_stage
type: agent
agent: crawl-agent
depends_on: [scraping_stage]
params:
input: "{{steps.scraping_stage.output}}"
output: "{{workspace}}/cleaned_data.json"
# 步骤 3:趋势分析
- id: analysis_stage
type: agent
agent: trend-analyst-agent
depends_on: [cleaning_stage]
params:
input: "{{steps.cleaning_stage.output}}"
output: "{{workspace}}/analysis/hooks_and_patterns.json"
# 步骤 4:文案创作(并行生成3种风格)
- id: writing_stage
type: agent
agent: xhs-writer-agent
mode: parallel # 并行执行
depends_on: [analysis_stage]
params:
styles: ["干货风", "吐槽风", "故事风"]
analysis: "{{steps.analysis_stage.output}}"
output_dir: "{{workspace}}/drafts"
# 步骤 5:格式化文案
- id: formatting_stage
type: skill
skill: xhs-utils
depends_on: [writing_stage]
params:
drafts_dir: "{{steps.writing_stage.output}}"
output_dir: "{{workspace}}/final"
然后通过设置 run_state.json文件的方式每完成一个步骤,agent 必须强制更新这个文件,然后上一步和下一步通过 ouput 来进行对接,每一步完成之后会标记状态和完成时间,比如这样:
{
"workflow_file": ".claude/workflows/xhs_vlog.yaml",
"workspace": "workspace/xhs-factory",
"current_step_id": "writing_stage",
"global_context": {},
"steps": {
"scraping_stage": {
"status": "completed",
"output": "workspace/xhs-factory/raw_data",
"timestamp": "2026-01-19T14:17:19.344205",
"error": null
},
"cleaning_stage": {
"status": "completed",
"output": "workspace/xhs-factory/cleaned_data.json",
"timestamp": "2026-01-19T14:22:17.638192",
"error": null
},
"analysis_stage": {
"status": "completed",
"output": "workspace/xhs-factory/analysis/hooks_and_patterns.json",
"timestamp": "2026-01-19T14:29:11.210193",
"error": null
},
"writing_stage": {
"status": "completed",
"output": "workspace/xhs-factory/drafts",
"timestamp": "2026-01-19T14:34:22.027580",
"error": null
},
"formatting_stage": {
"status": "pending",
"output": null,
"timestamp": null,
"error": null
}
}
}
第二步:原子执行 web-scraper (Skill),Skill 会调用运行 Python 脚本进行网站的爬取,脚本运行成功并生成文件后,Runner 立即将 scrapping_stage 标记为 completed,并写入文件到当前项目的 raw_data 文件夹;
第三步:启动 crawl-agent 批量的对抓取的页面进行数据清洗,并且在 crawl-agent.md 文件中还用示例的方式指出了输出格式:
[
{
"id": "note_0",
"title": "绝绝子!这个方法让我一周瘦了5斤",
"content": "姐妹们,今天分享一个超好用的减肥方法...",
"likes": 12000,
"comments": 456,
"favorites": 0,
"tags": ["减肥", "健康", "生活方式"],
"published_at": null
}
]
第四步:启动并行创作xhs-writer-agent,启动多个 subagent 完成不同风格的文案写作工作,比如我在 agent 里面规定了三种风格,可以根据自己的运营经验进行微调:
### 干货风
- **标题**:数字+动词+效果(如"3招让你的皮肤嫩到发光✨")
- **开头**:直接抛出核心价值,吸引读者
- **正文**:步骤拆解,每步用 emoji 标记,内容具体可操作
- **结尾**:总结+互动引导(如"姐妹们快试试吧💕")
- **长度**:300-500字
### 吐槽风
- **标题**:痛点+共鸣(如"姐妹们,别再踩这些坑了!😭")
- **开头**:描述痛点场景,引发共鸣
- **正文**:吐槽+解决方案+对比,情绪化表达
- **结尾**:反转或金句收尾
- **长度**:250-400字
第五步:执行汇总格式化 xhs-utils (Skill),只有当 run_state.json 显示所有创作子任务都为 completed 时,才会触发最后的格式化 Skill。
最终生成的文件全部都通过文件来传递,可以极大的减少 token 的消耗:
└── workspace
└── xhs-factory
├── analysis
│ └── hooks_and_patterns.json
├── cleaned_data.json
├── drafts
│ ├── 吐槽风.md
│ ├── 干货风.md
│ └── 故事风.md
├── final
│ ├── 吐槽风_final.md
│ ├── 干货风_final.md
│ └── 故事风_final.md
├── raw_data
│ ├── note_0.json
│ ├── ....
│ └── note_9.json
└── run_state.json
Agent Skill 的核心魅力在于它将大模型的逻辑能力与软件工程的模块化思想深度融合。通过这篇文章的实践,我们可以体会到几个比较有用的实践:
https://x.com/dotey/status/2010176124450484638
https://platform.claude.com/docs/en/agents-and-tools/agent-skills/best-practices
2026-01-10 20:46:07
像经常用 LLM 的同学都知道现在最头疼的问题就是幻觉问题,在金融或精密计算领域,不确定性意味着风险。 如果 Agent 负责分析 NVDA 或 TSLA 的财报,开发者希望它在处理相同数据时,逻辑推导链条是严密的,而不是在不同时间给出自相矛盾的结论。或是需要 LLM 输出 JSON 来触发一个 API,我们不会希望 LLM 在 JSON 里多加了一个逗号或改变了字段名。
最后我还尝试用 LangGraph 的理念自己写了一个 smallest-LangGraph,
传统的 LangChain 核心逻辑是 DAG(有向无环图)。我们可以轻松定义 A -> B -> C 的步骤,但如果你想让 AI 在 B 步骤发现结果不满意,自动跳回 A 重新执行,LangChain 的普通 Chain 很难优雅地实现。并且在复杂的长对话或多步骤任务中,维护一个全局的、可持久化的“状态快照”非常困难。
所以为了解决这些问题,LangGraph 就诞生了。LangGraph 的主要有这些核心优势:
支持“循环(Cycles)”与“迭代”
思考 -> 2. 行动 -> 3. 观察结果 -> 4. 如果不满意,回到第1步。 LangGraph 允许你定义这种闭环逻辑,这在长任务、自我修正代码、多轮调研场景下是刚需。
状态管理
LangGraph 引入了 State 的概念,所有节点共享同一个 TypedDict,你可以精确定义哪些数据是追加的(operator.add),哪些是覆盖的。并且它可以自动保存每一步的状态。即使程序崩溃或需要人工审核,你也可以从特定的“存档点”恢复,而不需要从头运行。
人机协作
LangGraph 允许你将流程设计为“在某处强制停下”,等待人类信号后再继续。这在 LangChain 的线性模型中极难实现,但在 LangGraph 的状态机模型中只是一个节点属性。
高度可控
“如果工具返回报错,必须走 A 路径。” 这种确定性对于生产环境的后端服务至关重要。不能让模型乱输出,在生产环境上严格把控输出结果是很重要的。
由于 LangGraph 的核心思想是将 Agent 的工作流建模为一张有向图(Directed Graph)。所以 LangGraph 有如下几个结构组成
全局状态(State)
这个状态通常被定义为一个 Python 的 TypedDict,它可以包含任何你需要追踪的信息,如对话历史、中间结果、迭代次数等,所有的节点都能读取和更新这个中心状态。
节点(Nodes)
每个节点都是一个接收当前状态作为输入、并返回一个更新后的状态作为输出的 Python 函数。
边(Edges)
边负责连接节点,定义工作流的方向。最简单的边是常规边,它指定了一个节点的输出总是流向另一个固定的节点。而 LangGraph 最强大的功能在于条件边(Conditional Edges)。它通过一个函数来判断当前的状态,然后动态地决定下一步应该跳转到哪个节点。
基于上面的概念,我们来做一个例子,假设我们要开发一个 Agent:它先翻译一段话,然后自己检查是否有语法错误,如果有,就打回重新翻译;如果没有,就结束。
首先,我们先定义状态 (State):
from typing import TypedDict, List
class AgentState(TypedDict):
# 原始文本
input_text: str
# 翻译后的文本
translated_text: str
# 反思反馈
feedback: str
# 循环次数(防止死循环)
iterations: int
定义节点逻辑 (Nodes):
def translator_node(state: AgentState):
print("--- 正在翻译 ---")
# 这里通常会调用 LLM
new_text = f"Translated: {state['input_text']}"
return {"translated_text": new_text, "iterations": state.get("iterations", 0) + 1}
def critic_node(state: AgentState):
print("--- 正在自检 ---")
# 模拟检查逻辑,如果包含 'bad' 字符就认为不合格
if "bad" in state['translated_text']:
return {"feedback": "发现不当词汇,请重试"}
return {"feedback": "OK"}
定义路由逻辑 (Conditional Edges):
def should_continue(state: AgentState):
if state["feedback"] == "OK" or state["iterations"] > 3:
return "end"
else:
return "rephrase"
构建图 (Graph Construction):
from langgraph.graph import StateGraph, END
# 1. 初始化图
workflow = StateGraph(AgentState)
# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)
# 3. 设置入口点
workflow.set_entry_point("translator")
# 4. 连接节点
workflow.add_edge("translator", "critic")
# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
"critic",
should_continue,
{
"rephrase": "translator", # 如果不 OK,回到翻译节点
"end": END # 如果 OK,结束
}
)
# 6. 编译成可执行应用
app = workflow.compile()
通过上面这种编排方式,可以让 LLM 概率性输出产生确定性的输出,通过各种限制节点,很好的控制了 LLM 的访问的节点。
下面我给出完整的例子,大家可以用这个例子去尝试一下:
from typing import TypedDict, List
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
from langchain_core.messages import SystemMessage, HumanMessage
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6V",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
class AgentState(TypedDict):
# 原始文本
input_text: str
# 翻译后的文本
translated_text: str
# 反思反馈
feedback: str
# 循环次数(防止死循环)
iterations: int
def translator_node(state: AgentState):
"""翻译节点:负责将中文翻译成英文"""
print(f"\n--- [节点:翻译器] 第 {state.get('iterations', 0) + 1} 次尝试 ---")
iters = state.get("iterations", 0)
feedback = state.get("feedback", "无")
# 构建提示词:如果是重试,带上反馈建议
system_prompt = "你是一个专业的翻译官。请将用户的中文翻译成地道、优雅的英文。"
if iters > 0:
system_prompt += f" 注意:这是第二次尝试,请参考之前的反馈进行改进:{feedback}"
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=state["input_text"])
])
return {
"translated_text": response.content,
"iterations": iters + 1
}
def critic_node(state: AgentState):
"""评审节点:检查翻译质量"""
print("--- [节点:评审员] 正在检查翻译质量... ---")
system_prompt = (
"你是一个严苛的英文编辑。请评价以下翻译是否准确、地道。"
"如果翻译得很好,请只回复关键词:【PASS】。"
"如果翻译有改进空间,请直接指出问题并给出改进建议。"
)
user_content = f"原文:{state['input_text']}\n译文:{state['translated_text']}"
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=user_content)
])
return {"feedback": response.content}
# 4. 定义路由逻辑
def should_continue(state: AgentState):
"""判断是继续修改还是直接结束"""
if "【PASS】" in state["feedback"] or state["iterations"] >= 3:
if state["iterations"] >= 3:
print("!!! 达到最大尝试次数,停止优化。")
return "end"
else:
print(f">>> 反馈建议:{state['feedback']}")
return "rephrase"
# 1. 初始化图
workflow = StateGraph(AgentState)
# 2. 添加节点
workflow.add_node("translator", translator_node)
workflow.add_node("critic", critic_node)
# 3. 设置入口点
workflow.set_entry_point("translator")
# 4. 连接节点
workflow.add_edge("translator", "critic")
# 5. 添加条件边 (根据 critic 的反馈决定去向)
workflow.add_conditional_edges(
"critic",
should_continue,
{
"rephrase": "translator", # 如果不 OK,回到翻译节点
"end": END # 如果 OK,结束
}
)
# 6. 编译成可执行应用
app = workflow.compile()
# 7. 运行时交互
if __name__ == "__main__":
print("=== LangGraph 智能翻译 Agent (输入 'exit' 退出) ===")
while True:
user_input = input("\n请输入想要翻译的中文内容: ")
if user_input.lower() == 'exit':
break
# 初始状态
initial_state = {
"input_text": user_input,
"iterations": 0
}
# 运行图并获取最终状态
final_state = app.invoke(initial_state)
print("\n" + "=" * 30)
print(f"最终翻译结果:\n{final_state['translated_text']}")
print("=" * 30)
Reducer 在 LangGraph 中就是一种更新状态的处理逻辑,如果没有指定默认行为是 用新值覆盖旧值。想要指定 Reducer 只需要通过 typing.Annotated 字段绑定一个 Reducer 函数即可。
比如使用 operator.add 定义这是一个“追加型”字段:
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, END
import operator
# 定义状态结构 (类似 Go 的 Struct)
class AgentState(TypedDict):
# 使用 Annotated 和 operator.add 定义这是一个“追加型”字段
# 每次节点返回消息,都会 append 到这个列表,而不是覆盖它
messages: Annotated[list[str], operator.add]
# 普通字段,默认行为是 Overwrite (覆盖)
# 适合存储状态机当前的步骤或分析结论
current_status: str
# 计数器,也可以使用 operator.add 实现增量累加
retry_count: Annotated[int, operator.add]
在 LangGraph 中,Checkpointer 是一个持久化层接口,这意味着历史的对话记录,可以被自动持久化到数据库(如 SQLite 或其他外部数据库)中。这使得即使应用程序重启或用户断开连接,对话历史也能被保存和恢复,从而实现“真正的多轮记忆”。
LangGraph 提供了多种 Checkpointer 以便应对不同的使用场景:
MemorySaver 保存在内存,适用开发调试、单元测试;
SqliteSaver 保存在本地的.db文件,轻量级应用、边缘计算适合单机部署;
PostgresSaver 保存在 PostgreSQL,适合用在生产环境、多实例部署;
RedisSaver 适合处理高频、短时会话;
LangGraph 通过 thread_id 会话的唯一标识,结合 Checkpointer 就可以实现状态的隔离:
首先指定一个 指定一个 thread_id,所有相关的状态都会被保存到这个线程中。
config = {"configurable": {"thread_id": "conversation_1"}}
graph.invoke(input_data, config)
编译的时候传入 Checkpointer 即可。
# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)
完整示例:
from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END, MessagesState
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
# 定义节点函数
def call_model(state: MessagesState):
response = llm.invoke(state["messages"])
return {"messages": response}
# 构建图
builder = StateGraph(MessagesState)
builder.add_node("agent", call_model)
builder.add_edge(START, "agent")
# 创建 checkpointer
checkpointer = InMemorySaver()
# 编译图时传入 checkpointer
graph = builder.compile(checkpointer=checkpointer)
# 第一次对话
config = {"configurable": {"thread_id": "user_123"}}
response1 = graph.invoke(
{"messages": [{"role": "user", "content": "你好,我的名字是张三"}]},
config
)
print(f"AI: {response1['messages'][-1].content}")
# 第二次对话(相同 thread_id)
response2 = graph.invoke(
{"messages": [{"role": "user", "content": "我的名字是什么?"}]},
config # 使用相同的 thread_id
)
print(f"AI: {response2['messages'][-1].content}")
# 获取当前的状态信息
print(f"AI: {graph.get_state(config)}")
除此之外,可以 graph.get_state() / graph.get_state_history() 拿到当前/历史状态;也可以基于 checkpoint 做 replay、update_state(时间旅行能力通常要求启用 checkpointer)。
由于一个 node 也可以连接多个 node,多个 node 也可以连接到 一个 node,所以 LangGraph 设计了 Super-step 来作为原子循环单元。比如下面的例子:
graph.set_entry_point("n1")
graph.add_edge("n1", "n2")
graph.add_edge("n1", "n3")
graph.add_edge("n2", "n4")
graph.add_edge("n3", "n4")
graph.add_edge("n4", END)
LangGraph 只分了三步就执行完了该循环。如下图,第二步的时候会 n2、n3 节点并行执行。

并且每个 super-step 都会自动保存一个 checkpoint,这就是持久化机制的基础。即使程序中断,也能从最后一个 super-step 的 checkpoint 恢复执行。
Human-in-the-loop 本质上就是让 agent “关键时刻”暂停,它的底层靠的是 interrupt + 持久化(checkpoint):暂停时把状态存起来,恢复时从存档续跑。
比如我们想要是线一个场景就是让 AI 去判断是否应该要人工审核,如过需要人工审核,那么就 interrupt 进行中断,然后等人工输入之后根据执行逻辑进行恢复,然后配合Command(resume=...) 恢复。
基本流程可以是这样:
import uuid
from langgraph.types import interrupt, Command
def ask_human(state):
answer = interrupt("Do you approve?")
return {"approved": answer}
config = {"configurable": {"thread_id": str(uuid.uuid4())}}
# 第一次跑:会中断,返回 __interrupt__
graph.invoke({"input": "x"}, config=config)
# 人给了答复后:用 Command(resume=...) 恢复
graph.invoke(Command(resume=True), config=config)
这个例子中interrupt() 会暂停图执行,把一个值(必须可 JSON 序列化)抛给调用方,并依赖 checkpointer 持久化状态;然后你用同一个 thread_id 重新调用图,并传入 Command(resume=...) 来继续。
接下来我们看一个完整的例子,设计一个常见的场景,当模型觉得需要“找专家/找人类”时,会调用一个工具 human_assistance,而这个工具会用 interrupt() 把流程暂停下来,等你在命令行里输入专家建议后,再用 Command(resume=...) 把图唤醒继续跑。
from langgraph.checkpoint.memory import InMemorySaver
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict
from typing import Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt
from langchain_core.tools import tool
from langgraph.prebuilt import ToolNode, tools_condition
from langchain_core.messages import HumanMessage
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
class State(TypedDict):
messages: Annotated[list, add_messages]
@tool
def human_assistance(query: str) -> str:
"""Request assistance from a human."""
human_response = interrupt({"query": query})
return human_response["data"]
tools = [human_assistance]
llm_with_tools = llm.bind_tools(tools)
def chatbot(state: State):
message = llm_with_tools.invoke(state["messages"])
return {"messages": [message]}
tool_node = ToolNode(tools=tools)
graph_builder = StateGraph(State)
graph_builder.add_node("tools", tool_node)
graph_builder.add_conditional_edges(
"chatbot",
tools_condition,
)
graph_builder.add_node("chatbot", chatbot)
graph_builder.add_edge(START, "chatbot")
graph_builder.add_edge("chatbot", END)
memory = InMemorySaver()
graph = graph_builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "test_thread_123"}}
# 第一步:用户提出一个需要“人工协助”的问题
print("--- 第一阶段:AI 运行并遇到 interrupt ---")
initial_input = HumanMessage(content="你好,帮我找个专家回答我的问题")
for event in graph.stream({"messages": [initial_input]}, config, stream_mode="values"):
if "messages" in event:
event["messages"][-1].pretty_print()
# 此时,你会发现程序停止了,因为它卡在 `human_assistance` 的 `interrupt` 处。
# 第二步:模拟人类(你)在一段时间后看到了请求并回复
print("\n--- 第二阶段:模拟人类介入并提供答案 ---")
# 我们构造一个 Command 对象来“唤醒”它
# resume 里的内容会直接成为 interrupt() 函数的返回值
expert_input = input("专家建议: ")
human_feedback = {"data": expert_input}
for event in graph.stream(
Command(resume=human_feedback), # 这里是恢复运行的关键
config,
stream_mode="values"
):
if "messages" in event:
event["messages"][-1].pretty_print()
snapshot = graph.get_state(config)
print(snapshot.values)
ReAct由Shunyu Yao于2022年提出[1],其核心思想是模仿人类解决问题的方式,将推理 (Reasoning) 与行动 (Acting) 显式地结合起来,形成一个“思考-行动-观察”的循环。

ReAct范式通过一种特殊的提示工程来引导模型,使其每一步的输出都遵循一个固定的轨迹:
Action后从外部工具返回的结果,例如或API的返回值。智能体将不断重复这个 Thought -> Action -> Observation 的循环,将新的观察结果追加到历史记录中,形成一个不断增长的上下文,直到它在Thought中认为已经找到了最终答案,然后输出结果。
from typing import Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
# --- 1. 双手:定义查天气的工具 ---
@tool
def get_weather(city: str):
"""查询指定城市的天气"""
# 这里模拟后端 API 返回数据
if "北京" in city:
return "晴天,25度"
return "阴天,20度"
tools = [get_weather]
tool_node = ToolNode(tools)
# --- 2. 记忆:定义存储对话的状态 ---
class State(TypedDict):
messages: Annotated[list, add_messages]
# --- 3. 大脑:定义思考逻辑 ---
model = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
).bind_tools(tools)
def call_model(state: State):
# 大脑看一眼目前的对话,决定是直接说话还是去用手拿工具
return {"messages": [model.invoke(state["messages"])]}
# --- 4. 路由:判断下一步是干活还是结束 ---
def should_continue(state: State):
last_message = state["messages"][-1]
# 如果大脑发出的指令包含“调用工具”,就去 tools 节点
if last_message.tool_calls:
return "tools"
# 如果大脑直接说话了,就结束
return END
# --- 5. 编排图(把脑和手连起来) ---
workflow = StateGraph(State)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.set_entry_point("agent")
# 条件边:agent 运行完,判断是去 tools 还是结束
workflow.add_conditional_edges("agent", should_continue)
# 普通边:tools 运行完(干完活了),必须把结果拿回给 agent 看
workflow.add_edge("tools", "agent")
app = workflow.compile()
# --- 6. 执行测试 ---
for chunk in app.stream({"messages": [("user", "北京今天天气怎么样?")]}):
print(chunk)
Plan-and-Solve 顾名思义,这种范式将任务处理明确地分为两个阶段:先规划 (Plan),后执行 (Solve)。Plan-and-Solve Prompting 由 Lei Wang 在2023年提出。其核心动机是为了解决思维链在处理多步骤、复杂问题时容易“偏离轨道”的问题。
Plan-and-Solve 将整个流程解耦为两个核心阶段:

import operator
from typing import Annotated, List, Tuple, Union
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
# 1. 定义状态 (State)
class PlanExecuteState(TypedDict):
input: str # 原始问题
plan: List[str] # 当前待办清单
past_steps: Annotated[List[Tuple], operator.add] # 已完成的步骤和结果
response: str # 最终答案
# 2. 定义结构化输出模型 (用于 Planner)
class Plan(BaseModel):
"""步骤清单"""
steps: List[str] = Field(description="为了回答问题需要执行的步骤")
# 3. 定义节点逻辑
model = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/"
)
planner_model = model.with_structured_output(Plan, method="function_calling")
# --- 节点 A: 规划者 ---
def planner_node(state: PlanExecuteState):
plan = planner_model.invoke(f"针对以下问题制定计划: {state['input']}")
return {"plan": plan.steps}
# --- 节点 B: 执行者 (这里简化了工具调用) ---
def executor_node(state: PlanExecuteState):
step = state["plan"][0] # 取当前第一步
print(f"--- 正在执行: {step} ---")
# 模拟工具执行结果
result = f"已完成 {step} 的查询,结果为: [模拟数据]"
return {"past_steps": [(step, result)], "plan": state["plan"][1:]}
# --- 节点 C: 重规划者 (决定是继续还是结束) ---
def replanner_node(state: PlanExecuteState):
if not state["plan"]: # 如果清单空了,让 AI 生成最终总结
summary = model.invoke(
f"请基于已完成的步骤和结果给出最终答案:{state['past_steps']}"
)
return {"response": summary.content}
return {"response": None}
# 4. 路由逻辑
def should_continue(state: PlanExecuteState):
if state["response"]:
return END
return "executor"
# 5. 编排图
workflow = StateGraph(PlanExecuteState)
workflow.add_node("planner", planner_node)
workflow.add_node("executor", executor_node)
workflow.add_node("re-planner", replanner_node)
workflow.set_entry_point("planner")
workflow.add_edge("planner", "executor")
workflow.add_edge("executor", "re-planner")
# 循环逻辑:根据 re-planner 的判断决定是否回 executor
workflow.add_conditional_edges("re-planner", should_continue)
app = workflow.compile()
# 6. 测试
input_query = {"input": "对比北京和上海的天气,哪个更热?"}
for event in app.stream(input_query):
print(event)
Reflection 机制的核心思想,正是为智能体引入一种事后(post-hoc)的自我校正循环,使其能够像人类一样,审视自己的工作,发现不足,并进行迭代优化。 Reflection 框架是Shinn, Noah 在2023年提出,其核心工作流程可以概括为一个简洁的三步循环:执行 -> 反思 -> 优化。

from typing import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
class ReflectionState(TypedDict):
prompt: str
draft: str
critique: str
final: str
iteration: int
llm = ChatOpenAI(
temperature=0.6,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)
MAX_ITERS = 2
def generate_draft(state: ReflectionState):
msg = llm.invoke(f"请写一段简短答案:{state['prompt']}")
return {"draft": msg.content, "iteration": 0}
def reflect_on_draft(state: ReflectionState):
prompt = (
"你是严格的审稿人。请指出这段答案的问题并给出改进建议。"
"如果没有明显问题,请只输出 NO_ISSUES。\n\n"
f"答案:\n{state['draft']}"
)
critique = llm.invoke(prompt)
print(f"--- 正在执行: reflect,critique:\n {critique.content} ---")
return {"critique": critique.content}
def revise_draft(state: ReflectionState):
prompt = (
"请根据以下反馈重写答案,保持简短清晰:\n\n"
f"反馈:\n{state['critique']}\n\n"
f"原答案:\n{state['draft']}"
)
revision = llm.invoke(prompt)
print(f"--- 正在执行: revise,原答案:\n{state['draft']},改进后:\n{revision.content} ---")
return {"draft": revision.content, "iteration": state["iteration"] + 1}
def finalize(state: ReflectionState):
return {"final": state["draft"]}
def should_reflect(state: ReflectionState):
if state["critique"].strip() == "NO_ISSUES":
return "finalize"
if state["iteration"] >= MAX_ITERS:
return "finalize"
return "revise"
workflow = StateGraph(ReflectionState)
workflow.add_node("generate", generate_draft)
workflow.add_node("reflect", reflect_on_draft)
workflow.add_node("revise", revise_draft)
workflow.add_node("finalize", finalize)
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "reflect")
workflow.add_conditional_edges("reflect", should_reflect)
workflow.add_edge("revise", "reflect")
workflow.add_edge("finalize", END)
app = workflow.compile()
if __name__ == "__main__":
input_data = {"prompt": "用三句话解释什么是 LangGraph。"}
result = app.invoke(input_data)
print("最终答案:")
print(result["final"])
大致流程就是,首先里面需要有两个角色:写稿人和审稿人,然后用 should_reflect 来判断是否需要重写,然后用 MAX_ITERS 来限制一下最大撰写次数。
[START]
|
v
(generate_draft)
|
v
(reflect_on_draft) -- NO_ISSUES --> (finalize) --> [END]
|
| iteration >= MAX_ITERS
+-----------------------> (finalize) --> [END]
|
+-- else --> (revise_draft) --+
|
v
(reflect_on_draft)
Multi-Agent 模式是将复杂的任务拆解为多个专门化、独立且可协同的微服务,每个服务(Agent)只负责一个特定的领域。
因为单个 Prompt 包含太多工具和指令会导致 LLM “迷失”,模型表现下降。所以通过使用doge Agent 进行职责分离,不同的 Agent 可以使用不同的 Prompt、不同的模型(如 GPT-4o 负责决策,Llama-3 负责写代码),甚至不同的工具集。
比如下面的例子中:一个“PM” Agent 负责拆解任务,并将子任务分发给不同的“员工(Workers)”。


from typing import Annotated, Literal
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
# Multi-agent pattern: a supervisor routes work between specialist agents.
llm = ChatOpenAI(
temperature=0.4,
model="glm-4.6v",
openai_api_key="",
openai_api_base="https://open.bigmodel.cn/api/paas/v4/",
)
class State(TypedDict):
messages: Annotated[list, add_messages]
next: str
turn: int
MAX_TURNS = 6
def _call_agent(system_prompt: str, messages: list, name: str):
response = llm.invoke([{"role": "system", "content": system_prompt}] + messages)
return {
"messages": [
{"role": "assistant", "name": name, "content": response.content}
],
"turn": 1,
}
def supervisor(state: State):
if state["turn"] >= MAX_TURNS:
return {"next": "finish"}
system = (
"You are a supervisor managing a team: researcher, writer, critic. "
"Choose who should act next or finish. "
"Respond with exactly one word: researcher, writer, critic, finish."
)
response = llm.invoke([{"role": "system", "content": system}] + state["messages"])
decision = response.content.strip().lower()
for option in ("researcher", "writer", "critic", "finish"):
if option in decision:
return {"next": option}
return {"next": "finish"}
def researcher(state: State):
system = (
"You are a researcher. Gather key facts and constraints for the task. "
"Be concise and list only essential points."
)
return _call_agent(system, state["messages"], "researcher")
def writer(state: State):
system = (
"You are a writer. Produce a clear, structured response using the context. "
"If facts are missing, note assumptions."
)
return _call_agent(system, state["messages"], "writer")
def critic(state: State):
system = (
"You are a critic. Identify gaps, risks, or unclear parts in the draft, "
"then suggest improvements."
)
return _call_agent(system, state["messages"], "critic")
def route_next(state: State) -> Literal["researcher", "writer", "critic", "finish"]:
return state["next"]
builder = StateGraph(State)
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher)
builder.add_node("writer", writer)
builder.add_node("critic", critic)
builder.add_edge(START, "supervisor")
builder.add_conditional_edges(
"supervisor",
route_next,
{
"researcher": "researcher",
"writer": "writer",
"critic": "critic",
"finish": END,
},
)
builder.add_edge("researcher", "supervisor")
builder.add_edge("writer", "supervisor")
builder.add_edge("critic", "supervisor")
app = builder.compile()
if __name__ == "__main__":
user_input = "创建一款广告招商的帖子"
initial_state = {
"messages": [{"role": "user", "content": user_input}],
"next": "supervisor",
"turn": 0,
}
for event in app.stream(initial_state):
for value in event.values():
if "messages" in value:
msg = value["messages"][-1]
name = msg.get("name", "assistant")
print(f"[{name}] {msg['content']}")
https://datawhalechina.github.io/
https://www.philschmid.de/agentic-pattern
https://blog.dailydoseofds.com/p/5-agentic-ai-design-patterns
https://zhuanlan.zhihu.com/p/1972437682400519404
https://www.zhihu.com/people/yuan-chelsea
2025-12-28 21:35:01
Agent Skills 是一种轻量级、开放的格式,用于扩展 AI Agent 的能力和专业知识。本质上,一个 Skill 就是一个包含 SKILL.md 文件的文件夹。
Agent Skills 的作用在于:
name 和 description,任务匹配时才加载完整指令那么这里就有个问题,为什么有了 MCP 之后还需要 Agent Skills?
这个问题其实有过很多争论,有开发者评论说:"Skill 和 MCP 是两种东西,Skill 是领域知识,告诉模型该如何做,本质上是高级 Prompt;而 MCP 对接外部工具和数据。" 也有人认为:"从 Function Call 到 Tool Call 到 MCP 到 Skill,核心大差不差,就是工程实践和表现形式的优化演进。"
其实我还是觉得要从 MCP 和 Agent Skills 设计上区分他们到底有什么不同。
MCP 其实就是指的提供了一个远程的接口,可以用这个接口来接外部世界:能读取数据库、能访问 API、能执行命令行;
Agent Skills 更像一个操作手册,主要存在本地的文件里面,不需要调用外部接口,主要是用来告诉 AI 有那些领域知识,然后教 AI 如何正确、高效地使用这些手,按照什么步骤去完成特定任务。
除此之外,Agent Skills 解决了 MCP 无法解决的三个核心问题:
节省 token
在使用 mcp 工具的时候,通常工具的定义(名字、参数、描述)全部塞进 AI 的提示词(Prompt)里,AI 才能知道怎么调用。这会极大地消耗 Token,可能占用数万个 token。据社区开发者反馈,仅加载一个 Playwright MCP 服务器就会占用 200k 上下文窗口的 8%,这在多轮对话中会迅速累积,导致成本飙升和推理能力下降。
而对于 Agent Skills 来说通过渐进式披露(Progressive Disclosure)机制,智能体按需逐步加载,既确保必要时不遗漏细节,又避免一次性将过多内容塞入上下文窗口,来解决这个问题。
解决“会用工具但不懂业务”的问题(业务流程固化)
AI 只懂 MCP 是不会理解业务的,比如 MCP 提供了 delete_database()(删除数据库)的工具。这很强大,但也危险。AI 可能因为你的一个模糊指令直接删库。这个时候就可以写一个 Skill,规定:
当用户要求删除数据库时,必须严格执行以下流程:
check_backup() 确认有备份。send_alert() 给管理员发通知。delete_database()。降低开发门槛
开发一个 MCP Server 需要后端开发能力,提供接口。Skills 只需要提供 SKILL.md 即可。比如你是资深运营,你可以写一个“小红书文案 Skill”,里面不需要代码,只需要写清楚:“第一步先分析竞品,第二步提取关键词,第三步套用这个模板…”。
所以综上 Agent Skills 至少为开发者带来三大核心价值:
能力复用:一次编写,在 Copilot、Cursor、Claude 等多个 Agent 产品中使用,还可跨团队共享或通过 GitHub 公开发布。
知识沉淀:将团队最佳实践固化为版本化的 Skills,如代码审查规范、部署流程、数据分析模板等,确保工作流程的一致性。
提升效率:通过明确的指导让 Agent 更准确地执行复杂任务,减少试错和修正,提供一致的输出质量。
Agent Skills 最核心的创新是渐进式披露(Progressive Disclosure)机制。AI 在使用 Agent Skills 的时候并没有将整个知识库加载到人工智能有限的上下文窗口中,而是以智能的、高效的层级方式加载信息。
SKILL.md 文件。该文件包含执行任务的分步指令和核心逻辑。这种分层方法使得整个 Agent Skills 系统具有极高的可扩展性。关键在于,当Agent Skills执行脚本时,代码本身永远不会进入上下文窗口;只有脚本的输出才会进入。
社区开发者分享的实践案例充分证明了渐进式披露的威力。在一个真实场景中:
一个最简单的 agent skill 其实只需要包含一个 SKILL.md 文件即可,其他的 scripts、 references、assets 都是可选的。
my-cool-skill/ <-- 技能文件夹
├── SKILL.md <-- 核心文件:包含元数据和指令(必须)
├── scripts/ <-- (可选) 包含 Python/Bash 脚本
│ └── analyze.py
└── templates/ <-- (可选) 包含模板文件
│ └── report.md
└── assets/ <-- Optional: 一些模版资源
SKILL.md
文件内容分为两部分:YAML 头信息 (Frontmatter) 和 Markdown 正文。
Frontmatter 必须用 — 包裹起来,像下面这样,最短可以只包含 name 和 description 两个字段:
---
name: skill-name
description: A description of what this skill does and when to use it.
---
name 字段规则:
a-z, -)--)description 字段规则:
包含可选字段:
---
name: pdf-processing
description: Extract text and tables from PDF files, fill forms, merge documents.
license: Apache-2.0
metadata:
author: example-org
version: "1.0"
---
这是 AI 加载技能后看到的具体操作指南。包括步骤、规则、输入输出格式等。
比如我想要写个示例:
构建一个“代码审查专家”技能。
假设你想创建一个技能,专门用来按你团队的风格审查 Python 代码。
那么我们可以写一个这样的 SKILL.md :
---
name: python-code-review
description: 当用户要求审查 Python 代码,或者需要检查代码质量、寻找 bug 时使用此技能。不要用于其他语言。
---
# Python Code Review Guidelines
作为 Python 代码审查专家,请遵循以下步骤审查代码:
## 1. 核心原则
- **类型提示 (Type Hints)**: 所有函数必须包含参数和返回值的类型提示。
- **文档字符串 (Docstrings)**: 使用 Google 风格的文档字符串。
- **错误处理**: 检查是否使用了裸露的 `except:`,必须捕获特定异常。
## 2. 审查清单
请按以下顺序检查代码:
1. 运行静态分析逻辑(如果在 scripts/ 文件夹中有 lint 脚本,请优先参考)。
2. 检查变量命名是否符合 snake_case。
3. 寻找潜在的 N+1 查询问题(如果涉及数据库)。
## 3. 输出格式
请以以下格式输出审查报告:
**🔍 审查摘要**
- 评分: [1-10]
- 主要问题: [摘要]
**📝 详细建议**
| 行号 | 问题 | 建议修改 | 优先级 |
|------|------|----------|--------|
| 12 | 缺少类型提示 | `def func(a: int) -> str:` | 高 |
为了让 Skill 更聪明、更好用:
精确的 Description: AI 只有在 description 与用户请求匹配时才会加载这个技能,尽量应包含:
❌ 坏的写法: "一个帮助代码的工具"
✅ 好的写法: "当用户需要根据 PEP8 标准审查 Python 代码并生成表格报告时使用。"
提供清晰的示例,在 SKILL.md 正文中提供:
原子化: 一个 Skill 最好只做一件事(例如:一个 Skill 做代码审查,另一个 Skill 做文档生成),不要把所有功能塞进一个文件。
对于复杂的 Skill,将详细文档分离,可以使用 References 目录:
data-analysis/
├── SKILL.md # 简要说明和快速开始
├── scripts/
│ └── analyze.cs
└── references/
├── REFERENCE.md # 详细 API 参考
├── examples.md # 更多示例
└── algorithms.md # 算法说明
Agent Skills 和 MCP 一样都是 anthropics 公司提出来的,所以他们也提供了很多好用的 skills 供大家使用,如果选择将官方 Skills 安装到当前项目,就在终端输入这条命令:
openskills install anthropics/skills
安装成功后,你就会在Cursor、Trae等工具的文件管理区看到 .claude/skills 的文件夹。
当然也可去下面三方的收集网站上面下载别人写好的 skills:
先在项目根目录创建一个 AGENTS.md 文件,然后运行
openskills sync
确认后按回车键,你选择的 Skills 就会写进之前空白的 AGENTS.md 文档中。它将作为 Cursor、Trae 等 Coding Agent 接下来使用 Skills 的指导文件。
Skills 是可以被自动调用的,如果你想手动调用,可以直接在提示词中指定要调用的具体 Skills,比如:
调用 frontend-design skills,用HTML开发一个视频剪辑软件的SaaS介绍页
最后在举个例子 MCP 如何协同 Agent Skills 一起完成工作。想象一下要实现一个自动化的金融分析代理:
在这种情况下,MCP 处理底层标准化的工具调用任务,而Agent Skills负责协调高层智能工作流程。 Agent Skills是一种可重用的资产,它捕捉了分析师的独特专业知识,使其能够立即扩展。这种强大的组合使开发人员能够构建健壮的系统,其中任务逻辑与其所用工具的实现完全分离。
https://www.cnblogs.com/sheng-jie/p/19381647
https://zhuanlan.zhihu.com/p/1986802048608527579
https://www.zhihu.com/question/1890546618509538123
https://agentskills.so/skills-blogs/agent-skills-compare-mcp
2025-12-20 17:50:28
这篇文章主要是看一下Rust有哪些比较有意思的设计,相比其他语言之下为什么要这么设计。
Rust 的变量在默认情况下是不可变的,但是可以通过 mut 关键字让变量变为可变的,让设计更灵活。也就是说,如果我们这么写,编译会报错:
fn main() {
let x = 5;
println!("The value of x is: {}", x);
x = 6;
println!("The value of x is: {}", x);
}
报错:
error[E0384]: cannot assign twice to immutable variable `x`
--> src/main.rs:4:5
|
2 | let x = 5;
| - first assignment to `x`
3 | println!("The value of x is: {}", x);
4 | x = 6;
| ^^^^^ cannot assign twice to immutable variable
这其实和 C++ 和 Go(以及 Java、Python 等绝大多数主流语言)的设计哲学完全相反。C++ 和 Go 默认就是可变的,除非加上 const 表示是个常量。
Rust 这样做主要是为了让代码变得清晰点,降低心智负担 。一个变量往往被多处代码所使用,其中一部分代码假定该变量的值永远不会改变,而另外一部分代码却无情的改变了这个值,在实际开发过程中,这个错误是很难被发现的,特别是在多线程编程中。再来就是编译器优化,如果编译器知道一个变量绝不会变,它可以更激进地进行常量折叠、寄存器分配等优化。
在 Rust 中,可变性很简单,只要在变量名前加一个 mut 即可:
fn main() {
let mut x = 5;
println!("The value of x is: {}", x);
x = 6;
println!("The value of x is: {}", x);
}
但是 Rust 提供了 Shadowing 的功能:
fn main() {
let x = 5;
// x = x + 1; // 报错,不能修改
let x = x + 1; // 合法!这是一个全新的 x,它遮蔽了旧的 x
let x = "Hello"; // 甚至可以改变类型!
println!("{}", x);
}
这点很有意思,在很多语言是不可以这么重复声明变量的。我觉得还是和不可变性有关,既然都不可变了,重复声明也是安全的,并且复用同一个变量名,而不需要想出 x_str, x_int, x_final 这种名字,相对来说代码会简洁一些。
现在内存管理一般分为两类:
malloc/new),需要手动负责释放 (free/delete),但是这是很痛苦的,有时候忘记释放就会内存泄露,或者释放两次就会导致崩溃或为定义的行为;而 Rust 使用所有权 (Ownership)来控制。编译器在编译阶段通过一套严格的规则,自动在合适的地方插入 free 代码。没有运行时 GC,也不依赖手动管理。这个编译器定义的所有权规则有以下几条:
比如这个例子:
fn main() {
let s1 = String::from("hello");
let s2 = s1; // 赋值操作
// println!("{}, world!", s1); // ??? 这里能打印 s1 吗?
}
在 C++ 中,如果 s1 申请的是一个堆上的对象,如果是浅拷贝 (Shallow Copy),s1 和 s2 指向堆上的同一块内存。如果函数结束,析构函数执行两次,导致 Double Free 错误。
在 Rust 中,由于所有权的存在,这一行 let s2 = s1;代码执行后,s1 会当场死亡!发生 所有权转移 (Move)。Rust 认为:堆上的那块 "hello" 内存,现在归 s2 管了。所以如果你后面再用 s1,编译直接报错。
报错:
error[E0382]: borrow of moved value: `s1`
--> src/main.rs:5:28
|
2 | let s1 = String::from("hello");
| -- move occurs because `s1` has type `String`, which does not implement the `Copy` trait
3 | let s2 = s1;
| -- value moved here
4 |
5 | println!("{}, world!", s1);
| ^^ value borrowed here after move
如果你确实需要两个独立的字符串数据(深拷贝),你需要显式调用 .clone()。
let s1 = String::from("hello");
let s2 = s1.clone(); // 在堆上重新开辟内存,复制数据
println!("s1 = {}, s2 = {}", s1, s2); // s1 依然活着
除此之外,要注意栈上的数据 ,对于基本类型,基本类型(存储在栈上),Rust 会自动拷贝,其他的非基本类型会存储在堆上,不能自动拷贝。
let x = 5;
let y = x;
代码首先将 5 绑定到变量 x,接着拷贝 x 的值赋给 y,最终 x 和 y 都等于 5,因为整数是 Rust 基本数据类型,是固定大小的简单值,因此这两个值都是通过自动拷贝的方式来赋值的,都被存在栈中,完全无需在堆上分配内存。
借用(Borrowing),就是允许你在不获取所有权 (Ownership) 的情况下访问数据。简单来说,借用就是创建数据的引用 (Reference)。
借用有两种方式,不可变借用 : &T,可变借用:&mut T。在任意给定的作用域中,你只能满足以下两个条件之一:
&T)。&mut T)。即:要么多读,要么独写,绝不能同时存在,这个规则非常像 读写锁(Read-Write Lock)。
比如下面就是合法的借用 (多读):
fn main() {
let s = String::from("hello"); // s 拥有所有权
let r1 = &s; // 不可变借用 1
let r2 = &s; // 不可变借用 2
// 可以同时存在多个读者
println!("{}, {}", r1, r2);
} // 借用结束
合法的借用 (独写):
fn main() {
let mut s = String::from("hello");
let r1 = &mut s; // 可变借用
r1.push_str(", world"); // 修改数据
println!("{}", r1);
}
非法的借用 (读写冲突):
fn main() {
let mut s = String::from("hello");
let r1 = &s; // r1 借用以此只读
let r2 = &mut s; // 错误!不能在有不可变引用的同时创建可变引用
// 因为 r2 可能会改变 s,导致 r1 看到的数据失效或不一致
println!("{}, {}", r1, r2);
}
需要注意的是,现在的 Rust 编译器非常聪明,它的“作用域”不再仅仅是花括号 {},而是看引用的最后一次使用位置,这叫做 Non-Lexical Lifetimes (NLL)。
fn main() {
let mut s = String::from("hello");
let r1 = &s;
let r2 = &s;
println!("{} and {}", r1, r2);
// --- r1 和 r2 的作用在这里就结束了!因为后面没再用过它们 ---
let r3 = &mut s; // 现在可以了!
// 因为上面的不可变引用已经不再使用了,Rust 判定冲突解除。
println!("{}", r3);
}
Rust 这么做其实也是为了安全,我们看看 C++ 中常见的坑:
// C++ 伪代码
vector<int> v = {1, 2, 3};
int& element = v[0]; // 获取第一个元素的引用
v.push_back(4);
// 危险!如果 push_back 导致 vector 扩容(重新分配内存),
// 原来的内存被释放,element 现在指向的是垃圾内存。
// 再次访问 element 会导致 Crash。
在 Rust 中:
element 是一个不可变借用 (&T)。v.push_back 需要获取 v 的可变借用 (&mut T)。& 了,不能再借出 &mut。&str 和 String
在其他很多语言用 "hello" 这种方式创建的一般就叫字符串,但在 rust 里面不一样,它实际上是申明了一个只读的字符串字面量 &str,这意味着它是不可变的,数据直接硬编码在编译后的可执行文件 (Binary) 中(静态存储区),有点像 const。
let name = "Rust"; // 类型是 &str
println!("Hello, {}", name);
// name.push_str(" World"); // 报错!&str 不能修改
在 rust 里面只有使用 String::from(...) 声明的字符串才是我们常规意义上理解的字符串,比如可以对它进行修改、拼接和传递。
修改字符串 (必须加 mut)
fn main() {
// 注意:如果要修改,必须加 mut 关键字
let mut s = String::from("Hello");
// 1. 追加字符串切片 push_str()
s.push_str(", world");
// 2. 追加单个字符 push()
s.push('!');
println!("{}", s); // 输出: Hello, world!
}
字符串拼接 (连接两个字符串),有两种主要方式:使用 + 运算符或 format! 宏。
fn main() {
let s1 = String::from("Tick");
let s2 = String::from("Tock");
// 注意细节:
// s1 必须交出所有权 (被移动了),后面不能再用了
// s2 必须传引用 (&s2)
let s3 = s1 + " " + &s2;
// 类似 C 语言的 sprintf,生成一个新的 String
let s4 = format!("{} - {}", s1, s2);
}
String 可以自动假装成 &str
fn main() {
let s = String::from("Hello World");
// 场景 1: 函数需要 String (拿走所有权)
take_ownership(s);
// println!("{}", s); // 报错,s 已经被拿走了
// --- 重新创建一个 s ---
let s = String::from("Hello Again");
// 场景 2: 函数需要 &str (只读借用) -> 【这是最常用的】
// 虽然 s 是 String,但 &s 可以被当做 &str 用
borrow_it(&s);
println!("s 还在: {}", s); // s 还在
}
fn take_ownership(input: String) {
println!("我拿到了所有权: {}", input);
} // input 在这里被释放
fn borrow_it(input: &str) {
println!("我只是借看一下: {}", input);
}
转换回切片 (Slicing)
let s = String::from("Hello World");
let hello = &s[0..5]; // 提取前5个字节
let world = &s[6..]; // 从第6个字节取到最后
需要注意的是,Rust 的字符串在底层是 UTF-8 编码的字节数组,不支持直接通过数组下标索引(Index)访问字符。比如这样是会报错的:
let s1 = String::from("hello");
let h = s1[0];
Rust 的 String 本质上是一个 Vec<u8>(字节向量)。对于纯英文: "hello",每个字母占 1 个字节。s[0] 确实是 'h'。对于中文/特殊符号: "你好"。在 UTF-8 中,’你’ 占用 3 个字节。
为了强迫开发者意识到 “字符 ≠ 字节” 这一事实,Rust 干脆在编译阶段就禁止了 String[index] 这种写法。
所以,为了获取第 N 个字符 (最常用,安全),需要使用 .chars() 迭代器:
fn main() {
let s1 = String::from("hello");
// .chars() 把字符串解析为 Unicode 字符
// .nth(0) 取出第 0 个元素
// 结果是 Option<char>,因为字符串可能是空的
match s1.chars().nth(0) {
Some(c) => println!("第一个字符是: {}", c),
None => println!("字符串是空的"),
}
}
String 是为了当你需要在运行时动态生成、修改或者持有字符串数据时使用的(比如从网络读取数据,拼接 SQL)。&str 是为了高性能传递。当你只需要“看”一下字符串,而不需要拥有它时,用 &str。因为它只是传两个整数(指针+长度),不需要拷贝堆上的数据,速度极快。Rust 的枚举和其他语言最大的不同应该就是能挂载数据。每一个枚举成员,都可以关联不同类型、不同数量的数据。
enum Message {
Quit, // 没有关联数据
Move { x: i32, y: i32 }, // 像 Struct 一样包含命名字段
Chat(String), // 包含一个 String
ChangeColor(i32, i32, i32), // 包含三个 i32
}
fn main() {
// 创建不同类型的消息
let msg1 = Message::Move { x: 10, y: 20 };
let msg2 = Message::Chat(String::from("你好"));
}
比如上面的 Message 这个枚举,Quit 成员没有挂载数据,其他三个都挂载了各不相同的数据类型。
然后我们就可以根据枚举挂载的不同数据,使用 match 来进行匹配,这有点像 C++ 里面的 switch-case:
fn process_message(msg: Message) {
match msg {
Message::Quit => {
println!("玩家退出了");
}
Message::Move { x, y } => {
println!("玩家移动到了: x={}, y={}", x, y);
}
Message::Chat(text) => {
println!("玩家发送消息: {}", text);
}
Message::ChangeColor(r, g, b) => {
println!("更改颜色为: R{} G{} B{}", r, g, b);
}
}
}
但是需要注意的是 match 会强制你处理所有可能的情况,如果你漏写了 Message::Quit,代码根本编译不过。这保证了你不会遗漏任何一种业务逻辑。如果不想在匹配时列出所有值的时候,可以使用特殊的模式 _ 替代:
let some_u8_value = 0u8;
match some_u8_value {
1 => println!("one"),
3 => println!("three"),
5 => println!("five"),
7 => println!("seven"),
_ => (),
}
顺带提一下,match 其实也可以用来返回赋值,这点就和很多其他语言不同:
enum IpAddr {
Ipv4,
Ipv6
}
fn main() {
let ip1 = IpAddr::Ipv6;
let ip_str = match ip1 {
IpAddr::Ipv4 => "127.0.0.1",
_ => "::1",
};
println!("{}", ip_str);
}
这里匹配到 _ 分支,所以将 "::1" 赋值给了 ip_str。
上面我们也说了枚举可以挂载数据,所以相应的 match 也能把数据解包出来:
enum Action {
Say(String),
MoveTo(i32, i32),
ChangeColorRGB(u16, u16, u16),
}
fn main() {
let actions = [
Action::Say("Hello Rust".to_string()),
Action::MoveTo(1,2),
Action::ChangeColorRGB(255,255,0),
];
for action in actions {
match action {
Action::Say(s) => {
println!("{}", s);
},
Action::MoveTo(x, y) => {
println!("point from (0, 0) move to ({}, {})", x, y);
},
Action::ChangeColorRGB(r, g, _) => {
println!("change color into '(r:{}, g:{}, b:0)', 'b' has been ignored",
r, g,
);
}
}
}
}
上面当代码执行到 match action 时 Rust 发现 action 是 Action::Say 这一类,就会解构这个类型里面的 String,Action::Say(s)的意思是:“如果是 Say,把它肚子里的数据拿出来,赋值给变量 s。”,其他的同理。
Rust 的枚举类型还有一个特点就是结构比较紧凑,枚举 = 标签(tag)+ 数据(payload),比如这个枚举:
enum Message {
Quit, // 0 字节 payload
Move { x: i32, y: i32 }, // 8 字节
Write(String), // 24 字节(64位平台上,大概是 3 个指针)
}
tag 就是当前是 Quit/Move/Write 中的哪一个,数据(payload)就是容纳“所有变体里最大的那个数据”的空间,这里就是24字节的 String 类型,当然还有根据对齐要求,可能在 tag 和 payload 之间加 padding,那么一个枚举的结构就是:
size_of::<Message>() ≈ size_of::<payload最大> + size_of::<tag> + 对齐填充
所以可以看到枚举的结构比使用结构体更加紧凑。
最后,枚举甚至还能可以有自己的方法:
#![allow(unused)]
enum Message {
Quit,
Move { x: i32, y: i32 },
Write(String),
ChangeColor(i32, i32, i32),
}
impl Message {
fn call(&self) {
// 在这里定义方法体
}
}
fn main() {
let m = Message::Write(String::from("hello"));
m.call();
}
Tony Hoare, null 的发明者,曾经说过一段非常有名的话:
我称之为我十亿美元的错误。当时,我在使用一个面向对象语言设计第一个综合性的面向引用的类型系统。我的目标是通过编译器的自动检查来保证所有引用的使用都应该是绝对安全的。不过在设计过程中,我未能抵抗住诱惑,引入了空引用的概念,因为它非常容易实现。就是因为这个决策,引发了无数错误、漏洞和系统崩溃,在之后的四十多年中造成了数十亿美元的苦痛和伤害。
所以 Rust 只有 Option 枚举,没有 null / nil / nullptr。Rust 标准库是这样定义的:
enum Option<T> {
None, // 相当于 null
Some(T), // 包含一个值
}
Option 可以配合 match 来使用:
let some_number = Some(5);
let absent_number: Option<i32> = None;
// let sum = some_number + 1; // 报错!不能把 Option<i32> 和 i32 相加
// 必须处理为空的情况
match some_number {
Some(i) => println!("数字是: {}", i),
None => println!("没有数字"),
}
使用 Option 还有个好处就是编译器会做一种优化叫做 “空指针优化 (Null Pointer Optimization)”。
对于 Option<&T>(引用类型的 Option)或 Option<Box<T>>(堆指针的 Option)Rust 编译器在底层依然把它看作一个指针,Some(ptr) 对应非零地址,None 对应 0 (null) 地址。所以在汇编层面,Rust 的 Option<&T> 和 C++ 的 T\* 是一模一样的,内存占用也是一样的(64位机器上都是8字节)。
所以把指针包在 Option 枚举里不会增加内存开销,也不会导致运行变慢。
Rust 泛型其实是有点像C++的模版的,而不是类似 Java 或 C# 的泛型。 Java 泛型是在编译期进行检查,但在运行时被“擦除”。例如,List<String> 和 List<Integer> 在 Java 虚拟机(JVM)看来,本质上都是 List<Object>。这样做优点是节省了代码空间,缺点是牺牲了很多性能,因为在运行的时候虚拟机必须进行大量的类型转换(Casting)。
Rust 泛型则不一样,它和C++的模版是一样的,会根据你传入的具体类型(比如 int/i32 或 float/f64),生成多份不同的机器码。这样做的好处就是执行效率极高,没有 Java/Go 那种装箱(Boxing)或运行时类型断言的开销。
但是相对的,Rust 又比 C++ 模版又要使用上要舒服很多,Rust 泛型因为 Trait Bounds 的原因,所以编译器在定义阶段就能发现错误,而不是等到调用阶段。假设我们要写一个打印函数:
C++:
template <typename T>
void print_it(T value) {
// 编译器在这里不检查 value 到底有没有 .print() 方法
// 直到你在 main 函数里调用 print_it(int) 时,它才发现 int 没有 .print()
value.print();
}
Rust:
// 必须显式约束:T 必须实现了 Debug trait
fn print_it<T: std::fmt::Debug>(value: T) {
println!("{:?}", value);
}
// 如果你写成下面这样,编译器直接报错,甚至不需要你调用它:
// fn print_it<T>(value: T) {
// println!("{:?}", value); // 错误!编译器说:T 没有实现 Debug,不能打印
// }
Rust 的特征对象 (dyn Trait) ≈ C++ 的虚基类指针 (Base) ≈ Go 的 interface。
在 Rust 中,如果你想编写一个函数,或者定义一个容器(如 Vec),让它可以接受多种不同类型的数据,只要这些数据实现了同一个 Trait,你有两种选择:
fn foo<T: Draw>(x: T)
Vec 里同时存 u8 和 f64,因为 Vec 只能存一种类型。fn foo(x: &dyn Draw)
u8 还是 f64,它只关心“这东西能 Draw”。Vec<Box<dyn Draw>> 里混存 u8 和 f64。特征对象(Trait Objects)实现的方式其实和 C++ 的虚表实现很像,比如当你把一个具体类型(如 &u8)转换成特征对象(&dyn Draw)时,Rust 会生成一个胖指针(Fat Pointer)。
这个胖指针包含两部分(占用 16 字节):
data 指针:指向具体的数据(如堆上的 u8 值)。vtable 指针:指向该具体类型针对该 Trait 的虚函数表(Virtual Method Table)。当你调用 x.draw() 时,如果 x 是特征对象,机器码执行的逻辑如下:
读取 vtable:从胖指针的第二个字段找到 vtable 的地址。
查找方法:在 vtable 中找到 draw 方法对应的函数指针(比如偏移量为 0 的位置)。
跳转执行:调用该函数指针,并将胖指针的第一个字段(data 指针)作为 self 传进去。
举个例子,如果我们使用泛型来实现下面的 Screen 类,里面的 components 想要放多个元素:
pub struct Screen<T: Draw> {
pub components: Vec<T>,
}
如果你这样写,Screen 实例被创建时,T 必须被确定为某一种具体的类型。这意味着 components 里的所有元素都必须是同一个类型。如果想要混装是不行的,如下面:
let screen = Screen {
components: vec![
10u8, // 编译器推断 T 是 u8
3.14f64, // 报错!期望是 u8,但你给了 f64
],
};
// error[E0308]: mismatched types
只能全部都是同一类型:
let screen = Screen {
components: vec![10u8, 20u8, 30u8], // 全是 u8,没问题
};
如果使用特征对象(Trait Objects),就可以实现混装,在列表中存储多种不同类型的实例。
pub struct Screen {
pub components: Vec<Box<dyn Draw>>,
}
这样是 ok 的:
let v: Vec<Box<dyn Draw>> = vec![
Box::new(10u8),
Box::new(3.14f64),
];
生命周期就是控制有效作用域,防止 悬垂引用(Dangling Reference)的 。
fn main() {
let r; // ---------+-- r 的生命周期开始
{ // |
let x = 5; // -+-- x 的生命周期开始
r = &x; // | 试图让 r 指向 x
} // -+-- x 在这里死了(被释放)!
// |
println!("r: {}", r); // | r 依然活着,但它指向的 x 已经没了!
// | -> 报错:借用了活得不够久的值
} // ---------+
比如这段代码,Rust 编译器会拒绝编译,因为它发现 r 活得比 x 久,为了安全考虑,然后就拒绝编译。
用过 C++ 的同学知道, 在线上经常会因为这种问题而导致程序的 panic,因为C++ 编译器通常会“相信”你,比如下面的例子编译器根本不会管你:
#include <iostream>
#include <string>
// C++ 代码
const std::string& get_dangling() {
std::string s = "Hello";
return s; // 危险!s 在这里会被销毁
}
int main() {
// 这里拿到了一个引用,指向了一块已经被释放的栈内存
const std::string& ref = get_dangling();
// 运行时表现:
// 1. 可能崩溃 (Segmentation Fault)
// 2. 可能打印出乱码
// 3. 可能打印出 "Hello" (如果在内存被覆盖前运气好) -> 这是最可怕的“未定义行为”
std::cout << ref << std::endl;
}
所以相对而言,Rust 编译器的严格管控,实际上是”为了你好“。
Rust 为了实现这种严格管控,就出现了生命周期标注这种东西。生命周期不是“运行时的计时器”,也不是你手动管理内存的东西。它是编译器做静态分析时用的标记/约束,表示:
你写的 'a 这种符号,就是一种生命周期参数。大多数时候,Rust 编译器能自动推断生命周期,但在一些模糊的情况下,就需要手动标注。手动标注是以 ' 开头,名称往往是一个单独的小写字母,大多数人都用 'a 来作为生命周期的名称。
关于什么时候需要使用标注其实就一句话:输出的引用到底是从哪个输入里借来的有歧义的时候。具体来说,主要有以下 4 种场景
场景 1:结构体中包含引用
只要你的 struct 定义里包含字段是引用(而不是像 String 或 i32 这样的拥有所有权的类型),你就必须给整个结构体加上生命周期参数。
// 报错:missing lifetime specifier
struct Book {
author: &str, // 这是一个引用,编译器慌了:这个引用指向谁?能活多久?
}
// 正确
// 读作:Book 实例活多久,'a 就得活多久;author 引用的数据至少也要活 'a 这么久。
struct Book<'a> {
author: &'a str,
}
场景 2:函数有多个引用参数,且返回值也是引用
如果你有两个输入引用,且返回一个引用。编译器就蒙了:“返回的这个引用,是借用了参数 A,还是参数 B?”
// 报错
// 编译器困惑:返回的 &str 到底是谁的?
// 如果 user 活 10s,data 活 5s,我该让返回值活多久?
fn choose_one(user: &str, data: &str) -> &str {
if user.len() > 5 { user } else { data }
}
// 正确
// 显式告诉编译器:返回值的生命周期取 user 和 data 中较短的那个('a)
fn choose_one<'a>(user: &'a str, data: &'a str) -> &'a str {
if user.len() > 5 { user } else { data }
}
如果返回值只跟其中一个参数有关,只标那个参数就行:
// 这里的 'a 表示返回值只和 x 有关,和 y 无关
fn verify<'a>(x: &'a str, y: &str) -> &'a str {
x
}
当然,如果只有一个输入引用那就没有歧义:
fn capitalize(s: &str) -> &str { ... }
// 编译器自动脑补为:
// fn capitalize<'a>(s: &'a str) -> &'a str { ... }
场景 3:在 impl 块中实现方法时
这是语法要求,防止你忘记这个结构体是有“保质期”的。
struct Book<'a> {
author: &'a str,
}
// 必须写 impl<'a>,把 'a 声明出来
impl<'a> Book<'a> {
// 这里的 &self 其实隐含了生命周期
fn get_author(&self) -> &str {
self.author
}
}
注意:在方法内部,通常不需要给参数标生命周期,因为 Rust 有一条强大的规则:“如果是方法(有 &self),那么返回值的生命周期默认和 self 一样。”