MoreRSS

site iconChaofa Yuan修改

大模型算法工程师, 写过《LLMs-Zero-to-Hero》
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

Chaofa Yuan的 RSS 预览

2025-09-合法赚钱就是高尚的(月度小结)

2025-10-05 06:18:00

合法赚钱的高尚性

这个词还是来源于孙宇晨的财富自由革命之路,我真的被孙宇晨圈粉了,甚至可以说部分行为都受其影响。

我比较想提的一点是「合法赚钱的高尚性」,音频中提到大多数国人都有一个赚钱的羞耻心,当然我也有。首先需要科普一下什么叫做合法赚钱的高尚性。

  • 财富即贡献
    • 在市场经济中,一个人能合法赚钱财富,本身意味着其产品或服务得到了市场广泛的认可,从而间接证明了对他人需求和社会的贡献。
    • 依法交税本身对于社会就是一个重要贡献。很多企业家贡献了大量的税收,通过纳税、慈善和提供就业支撑起了部分社会的运转。
    • 如果道理无法说服一个人,换一个地方交税本身就是最大的投票,这也就是所谓的"现实的教训比道德说教更有效"。
  • 财富即意味着个人自由
    • 财富自由是实现人格独立自由的先决条件。财富可以让人摆脱为生存而做出的价值观妥协(如时间、注意力、情感甚至尊严),从而更纯粹地追求理想。
    • 通过个人奋斗获得成功、突破阶层板结,是这个世界上最体面、最值得骄傲的事情
    • 这也许是我们所有人毕生追求的目标,让自己的注意力分配到更有价值的事情上面,实现注意力自由。

我个人绝不是一个社会达尔文主义支持者,也不是优绩主义的支持者,但孙宇晨提出的合法赚钱是高尚的却击中了我。我们从小受到的教育是“只有不求汇报才是高尚的”,以至于很多人忽略了商业逻辑,只是一味的认为:

  • 满嘴谈钱是腐朽不堪的,那些企业家都是黑心的,不然赚不到那么多钱。
  • 看轻赚钱的困难程度,认为只要我怎么样就可以赚到钱。

而事实上要赚钱真的很难,无论是工作还是投资,都必须付出大量精力和努力才有可能有所回报。就以我自己做视频为例:一开始我是存粹的知识分享,但是后面无论是从频道未来发展,还是个人收益层面看,都需要有一定的商业化才能更健康可持续。

因此我个人思考逻辑也有了一定的变化,这里说两个比较好玩的例子:

case 1: chaofa 做视频初衷变了吗?

我在 25 年 5 月份的月度总结中提到同样的金钱的有不同的分量,那时候的我根本没想靠这个赚钱,说话非常的硬气,甚至前几天(2025-09-29)有一个粉丝朋友给我发消息说「我的文章有一些追求和人生理想」,我感动之余又变得有一丝惭愧。

image.png

原因是什么呢?

我最近两个视频都是广告,一个是显示器,一个是沉浸式翻译。其中显示器广告是因为我想要换一个显示器,而沉浸式翻译虽然是我自己就想推荐的东西,说得也都是我自己想说的(甲方几乎没有任何要求),但在外人看人总归是不客观的。

但广告可能都是其次,核心原因是我在 Q3 期间几乎没有输入任何技术相关的内容,看起来我像是只是为了赚钱做视频。

出现这个问题的原因有很多:

  • 我听了孙宇晨的课程之后觉得我应该进行商业化
    • 赚钱本身是很难的,只有尝试之后才知道要走通一次商业合作闭环才知道其他的艰难。
    • 和我前面做课程付出的时间相比,这个收益虽然不值一提,但是这是我应得的合法回报。
  • Q3 的工作实在是太忙了,根本没有时间学习输入,更不用说输出内容
    • 7 月份我女儿出生,忙前忙后
    • 8 9 月份因为项目太着急,经常出差,基本每天干到 12 点多[1]

回到小标题:chaofa 做视频的初衷变了吗?

我觉得答案是:暂时还没有变,但是等完成目标之后一定会变化的

那么我做视频的初衷是什么呢?

去年10 月份我刚开始做视频的时候,写了一篇文章普通人从零开始做公开表达的增长策略,提到,出发点主要有两个:

  • 满足自己的虚荣心
    • 原文:做这些东西本质上是为了满足自己的虚荣心,得到他人的关注和肯定
    • 例子:比如 llama_factory 的作者说我的文章写的不错,并且在要离职做事的时候主动提出要加我微信,这都是让我感到非常有成就的事情。
  • 表达能力的练习
    • 原文: 我的目标是让更多人看到我,90% 的精力是工作,10% 写书,做视频是一种表达和总结练习。
    • 例子:今年说话口癖少了一些,虽然现在还是不太好,但比以前好我已经很满足了。

所以说等我写完书[2]之后,我一定会更多的考虑赚钱的事情,合法赚钱本身就是高尚的。这也是我听课之后思维的变化,所以孙宇晨真的牛逼,值得学习。

case 2: 赚钱真的很难—粉丝上涨意味着收入增加吗?

相对于工作,做自媒体肯定是不赚钱的,这也是我为什么在这个上面投入的时间非常的少,因此一定是工作为主,做视频依然要保持图一乐心态。

为什么我说赚钱很难?有两点

  • 专业性限制了视频传播性。现在公开表达的人越来越多,粉丝数量不代表播放数据,因此甲方不会因为视频的专业性就投放你,反而会因为数据或者报价 pass 你。
  • 粉丝一样有「又怕兄弟苦、又怕兄弟开路虎」。因为大多粉丝朋友不知道中小 UP 真实收入,所以粉丝数上升之后,就会觉得 UP 赚钱了,只要视频有一点瑕疵可能就不点赞支持了。
    • 这里可以说一下例子:我在 3k 粉丝的时候,还有不少人给我打赏(原来我还做了一个表达,大概有 3/400);现在 30k 粉丝,就几乎没人给我打赏。(几乎可以说,粉丝上万之后,就没人打赏了)。

2025 年,还有遍地黄金的机会吗?我想应该是没有了。

Q4 的规划

  • 月度总结我会继续写下去的 (同步更新于公众号))
    • 虽然这些东西会让很多人很烦,如果有骚扰到可以屏蔽;但是我想说,我个人真的很爱看这些东西,比如
      • 我今天(2025-10-04)刚发现的一个 UP 主——久远寺千歳,我把他所有总结视频都看了,我真的很佩服这种人。
      • 还有我很喜欢的 blog 作者们——微扰理论hawstein,以及各种心灵按摩类的播客,比如孟岩的无人知晓、少楠的奇想驿、厚望的面基等,当然还有很多很多...
    • 这些东西总能给我力量,我也想把生活、工作中的迷茫写下来,希望能把这种力量传递下去,所以我会继续写下去

image.png

  • 工作还要坚持努力干,但是 Q4 可预期的压力就很大了,以至于我写这篇 blog 的时候就感觉到非常的痛苦,我真的没有信心完成 Q4 定的 OKR。
    • 让人幸福的工作一定是存在的,只是我们需要更多的思考
    • Q4 的 Chaofa 请一定「不要用战术上勤奋,掩盖战略上的懒惰」
      • I need more time to think about my life.
  • 视频,也许会月更视频,「动手学习大模型」的系列我还是会坚持更新完的。
  • 写书,我要开始了。等我!

最后

最后欢迎来探讨对世界的认知,基本全网同名 chaofa用代码打点酱油 (推荐)


  1. 一般 10 点多下班后回家接着干到 12 点多,真的觉得干不完,压力很大。 ↩︎

  2. 写书虽然不赚钱,但真的很酷,一直想有一本自己的出版物。 ↩︎

RAG 进化之路:传统 RAG 到工具与强化学习双轮驱动的 Agentic RAG

2025-10-03 19:53:20

1. 阅读收获 (takeaway)

本文旨在祛魅【Agentic RAG】的概念,因此本文的阅读收获包括:

2. 前言

如果说 2024 年,LLM(Large Language Model) 落地最广泛且最有实用价值的一项技术,那么我提名 RAG(Retrieval Augmented Generation) 应该不会有太多的反对。但 2025 年最火的概念变成 Agent,而 RAG 似乎变成了一个基础组件,提的不多却是融合到了 Agent 的日常使用中了,尤其是 OpenAI DeepResearch 的出现,让 Agentic RAG 成了 2025 年最成功的 RAG 应用之一。

但网络上有很多文章,把 Agentic RAG 说得玄乎,故意制造难懂的概念从而达到抬高自身的目的。但实际上我们只需要理清楚两个概念,就可以知道什么是 Agentic RAG。

  • 传统 RAG 是什么?
    • 预先通过检索排序将知识放到 Prompt 中,然后利用 LLM 生成回复
  • Agent 是什么?
    • 使用具有自主决策能力的 Agent 实现的 RAG 系统就可以称为 Agentic RAG。 因此 Agentic RAG 实际上就是指在传统 RAG 基础上,加入了 Agent 组件的 RAG 系统,任何实现了 Agentic Search 能力的 RAG 系统都可以称为 Agentic RAG

3. 传统 RAG (Native RAG)

传统的 RAG(Native RAG)并不是一个复杂的概念,核心概念就两个:检索(Retrieval)和生成(生成)。因此要做好 RAG 就是两件事情:

  • 怎么检索到更有用的知识?
  • 怎么让模型更好的利用知识生成回复?

因此 RAG 系统架构可以如下图所示:

image.png

NATIVE RAG一般来说可以分成两个不同的链路:离线和在线。具体的代码可以参考:动手学习大模型-中文版-第八章-native-rag 源代码

requires-python = ">=3.12"
dependencies = [
    "langchain>=0.3.27",
    "langchain-chroma>=0.2.6",
    "langchain-community>=0.3.30",
    "langchain-deepseek>=0.1.4",
    "langchain-openai>=0.3.34",
    "langgraph>=0.6.8",
]

3.1 RAG 离线入库

离线入库是指将文档处理成向量并存储到向量数据库中,以便后续检索使用。这个过程主要包括:文档加载、文本切分、向量化、存储。

from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma

# 1. 加载文档
loader = TextLoader("knowledge_base.txt")
documents = loader.load()

# 2. 文本切分
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,  # 每个文本块的大小
    chunk_overlap=50,  # 文本块之间的重叠部分
)
splits = text_splitter.split_documents(documents)

# 3. 向量化并存储
embeddings = OpenAIEmbeddings(
    base_url="https://api.siliconflow.cn/v1",
    model="Qwen/Qwen3-Embedding-0.6B",
)
vectorstore = Chroma.from_documents(
    documents=splits,
    embedding=embeddings,
    persist_directory="./chroma_db",  # 持久化存储路径
)

print(f"成功将 {len(splits)} 个文本块存入向量数据库")

3.2 RAG 在线应用

在线应用是指用户提问时,系统检索相关文档并生成回答的过程。主要包括:用户查询、检索相关文档、构建提示词、LLM 生成回答。

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate

# 1. 加载已有的向量数据库
embeddings = OpenAIEmbeddings(
    base_url="https://api.siliconflow.cn/v1",
    model="Qwen/Qwen3-Embedding-0.6B",
)
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)

# 2. 用户提问
query = "什么是RAG?"

# 3. 检索相关文档(返回最相关的 3 个)
docs = vectorstore.similarity_search(query, k=3)

# 4. 将检索到的文档内容拼接成上下文
context = "\n\n".join([doc.page_content for doc in docs])

# 5. 构建 Prompt 模板
prompt_template = """
你是一个专业的问答助手。请根据以下参考文档回答用户的问题。
如果参考文档中没有相关信息,请诚实地说不知道,不要编造答案。

参考文档:
{context}

用户问题:{question}

回答:
"""

prompt = PromptTemplate(
    template=prompt_template,
    input_variables=["context", "question"],
)

# 6. 创建 LLM 并生成回答
llm = ChatOpenAI(
    model="THUDM/glm-4-9b-chat",
    temperature=0,
    max_retries=3,
    base_url="https://api.siliconflow.cn/v1",
)
final_prompt = prompt.format(context=context, question=query)

print(f"最终的 Prompt 内容:{final_prompt}")
response = llm.predict(final_prompt)

# 7. 输出结果
print(f"问题: {query}")
print(f"回答: {response}")
print(f"\n参考文档数量: {len(docs)}")

4. Agentic RAG

4.1 Native RAG 有哪些不够好的地方?

  • 一次性流水线:通常“检索→拼接→生成”一步到位,没有让模型根据需要调整检索策略、递进式地钻研文档。
  • 缺乏任务拆解:问题可能需要先定位文件、再选片段、再比对与总结;Native RAG 往往缺少这样的多步拆解能力。
  • 工具编排不足:只会相似度检索,不会进一步查看文件元数据、选择需要阅读的 chunk,更不会在不够时换一种检索或补充查询。
  • 证据利用浅:Top-K 拼接容易“糊”上下文,无法进行“先粗后细”的证据收集(coarse→fine),也不容易明确引用到具体片段。
  • 适应性差:面对多跳问题(multi-hop)或信息不足的场景,通常不会回溯重试、改写查询、换路子继续找。

4.2 什么是 Agentic RAG?

Agentic RAG的核心“不是更复杂的模型”,而是“让模型学会做事”。和一次性把文档塞进 Prompt 就生成答案的 Native RAG 相比,Agentic RAG 让大模型扮演一个“决策-执行”的控制器:先制定策略,再调用工具逐步收集证据,最后基于证据作答并给出引用

所以说:模型通过自主决策实现的 RAG 过程,我们就可以称之为 Agentic RAG。无论这个过程是发现在离线入库阶段(当然 Agentic RAG 其实可以不严格区分 offline/online 截断,都可以让 Agent 自主决策),还是 RAG 生成阶段的 search query rewritererank 还是 dynamic search等,只要有模型的自主决策过程,那么就可以称为 Agentic RAG。具体的形式可以参考 Agentic RAG 流程图(将 search 能力变成一个工具,模型可以根据需要调用):

image.png|700x264

  • 让 LLM 作为“智能体(Agent)”充当控制器,结合一组工具(检索、查看元数据、读取片段等)执行“思考→行动→观察”的循环(Reason–Act–Observe)。
  • 在回答之前,按需多轮调用工具,逐步从“找到相关文件”走到“读取关键片段”,最后基于被读取的证据组织答案,并给出引用。

给模型更多的自主决策空间、配备合适的工具,LLM 会给你出乎意料的智能。 好处:更强的适应性(可改写查询/追加搜索)、更深的证据利用(读到再答)、更可归因(引用具体来源)。

如果想了解更多的 Agentic RAG的工业级别的实现,我觉得可以参考「开源项目 chatbox」的实现,该项目是一个比较早的 LLM Chat 集成的项目,并且算是比较早的实现了 Agentic RAG。因为作为一个离线的 LLM chat 项目,对于时延等问题可以有更少的考虑,从而更激进的、更早阶段将 naive chat 变成 Agentic Chat

4.3 基于提示词和工具的 Agentic RAG

ReAct 是一个常见的 Agent 实现方式,因此只要给 LLM 配备合适的 Tool以及适当的引导 Prompt,就可以将一个 Native RAG 转换成 Agentic RAG。这里我通过解读 36.8k star开源企业级项目——chatbox来讲解一个 Agentic RAG 是怎么实现的,以及为什么它在复杂场景下效果好[^1]。

下面是 Chatbox 的整体流程图,可以分为两个部分,左半部分是 Agentic RAG,右半部分是介于 Native RAGAgentic RAG之间的 Native RAG

image.png|700x889

因此我们重点来解读 chatbox 到底是怎么设置工具,来实现更好的 Agentic Search,然后再给出最小示例代码:

包括 Anthropic 的 context engineering 文章中也提到了Agentic Seach 对于 Agent 应用是非常重要的。

  • query_knowledge_base
    • 在知识库中进行语义搜索,快速找到候选文件或片段的“线索”。通常作为最基础的检索工具
  • get_files_meta
    • 查看候选文件的元信息(如文件名、大小、chunk 数量),帮助模型决定“读哪几个文件的哪部分”。
  • read_file_chunks
    • 按文件 ID + chunkIndex 精读具体片段,用于“取证”。建议一次只读少量最相关的 chunk,以降低噪声。
  • list_files
    • 列出知识库中的文件清单,作为兜底浏览或当搜索线索不充分时的探索手段。

4.3.1 Agentic RAG 样例

这里我想通过一个例子让读者理解什么是 Agentic RAG

LLM MOE的进化之路,从普通简化 MOE,到 sparse moe,再到 deepseek 使用的 share_expert sparse moe

2025-01-28 03:30:00

1. 阅读前提

本次课一共讲解三个不同版本的 MOE,分别是基础版MOE,大模型训练用的 SparseMoE,还有 DeepSeek 用的比较多的 shared_expert 的 SparseMoE。

2. 版本1:基础版本MOE

输入是一个 Token, 输出是一个 Token Embedding。暂时先不考虑 MOE 得到的 Embedding 怎么使用。

因为 MOE 网络对应着 Expert,这个 Expert 一般是一个 FeadFoward Network,FFN。而为了简化,后续我们都用一层的 Linear 代替,更高级版本的 Expert 留给大家当做课后作业。下面是一个专家的定义。

class BasicExpert(nn.Module):
    # 一个 Expert 可以是一个最简单的, linear 层即可
    # 也可以是 MLP 层
    # 也可以是 更复杂的 MLP 层(active function 设置为 swiglu)
    def __init__(self, feature_in, feature_out):
        super().__init__()
        self.linear = nn.Linear(feature_in, feature_out)
    
    def forward(self, x):
        return self.linear(x)

基础版本的 MOE 可以看这个图,非常的简单。

llms-zero-to-hero-basic-moe-model


class BasicMOE(nn.Module):
    def __init__(self, feature_in, feature_out, expert_number):
        super().__init__()
        self.experts = nn.ModuleList(
            [
                BasicExpert(feature_in, feature_out) for _ in range(expert_number)
            ]
        )
        # gate 就是选一个 expert 
        self.gate = nn.Linear(feature_in, expert_number)
    
    def forward(self, x):
        # x 的 shape 是 (batch, feature_in)
        expert_weight = self.gate(x)  # shape 是 (batch, expert_number)
        expert_out_list = [
            expert(x).unsqueeze(1) for expert in self.experts
        ]  # 里面每一个元素的 shape 是: (batch, ) ??

        # concat 起来 (batch, expert_number, feature_out)
        expert_output = torch.cat(expert_out_list, dim=1)

        # print(expert_output.size())

        expert_weight = expert_weight.unsqueeze(1) # (batch, 1, expert_nuber)

        # expert_weight * expert_out_list
        output = expert_weight @ expert_output  # (batch, 1, feature_out)
        
        return output.squeeze()


def test_basic_moe():
    x = torch.rand(2, 4)

    basic_moe = BasicMOE(4, 3, 2)
    out = basic_moe(x)
    print(out)


test_basic_moe()

2. 版本2:SparseMoE (大模型训练使用)

这个一般我们用 switch transformers 这篇文章的图作为演示,详情看:

llms-zero-to-hero-switch-transformers-moe-model

和 Basic 区别是,MOE 选择 topK 个专家,然后对这 topK 个专家的输出进行加权求和,并且把输入样本变成了大模型中真实的输入 Shape,(batch, seq_len, hidden_dim)


# 主要参考自 mistral MOE 的实现

class MOERouter(nn.Module):
    def __init__(self, hidden_dim, expert_number, top_k):
        super().__init__()
        self.gate = nn.Linear(hidden_dim, expert_number)
        self.expert_number = expert_number
        self.top_k = top_k
    
    def forward(self, hidden_states):
        # 计算路由logits
        router_logits = self.gate(hidden_states)  # shape is (b * s, expert_number)
        
        # 计算专家经过softmax之后的概率
        routing_probs = F.softmax(router_logits, dim=-1, dtype=torch.float)
        
        # 计算topk的专家的输出
        router_weights, selected_experts = torch.topk(
            routing_probs, self.top_k, dim=-1
        )  # shape都是 (b * s, top_k)
        
        # 专家权重归一化
        router_weights = router_weights / router_weights.sum(dim=-1, keepdim=True)
        router_weights = router_weights.to(hidden_states.dtype)
        
        # 生成专家掩码
        expert_mask = F.one_hot(
            selected_experts,
            num_classes=self.expert_number
        )  # shape是 (b * s, top_k, expert_number)
        expert_mask = expert_mask.permute(2, 1, 0)  # (expert_number, top_k, b * s)
        
        return router_logits, router_weights, selected_experts, expert_mask


class MOEConfig:
    def __init__(
            self, 
            hidden_dim, 
            expert_number, 
            top_k, 
            shared_experts_number=2,
        ):
        self.hidden_dim = hidden_dim
        self.expert_number = expert_number
        self.top_k = top_k
        self.shared_experts_number = shared_experts_number

class SparseMOE(nn.Module):
    # 稀疏 MOE 模型,这里每一个 token 都会过 topk 个专家,得到对应token 的 hidden_embeddings
    def __init__(self, config):
        super().__init__()

        self.hidden_dim = config.hidden_dim

        self.expert_number = config.expert_number
        self.top_k = config.top_k

        self.experts = nn.ModuleList(
            [
                BasicExpert(self.hidden_dim, self.hidden_dim) for _ in range(self.expert_number)
            ]
        )

        self.router = MOERouter(self.hidden_dim, self.expert_number, self.top_k)
    
    def forward(self, x):
        # x shape is (b, s, hidden_dim)
        batch_size, seq_len, hidden_dim = x.size()

        # 合并前两个维度,因为不是 Sample 维度了,而是 token 维度
        hidden_states = x.view(-1, hidden_dim) # shape is(b * s, hidden_dim)

        router_logits, router_weights, selected_experts_indices, expert_mask = self.router(hidden_states)
        # 其中 selected_experts_indices shape 是 (b * s, top_k)
        # 其中 expert_mask shape 是 (expert_number, top_k, b * s)
        
        final_hidden_states = torch.zeros(
            (batch_size * seq_len, hidden_dim),
            dtype=hidden_states.dtype,
            device=hidden_states.device
        )

        for expert_idx in range(self.expert_number):
            expert_layer = self.experts[expert_idx]
            # expert_mask[expert_idx] shape 是 (top_k, b * s)
            idx, top_x = torch.where(expert_mask[expert_idx]) 
            # idx 和 top_x 都是一维 tensor
            # idx 的值是 0 或 1, 表示这个 token 是作为当前专家的 top1 还是 top2
            # top_x 的值是 token 在 batch*seq_len 中的位置索引
            # 例如对于 batch_size=2, seq_len=4 的输入:
            # top_x 的值范围是 0-7, 表示在展平后的 8 个 token 中的位置
            # idx 的值是 0/1, 表示这个 token 把当前专家作为其 top1/top2 专家

            # hidden_states 的 shape 是 (b * s, hidden_dim)
            # 需要取到 top_x 对应的 hidden_states
            current_state = hidden_states.unsqueeze(
                0
            )[:, top_x, :].reshape(-1, hidden_dim) # (selected_token_number, hidden_dim)

            # router_weight 的 shape 是 (b * s, top_k)
            current_hidden_states = expert_layer(
                current_state
            ) * router_weights[top_x, idx].unsqueeze(-1)  # (selected_token_number, 1) 这里有广播

            # 把当前专家的输出加到 final_hidden_states 中
            # 方式1 的写法性能更好,并且方式1容易出现
            final_hidden_states.index_add_(0, top_x, current_hidden_states.to(hidden_states.dtype))
            # 方式2
            # final_hidden_states[top_x] += current_hidden_states.to(hidden_states.dtype)
            # 方式2 的写法性能更差,并且方式2容易出现错误,+= 操作在处理重复索引时需要多次读写内存,可能会导致竞争条件

        # 把 final_hidden_states 还原到原来的 shape
        final_hidden_states = final_hidden_states.reshape(batch_size, seq_len, hidden_dim)

        return final_hidden_states, router_logits # shape 是 (b * s, expert_number)


def test_token_level_moe():
    x = torch.rand(2, 4, 16)
    config = MOEConfig(16, 2, 2)
    token_level_moe = SparseMOE(config)
    out = token_level_moe(x)
    print(out[0].shape, out[1].shape)


test_token_level_moe()

3. 版本3:ShareExpert SparseMoE (deepseek 版本)

备注:这里是参考 deepseek moe 思想,写的一个共享 expert 的 MOE 网络,有一定的简化,但是可以方便理解训练过程。

和 版本2 的 SparseMOE 区别是,这里多了一个 shared experts 的模型,这个模型是所有 token 共享的,也就是说,所有 token 都过这个 shared experts 模型,然后每个 token 会用计算的 Router 权重,来选择 topK 个专家,然后和共享的专家的输出一起加权求和。

具体结构图为:

llms-zero-to-hero-deepseek-v3-model-architecture

class ShareExpertMOE(nn.Module):
    def __init__(self, config):
        super().__init__()

        self.moe_model = SparseMOE(config)
        self.shared_experts = nn.ModuleList(
            [
                BasicExpert(
                    config.hidden_dim, config.hidden_dim
                ) for _ in range(config.shared_experts_number)
            ]
        )

    def forward(self, x):
        # x shape 是 (b, s, hidden_dim)
        # 首先过 moe 模型
        sparse_moe_out, router_logits = self.moe_model(x)
        
        # 针对的还是 x 的每一个 
        # 然后过 shared experts
        shared_experts_out = [
            expert(x) for expert in self.shared_experts
        ] # 每一个 expert 的输出 shape 是 (b, s, hidden_dim)
        
        shared_experts_out = torch.stack(
            shared_experts_out, dim=0
        ).sum(dim=0)
        
        # 把 sparse_moe_out 和 shared_experts_out 加起来
        return sparse_moe_out + shared_experts_out, router_logits


def test_share_expert_moe():
    x = torch.rand(2, 4, 16)
    config = MOEConfig(16, 2, 2)
    share_expert_moe = ShareExpertMOE(config)
    out = share_expert_moe(x)
    print(out[0].shape, out[1].shape)


test_share_expert_moe()

4. 模型训练测试

用于测试上面的代码是否可以跑通?


def switch_load_balancing_loss(router_logits: torch.Tensor, num_experts: int) -> torch.Tensor:
    """
    计算 Switch Transformers 的负载均衡损失
    
    Args:
        router_logits: shape [batch_size * sequence_length, num_experts]
        num_experts: 专家数量
    
    Returns:
        total_loss: 总损失 = auxiliary_loss + z_loss
    """
    # 计算路由概率
    router_probs = torch.softmax(router_logits, dim=-1)  # [b*s, num_experts]
    
    # 获取每个token的最优专家
    _, selected_experts = torch.topk(router_probs, k=2, dim=-1) 
    
    # 创建one-hot矩阵表示选中的专家
    mask = torch.nn.functional.one_hot(selected_experts, num_experts).float() 
    
    # 计算每个专家的期望负载 (理想情况下应该是 1/num_experts)
    expected_load = torch.ones_like(router_probs) / num_experts
    
    # 计算实际负载 (每个专家处理的token数量除以总token数量)
    # 在batch维度上计算平均值
    actual_load = mask.mean(dim=0)
    
    # 计算auxiliary loss
    # 这会惩罚负载分布与期望负载的差异
    aux_loss = torch.sum(actual_load * router_probs.mean(dim=0)) * num_experts
    
    # 计算z_loss (可选)
    # 这会惩罚过大的路由logits
    z_loss = torch.mean(torch.square(router_logits))
    z_loss_weight = 0.001  # 可调整的超参数
    
    # 总损失
    total_loss = aux_loss + z_loss * z_loss_weight
    
    return total_loss

def test_moe_training():
    # Create a simple dataset
    batch_size = 32
    seq_len = 16
    hidden_dim = 32
    num_batches = 100
    
    # Initialize model and optimizer
    config = MOEConfig(hidden_dim=hidden_dim, 
                      expert_number=4,
                      top_k=2,
                      shared_experts_number=2)
    model = ShareExpertMOE(config)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    
    # Training loop
    model.train()
    for batch in range(num_batches):
        # Generate random input data
        x = torch.randn(batch_size, seq_len, hidden_dim)
        target = torch.randn(batch_size, seq_len, hidden_dim)
        
        # Forward pass
        output, router_logits = model(x)

        # Compute losses
        # MSE loss for prediction
        mse_loss = F.mse_loss(output, target)
        
        aux_loss = switch_load_balancing_loss(router_logits, config.expert_number)
        # Combined loss
        total_loss = mse_loss + 0.01 * aux_loss
        
        # Backward pass and optimize
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()
        
        if batch % 10 == 0:
            print(f"Batch {batch}, Loss: {total_loss.item():.4f} "
                  f"(MSE: {mse_loss.item():.4f}, Aux: {aux_loss.item():.4f})")

# Run the training test
test_moe_training()

5. 课后作业

  1. 把 expert 改成 swishGLU 版本的 FFN 专家
  2. 把 MOE 应用到上一次的 build_nanoGPT 中,也就是替换掉原来的 FFN层,注意这里负载均衡 loss 要包含每一层的 MOE 的 router_logits
  3. 自己问一下 GPT topK 是怎么实现的反向传播,了解反向传播的梯度怎么流转的?

交个朋友🤣

最后欢迎关注我,基本全网同名 chaofa用代码打点酱油

LLM activate function激活函数的进化之路,从 ReLU,GELU 到 SwiGLU(swishGLU)

2025-01-28 02:58:00

1. 背景

自 chatGPT 22年底问世以来,大模型(Large Language Model, LLM)一般使用 Causal Language Model 的形式,属于 Transformers 中的 Decoder 部分,其中在 Decoder 的 Block 中有一个 FFN(FeadForward) 层,一般认为这部分参数用于存储知识。而标准的 FFN 一般有一个升维度和降维度的过程,一共有两个权重矩阵,用公式表示为

FFN(x)=ReLU(xW1+b1)W2+b2(1) FFN(x) = ReLU(xW_1 + b1)W2 + b2 \tag{1}

其中 x shape 是 (b,s,h)(b, s, h),w1 shape 是 (h,4h)(h, 4h),w2 shape 是 (4h,h)(4h, h), w1 是升维(up),w2 是降维(down)

激活函数主要是为了实现神经网络学习输入和输出之间的复杂非线性关系而使用的一个函数。在公式 (1) 中,ReLU 是一个激活函数(Transfromers原版),可以替换成其他的激活函数,比如 BERT 开始用 Gaussian Error Linear Unit,GELU 比较多,随后就成了激活函数的主流选择,但是随着大模型的爆火以及 PaLM 模型的发布,大家开始慢慢使用 swishGLU 作为激活函数,并且作为一个主要的优化点。

具体可以看下面一段代码即可清楚的理解 FFN 模型是什么实现的。

class FeedForward(nn.Module):
    # 实际上就是 MLP
    def __init__(self, config):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(config.n_embd, 4 * config.n_embd),
             # 激活函数
             nn.ReLU(),  
             #  可以替换成 nn.GELU(),  
             #  但是 如果是 SwishGLU 则实现方式有所不同,接下来就会介绍 swishGLU 是怎么实现的
            nn.Linear(4 * config.n_embd, config.n_embd),
            nn.Dropout(config.dropout)
        )
    
    def forward(self, x):
        return self.net(x)

2. 升级之路

1. ReLU

ReLU 深度学习以来最常用的激活函数,其公式非常的简单。

ReLU(x)=max(0,x)(2) ReLU(x) = max(0, x) \tag{2}

2. GELU

从 GPT、BERT 以来,GELU 似乎成了新时代取代 ReLU 的激活函数,具体形式如下:

GELU(x)=xP(Xx)=xΦ(x)(3) GELU(x) = x P(X \le x) = x \Phi(x) \tag{3}

其中 Φ(x)\Phi(x) 是标准正态分布的累计分布函数,定义为

Φ(x)=12(1+erf(x2))(4) \Phi(x) = \frac{1}{2}(1 + erf(\frac{x}{\sqrt{2}})) \tag{4}

这里的 erf 是误差函数

erf(x)=2π0xet2dt(5) erf(x) = \frac{2}{\sqrt{\pi}} \int_0^x e^{-t^2} dt \tag{5}

但是这个函数由于计算成本较高,因此有两个初等函数作为近似计算(但目前【2025年1月27日】其实很多框架已经可以精确计算 erf 函数)。

近似计算分析详细可以参见苏神的文章,GELU的两个初等函数近似是怎么来的

3. SwiGLU(SwishGLU)

SwiGLU(或者swishGLU,以下可能混用) 是 swish 激活函数和 GLU 门控单元的结合体,因此需要分别介绍两者的不同。

其中需要注意的是:在 T5 开始,很多模型(比如 PaLM )在FFN层都不用 bias 了,也就是说 FFN的公式变成了

FFN(x)=ActiveFunction(xW1)W2(6) FFN(x) = \text{ActiveFunction}(xW_1)W2 \tag{6}

注意公式 6 和公式 1 的区别,一共没有 bias 一个有 bias,但具体得看不同模型的实现,并不能一概而论。

3.1 swish 激活函数

swish 是一个非线性函数(激活函数都是如此,笑🤣),具体公式为:

Swish=xσ(βx) \text{Swish} = x \sigma(\beta x)

其中 β\beta 是一个超参数,当 β=1\beta = 1 时,Swish 就变成了 SiLU (Sigmoid Linear Unit),大多数框架的默认实现(如 PyTorch、TensorFlow 的 nn.SiLU())使用的是 β=1\beta = 1 的固定版本。

因此如果采用 swish 激活函数,FFN 的公式变成了

FFN(W1,W2,x)=Swish(xW1)W2 FFN(W_1, W_2, x) = \text{Swish}(xW_1)W2

共有两个可学习的矩阵,其中 w1,(h,4h)w_1,(h, 4h) 是升维矩阵,w2,(4h,h)w_2,(4h, h) 是降低维度的矩阵。

3.2 GLU 门控单元

GLU,Gated Linear Units,是一种门控结构(有参数,因此相对于普通的激活函数多了一个 gate 矩阵),通过 sigmoid 控制不同维度的激活。公式如下[1]

GLU(W,x,V,b,c)=(Wx+b)sigmoid(Vx+c)(7) GLU(W, x, V, b, c) = (Wx + b) \otimes \text{sigmoid}(Vx + c) \tag{7}

这里是不是熟悉 LSTM, GRU 的同学一下就理解,其中需要注意的是,b, c 对应的 bias 不是必须的。

对比公式 7 和公式 9,公式 9 中的 wupw_{up} 对应 公式 7 中的 WW,而 wgatew_{gate} 对应公式 7 中的 VV 矩阵。

3.3 SwiGLU 的表达形式

SwiGLU 就是把门控函数替换成了 swish,并且去除掉了 bias 部分,以及把 FFN 层的一个 Linear 层替换成了 GLU 层,因此一共有三个可训练的参数矩阵, w1, w2, w3。

因此最终的公式表达为,

FFN(W1,W2,W3,x)=W2(W1xSwish(W3x))(8) FFN(W_1, W_2, W_3, x) = W_2 \cdot (W_1x \otimes \text{Swish}(W_3x)) \tag{8}

而我们都知道 FFN 是一个升高维度,然后降低维度的过程,因此可以写成,W2 是一个降低维度的参数,W1 是升高维度的过程,而 W3 是一个 Gate 需要用到的参数矩阵。

FFN(wup,wdown,wgate)=wdown(wupxSwish(wgatex))(9) FFN(w_{up}, w_{down}, w_{gate}) = w_{down} \cdot (w_{up}x \otimes \text{Swish}(w_{gate}x)) \tag{9}

通过这个公式整体就非常的清晰理解使用 swiGLU 的 FFN。

而我们都知道在 basic 版本的 FFN,见公式(1), 只有 wupw_{up}wdownw_{down} 分别是 (h, 4h) 和(4h, h),因此整体参数是 8h28h^2

而公式9 中,一共有三个矩阵,如果想要实现总参数 8h28h^2,那么每一个参数矩阵的大小应该是 8h23\frac{8h^2}{3},因此 wup,wgatew_{up}, w_{gate} 的shape应该是 (h,8h3)(h, \frac{8h}{3})wdownw_{down} 的 shape 是 (8h3,h)(\frac{8h}{3}, h)

假设输入的 hidden_dim 大小是 hidden_dim,那么中间层(up 后的维度)大小是 mid_dim, 具体计算逻辑如下:

mid_dim = int(8 * hidden_dim / 3)
# multiple_of:make SwiGLU hidden layer size multiple of large power of 2
mid_dim = multiple_of * ((mid_dim + multiple_of - 1) // multiple_of)

# multiple_of 一般设置为 256, LLaMA 和 GPT等模型

注意,在 LLM (大语言模型) 架构中,multiple_of 是一个用于优化计算效率的参数,通常设置为 256 或其他 2 的幂次方数(如 128、512 等),最终让 mid_dim 调整为 multiple_of 的整数倍。这样做有几个原因:

  1. 硬件优化:现代 GPU/TPU 在处理 2 的幂次方大小的张量时效率最高
  2. 内存对齐:确保内存对齐可以提高计算速度
  3. 并行计算效率:某些并行计算操作在处理规整的数字时效率更高

3. 带有 swishGLU 的 FFN 代码实现

class FFNExpert(nn.Module):
    def __init__(self, hidden_dim, dropout):   # LLM 进化之路, FFN 激活函数从 GELU -> SwiGLU
        super().__init__()  

        # 有一个 magic number 叫做 8/3
        hidden_dim = hidden_dim
        # 这里可以自己去优化成 multiple_of 的倍数
        mid_dim = hidden_dim * 8 // 3

        self.up = nn.Linear(hidden_dim, mid_dim, bias=False)
        self.down = nn.Linear(mid_dim, hidden_dim, bias=False)
        self.gate = nn.Linear(hidden_dim, mid_dim, bias=False)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        out = self.dropout(
            self.down(
                # up 之后的 Shape 是(b, s, mid_dim)
                # gate 和 up 之后的Shape都是 (b, s, mid_dim)
                # 两者是 element-wise 相乘
                F.silu(
                    self.gate(x)
                ) * self.up(x)
            )
        )
        return out

参考

最后欢迎关注我,基本全网同名 chaofa用代码打点酱油


  1. https://zhuanlan.zhihu.com/p/693332639 ↩︎

2024,公开表达元年

2024-12-29 05:00:00

等待新的生命

如果今年要挑一个最重大的事情,那只能是点点(我妻子)怀孕了,我明年就要当爹了。这事情的影响是非常巨大的,不仅涉及到家庭,而且是一个需要长期付出不可逆的过程。有无孩子一定会是两个截然不同的世界,所以我是有些恐慌的(教育孩子的难度不言而喻)。

由于点点想生两个孩子,因此在去年结婚之后,就开始盘算着什么时候生娃,不然到时候得做高龄产妇。左思右想后决定从 4 月份开始备孕,并且一定要让孩子在次年 9 月之前出生。之所以有这么荒诞的想法是因为我国的入学政策是:“凡年满6周岁的儿童,其父母或者其他法定监护人应当送其入学接受并完成义务教育。”

  • “年满6周岁的儿童”即在新学年开始前也即在每年9月1日前年满6周岁的儿童、少年。
  • 换句话说就是:如果不是在 9 月 1 日之前出生的孩子得和次年的 9月1日之前的孩子一起上学。

这里有一个很有意思的内容,关于点点的高度不自洽,一方面天天讲以后孩子打螺丝能养活自己就好了,一方面又很焦虑孩子的入学时间,不然可能导致她/他以后读博可能有年龄压力(就半年也不至于吧,而且孩子一定想读博吗🤣)。所以未来会怎么样呢,这种极大的不确定真是让人又担心又期待。

经过半年的时间,周末各种跑医院,所幸预产期会是在 25年的 9 月前,缓解了点点下半年最大的烦恼,但随之而来的孕早期一系列的孕反,嗳气、呕吐、尿频等,十分不易。我也在反思自己,我的关心不够,好像真的只会抖机灵逗她开心。。。

公开表达元年

如果要说今年最有意义的是一件事情,那就是【从十月份开始时不时录制一些技术视频】,并把它分享在 B站上。正是因为这样的公开表达,最终收获了一些朋友的关注,尤其部分比较热心的人甚至会私信我表示感谢,这里面充满了正反馈,也让我感受到了一点点意义。这里简单讲一下相关的数据(以后全网基本都叫【chaofa用代码打点酱油】了)

  • B 站,累计获得播放 10W+,收获粉丝 4.7k,很有成就感。
    • 2024,公开表达元年-20241228224736884

我写博客其实还挺早的,但是根本没有人在看,没有什么反馈,基本属于自嗨。第一次自建博客是 17年,那个时候写了一个关于自己学习 React 的一些记录,但是后面读研之后不做前端了就把对应的内容删除了。后面受到【极客兔兔】在 V站发帖自建博客过程的影响,又开始第二次写自己的博客,这时候是 19年的 6 月份,也就是当前的博客:chaofa用代码打点酱油,改过很多次名字,但唯一值得高兴的时候,这个博客持续存在了 5年,里面记录了自己很多的碎碎念。

那么为什么今年我却把它称为【公开表达元年】,因为这一次不一样了。以前我想过写公众号,想过做视频,想过回答知乎问题,但是大多数都没有坚持下去。为此我今年反思了一下为什么以前没有持续下去?

  • 公开表达的羞耻。程序员圈子很小,做相同的岗位的就更少了,因此写的东西很容易被同事、朋友看见,这时候总会有一种羞耻感,会想自己是不是太装逼了,是不是说了什么不合时宜的话。此外,因为自己在互联网大厂上班,朋友转发你的内容给自己的时候还会调侃几句,比如「工作不饱和啊」之类的,这时候只能相视一笑。
    • 但将心比心,我自己看到那些在持续做内容的博主,是非常佩服他们持续输出表达的能力与毅力,因此别人看你亦如是,只要做得足够多,自然会得到别人的认可。
  • 懒惰的惯性。懒惰这件事是自己无论如何都没法推脱责任的,平常工作确实是挺累的,但是大多数情况下周末是有足够的时间去做【公开输出】的,但是短视频、动漫、各种信息流,真的太吸引人了,躺在床上不一会一天就过去了。
  • 输出的难度远大于输入。我们大多数人都只是互联网的世界的消费者,每天都会输入很多的内容,这就是因为要做输出是很难的。

大多数人都知道“公开表达是难却正确的事情”,但是真正的领悟者却不多,李笑来在《把时间当做朋友》一书中提到,互联网用户行为遵循 "90-9-1原则"。

  • 90% 的用户是潜水者,只消费内容。
  • 9% 的用户偶尔参与互动,如评论或分享。
  • 1% 的用户是主要的内容创作者。

不是说因为创作者特别稀疏我们采取成为创作者的,而是这其中有巨大的好处。

  1. 每个人都有被看见的需求,只有生产才有可能被人看见。
  2. 产生有价值的内容是能够帮助到别人的。我自己是从很多公开的博客或者公众号获取到了很多知识,很感谢他们,帮助我进步。
  3. 可以认识一些大佬。这也许不算是什么目的,不过人就是会相互吸引。只要内容有价值,就会有人去看,那么自然应该会认识一些大佬。举个例子:我以前一直听播客(硬地骇客),然后因为我开始做公开表达了之后,明年也许有机会一起录一期播客,这真的很赞。

那么后面我应该怎么做呢?

  1. 持续、体系化的发表我的学习思考,比如《LLMs101-from-zero-to-hero》,这个系列应该会比较有意思吧。(立个 flag,明年我想把它体系化成一本电子书)
  2. 持续在公开网络上宣传自己的内容。哪怕强如「苏剑林,苏神」,除了在自己的博客中发表文章之外,也会一些交流群发布自己的文章链接。现在的内容太多了,除了依赖于推荐算法,我们还是需要适当的去社交媒体传播自己的内容。尽管自己生产的内容肯定不是最优质的那一批,但从部分读者的反馈看,我的内容还是有一些价值的,所以要慢慢刨除宣传羞耻感。

职场深度求索

去年换工作之后,高强度的工作了一段时间,加上和岗位、老板的风格不是很适应,没干多久就感觉天天精疲力尽的,很快就想要辞职,但是迫于职业生涯的延续性,我自然是不敢真的就裸辞,因此在苦苦坚持,想要寻求一些方法延续自己的职业生涯,比如:自我鼓励——《工作,再坚持坚持》,理性分析——《如何在大厂工作六个月以上且保持一定的心理健康?》,只能说收效甚微。毕竟饿了就想吃饭,累了就会想休息,天经地义。

差不多待一年之后,就开始考虑活水转岗 or 换工作,不过深圳的就业机会还是较少,思来想去还是觉得活水合适一些。然后开始内部看一些机会,这个时候又涉及到去干什么业务的问题,所以很纠结到底干什么?继续和在腾讯一样做广告相关的业务,还是去做搜索,还是去做推荐,还是去做纯AIGC的业务,还是去做NLP相关的业务,最后兜兜转转又到了与我最有缘分的客服。

活水可能也算是一次跳槽吧,毕竟也要经过3轮技术面试,因此我基本把它当作全新的工作,工作方式也适当地做出一些改变。工作上一直在向表现好的大佬学习,希望明年工作上有一些突破。

回应去年

  • 健身
    • 去年完成的最好的事情,却是今年完成最差的事情
    • 全年锻炼加起来可能不足 30次吧,比去年少了 3倍有余,尤其是 6月之后基本就没怎么去过健身房了。
  • 播客-打点酱油。目标是录制 4 期,但实际上也只录制了两期。整体还算满意吧。
    • 不过有一个 highlight,有机会做客硬地骇客,这个很棒。
    • 2024,公开表达元年-20241228223121397
  • 博客
    • 基本达成去年定下的 20k pv,10k uv 的目标。明年的目标是继续翻倍,这个应该没有什么悬念,只要持续输出就应该比较容易达到吧。
    • 2024,公开表达元年-20241228222451159
  • 投资
    • 投资一塌糊涂。因为投资了A股和中概,亏损不容小觑。以后有机会可以写一个【程序员破产之路】系列文章。
    • 从去年结婚后,开始进行投资记账,一共 400多天,有知有行显示我的资金加权收益 -14%,年化收益 -11.32%。
  • 读书。今年几乎没怎么看书,很失败,不过是早有预期的。

展望明年

  • 工作。工作优先级还是很高,毕竟要是没有工作带来的现金流会很容易摧毁一个家庭,尤其是明年还有新的生命,持续的现金流还是非常的重要。
  • 投资。减少投资上的关注,减少个股的投资,个股投资不能超过仓位 10%,此外要减少中概的持仓,还是换成 ETF 更容易拿住。(把钱还我,我不想玩了
  • 公开表达。明年要继续做体系化的视频,多分享文章,希望能有更多的正反馈,比如读者邮件(🤣
  • 健康。希望家人都身体健康,明年再多多锻炼吧,明年再定一个 100 次/年的锻炼目标,30mins+/次。

手写大模型组件之Group Query Attention,从 MHA,MQA 到 GQA

2024-12-09 06:00:00

  • GQA(Group Query Attention)的优点:效果损失小,推理的时候可以加速(来自于kvcache小,内存取数少)。
  • 仔细阅读 MHA, MQA 和 GQA的区别,就会发现 MHA 和 MQA 都是 GQA 的特殊表达形式
    • 三者可以用同一套代码,只需要修改【GQA】代码里面的 nums_key_value_head 参数就可
    • nums_key_value_head 设置等于 1 就是 MQA
    • nums_key_value_head 设置等于 nums_head 就是 MHA

如果不喜欢看文字的同学可以查看 B站 或者 YouTube 视频。

B站:https://www.bilibili.com/video/BV1ZmqpYfEGY/

YouTube: https://www.youtube.com/watch?v=1jBW7qcyd7A&t=1s

multi-head self-attention

备注:也可以直接由 GQA 中修改参数得到。但是本代码更完整一些

import math
import torch
import torch.nn as nn

class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_dim, nums_head) -> None:
        super().__init__()
        self.nums_head = nums_head

        # 一般来说,
        self.head_dim = hidden_dim // nums_head
        self.hidden_dim = hidden_dim

        # 一般默认有 bias,需要时刻主意,hidden_dim = head_dim * nums_head,所以最终是可以算成是 n 个矩阵
        self.q_proj = nn.Linear(hidden_dim, hidden_dim)
        self.k_proj = nn.Linear(hidden_dim, hidden_dim)
        self.v_proj = nn.Linear(hidden_dim, hidden_dim)

        # gpt2 和 bert 类都有,但是 llama 其实没有
        self.att_dropout = nn.Dropout(0.1)
        # 输出时候的 proj
        self.o_proj = nn.Linear(hidden_dim, hidden_dim)

    def forward(self, X, attention_mask=None):
        # 需要在 mask 之前 masked_fill
        # X shape is (batch, seq, hidden_dim)
        # attention_mask shape is (batch, seq)

        batch_size, seq_len, _ = X.size()

        Q = self.q_proj(X)
        K = self.k_proj(X)
        V = self.v_proj(X)

        # shape 变成 (batch_size, num_head, seq_len, head_dim)
        q_state = Q.view(batch_size, seq_len, self.nums_head, self.head_dim).permute(
            0, 2, 1, 3
        )
        k_state = K.view(batch_size, seq_len, self.nums_head, self.head_dim).transpose(
            1, 2
        )
        v_state = V.view(batch_size, seq_len, self.nums_head, self.head_dim).transpose(
            1, 2
        )
        # 主意这里需要用 head_dim,而不是 hidden_dim
        attention_weight = (
            q_state @ k_state.transpose(-1, -2) / math.sqrt(self.head_dim)
        )
        print(type(attention_mask))
        if attention_mask is not None:
            attention_weight = attention_weight.masked_fill(
                attention_mask == 0, float("-1e20")
            )

        # 第四个维度 softmax
        attention_weight = torch.softmax(attention_weight, dim=3)
        print(attention_weight)

        attention_weight = self.att_dropout(attention_weight)
        output_mid = attention_weight @ v_state

        # 重新变成 (batch, seq_len, num_head, head_dim)
        # 这里的 contiguous() 是相当于返回一个连续内存的 tensor,一般用了 permute/tranpose 都要这么操作
        # 如果后面用 Reshape 就可以不用这个 contiguous(),因为 view 只能在连续内存中操作
        output_mid = output_mid.transpose(1, 2).contiguous()

        # 变成 (batch, seq, hidden_dim),
        output = output_mid.view(batch_size, seq_len, -1)
        output = self.o_proj(output)
        return output


attention_mask = (
    torch.tensor(
        [
            [0, 1],
            [0, 0],
            [1, 0],
        ]
    )
    .unsqueeze(1)
    .unsqueeze(2)
    .expand(3, 8, 2, 2)
)

x = torch.rand(3, 2, 128)
net = MultiHeadAttention(128, 8)
net(x, attention_mask).shape

Group Query Attention

备注:以下代码省略了 attention_dropout attention_mask等情况的处理,真实实现过程中需要考虑。

import torch
import torch.nn as nn
import math

# 忽略了 attention_mask, attention_dropout; 
class GroupQueryAttention(nn.Module):
    def __init__(self, hidden_dim, nums_head, nums_key_value_head):
        super().__init__()
        assert hidden_dim % nums_head == 0 # 可以整除
        assert nums_head % nums_key_value_head == 0  # N 个 query head 为一组

        self.hidden_dim = hidden_dim
        self.nums_head = nums_head
        self.nums_key_value_head = nums_key_value_head
        self.head_dim = hidden_dim // nums_head

        # 初始化 qkv o
        self.q_proj = nn.Linear(hidden_dim, nums_head * self.head_dim)  # out feature_size (nums_head * head_dim)
        # k v out shape (nums_key_value_head * head_dim)
        self.k_proj = nn.Linear(hidden_dim, nums_key_value_head * self.head_dim)
        self.v_proj = nn.Linear(hidden_dim, nums_key_value_head * self.head_dim)

        self.o_proj = nn.Linear(hidden_dim, hidden_dim) # input_size nums_head * head_dim

    def forward(self, X, attention_mask=None):
        # X shape (batch, seq, hidden_dim)
        batch_size, seq, _ = X.size()

        # qkv projection
        q = self.q_proj(X)  # (batch, seq, hidden_dim)
        k = self.k_proj(X)
        v = self.v_proj(X) 

        # attention_weight 目标shape 是 (batch, nums_head, seq, seq)
        q = q.view(batch_size, seq, self.nums_head, self.head_dim)
        k = k.view(batch_size, seq, self.nums_key_value_head, self.head_dim)
        v = v.view(batch_size, seq, self.nums_key_value_head, self.head_dim)

        # 关注: nums_head 和 nums_key_value_head 的关系
        q = q.transpose(1, 2) # (b, nums_head, seq, head_dim)
        k = k.transpose(1, 2) # (b, nums_key_value_head, seq, head_dim)
        v = v.transpose(1, 2)  # (b, nums_key_value_head, seq, head_dim)

        # k v repeat; (广播操作)
        k = k.repeat_interleave(self.nums_head // self.nums_key_value_head, dim=1)
        v = v.repeat_interleave(self.nums_head // self.nums_key_value_head, dim=1)

        attention_score = (q @ k.transpose(2, 3)) / math.sqrt(self.head_dim)

        attention_weight = torch.softmax(attention_score, dim=-1)
        # (attention_mask 忽略) # 可以看前面的视频

        output = attention_weight @ v  # (b, nums_head, seq, head_dim)

        # output projection 变成 (b, seq, hidden_dim)
        output = output.transpose(1, 2).contiguous()
        final_output = self.o_proj(output.view(batch_size, seq, -1))

        return final_output

# 测试
x = torch.rand(3, 2, 128)
net = GroupQueryAttention(128, 8, 4)
net(x).shape

Multi Query Attention

由于 MQA 是 GQA 的一种特殊形式,因此只要在参数设置的时候将 nums_key_value_head = 1 就是 Multi Query Self-Attention。

交个朋友🤣

最后欢迎关注我,基本全网同名 chaofa用代码打点酱油