2025-10-05 06:18:00
这个词还是来源于孙宇晨的财富自由革命之路,我真的被孙宇晨圈粉了,甚至可以说部分行为都受其影响。
我比较想提的一点是「合法赚钱的高尚性」,音频中提到大多数国人都有一个赚钱的羞耻心,当然我也有。首先需要科普一下什么叫做合法赚钱的高尚性。
我个人绝不是一个社会达尔文主义支持者,也不是优绩主义的支持者,但孙宇晨提出的合法赚钱是高尚的却击中了我。我们从小受到的教育是“只有不求汇报才是高尚的”,以至于很多人忽略了商业逻辑,只是一味的认为:
而事实上要赚钱真的很难,无论是工作还是投资,都必须付出大量精力和努力才有可能有所回报。就以我自己做视频为例:一开始我是存粹的知识分享,但是后面无论是从频道未来发展,还是个人收益层面看,都需要有一定的商业化才能更健康可持续。
因此我个人思考逻辑也有了一定的变化,这里说两个比较好玩的例子:
我在 25 年 5 月份的月度总结中提到同样的金钱的有不同的分量,那时候的我根本没想靠这个赚钱,说话非常的硬气,甚至前几天(2025-09-29)有一个粉丝朋友给我发消息说「我的文章有一些追求和人生理想」,我感动之余又变得有一丝惭愧。
原因是什么呢?
我最近两个视频都是广告,一个是显示器,一个是沉浸式翻译。其中显示器广告是因为我想要换一个显示器,而沉浸式翻译虽然是我自己就想推荐的东西,说得也都是我自己想说的(甲方几乎没有任何要求),但在外人看人总归是不客观的。
但广告可能都是其次,核心原因是我在 Q3 期间几乎没有输入任何技术相关的内容,看起来我像是只是为了赚钱做视频。
出现这个问题的原因有很多:
回到小标题:chaofa 做视频的初衷变了吗?
我觉得答案是:暂时还没有变,但是等完成目标之后一定会变化的。
那么我做视频的初衷是什么呢?
去年10 月份我刚开始做视频的时候,写了一篇文章普通人从零开始做公开表达的增长策略,提到,出发点主要有两个:
所以说等我写完书[2]之后,我一定会更多的考虑赚钱的事情,合法赚钱本身就是高尚的。这也是我听课之后思维的变化,所以孙宇晨真的牛逼,值得学习。
相对于工作,做自媒体肯定是不赚钱的,这也是我为什么在这个上面投入的时间非常的少,因此一定是工作为主,做视频依然要保持图一乐心态。
为什么我说赚钱很难?有两点
2025 年,还有遍地黄金的机会吗?我想应该是没有了。
最后欢迎来探讨对世界的认知,基本全网同名 chaofa用代码打点酱油 (推荐)
2025-10-03 19:53:20
本文旨在祛魅【Agentic RAG】的概念,因此本文的阅读收获包括:
如果说 2024 年,LLM(Large Language Model) 落地最广泛且最有实用价值的一项技术,那么我提名 RAG(Retrieval Augmented Generation) 应该不会有太多的反对。但 2025 年最火的概念变成 Agent,而 RAG 似乎变成了一个基础组件,提的不多却是融合到了 Agent 的日常使用中了,尤其是 OpenAI DeepResearch 的出现,让 Agentic RAG 成了 2025 年最成功的 RAG 应用之一。
但网络上有很多文章,把 Agentic RAG 说得玄乎,故意制造难懂的概念从而达到抬高自身的目的。但实际上我们只需要理清楚两个概念,就可以知道什么是 Agentic RAG。
Agentic RAG
实际上就是指在传统 RAG 基础上,加入了 Agent 组件的 RAG 系统,任何实现了 Agentic Search
能力的 RAG 系统都可以称为 Agentic RAG
。传统的 RAG(Native RAG)并不是一个复杂的概念,核心概念就两个:检索(Retrieval)和生成(生成)。因此要做好 RAG 就是两件事情:
因此 RAG 系统架构可以如下图所示:
NATIVE RAG
一般来说可以分成两个不同的链路:离线和在线。具体的代码可以参考:动手学习大模型-中文版-第八章-native-rag 源代码
requires-python = ">=3.12"
dependencies = [
"langchain>=0.3.27",
"langchain-chroma>=0.2.6",
"langchain-community>=0.3.30",
"langchain-deepseek>=0.1.4",
"langchain-openai>=0.3.34",
"langgraph>=0.6.8",
]
离线入库是指将文档处理成向量并存储到向量数据库中,以便后续检索使用。这个过程主要包括:文档加载、文本切分、向量化、存储。
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_chroma import Chroma
# 1. 加载文档
loader = TextLoader("knowledge_base.txt")
documents = loader.load()
# 2. 文本切分
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 每个文本块的大小
chunk_overlap=50, # 文本块之间的重叠部分
)
splits = text_splitter.split_documents(documents)
# 3. 向量化并存储
embeddings = OpenAIEmbeddings(
base_url="https://api.siliconflow.cn/v1",
model="Qwen/Qwen3-Embedding-0.6B",
)
vectorstore = Chroma.from_documents(
documents=splits,
embedding=embeddings,
persist_directory="./chroma_db", # 持久化存储路径
)
print(f"成功将 {len(splits)} 个文本块存入向量数据库")
在线应用是指用户提问时,系统检索相关文档并生成回答的过程。主要包括:用户查询、检索相关文档、构建提示词、LLM 生成回答。
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain_openai import ChatOpenAI
from langchain.prompts import PromptTemplate
# 1. 加载已有的向量数据库
embeddings = OpenAIEmbeddings(
base_url="https://api.siliconflow.cn/v1",
model="Qwen/Qwen3-Embedding-0.6B",
)
vectorstore = Chroma(persist_directory="./chroma_db", embedding_function=embeddings)
# 2. 用户提问
query = "什么是RAG?"
# 3. 检索相关文档(返回最相关的 3 个)
docs = vectorstore.similarity_search(query, k=3)
# 4. 将检索到的文档内容拼接成上下文
context = "\n\n".join([doc.page_content for doc in docs])
# 5. 构建 Prompt 模板
prompt_template = """
你是一个专业的问答助手。请根据以下参考文档回答用户的问题。
如果参考文档中没有相关信息,请诚实地说不知道,不要编造答案。
参考文档:
{context}
用户问题:{question}
回答:
"""
prompt = PromptTemplate(
template=prompt_template,
input_variables=["context", "question"],
)
# 6. 创建 LLM 并生成回答
llm = ChatOpenAI(
model="THUDM/glm-4-9b-chat",
temperature=0,
max_retries=3,
base_url="https://api.siliconflow.cn/v1",
)
final_prompt = prompt.format(context=context, question=query)
print(f"最终的 Prompt 内容:{final_prompt}")
response = llm.predict(final_prompt)
# 7. 输出结果
print(f"问题: {query}")
print(f"回答: {response}")
print(f"\n参考文档数量: {len(docs)}")
Agentic RAG
的核心“不是更复杂的模型”,而是“让模型学会做事”。和一次性把文档塞进 Prompt 就生成答案的 Native RAG 相比,Agentic RAG 让大模型扮演一个“决策-执行”的控制器:先制定策略,再调用工具逐步收集证据,最后基于证据作答并给出引用。
所以说:模型通过自主决策实现的 RAG 过程,我们就可以称之为 Agentic RAG
。无论这个过程是发现在离线入库阶段(当然 Agentic RAG 其实可以不严格区分 offline/online 截断,都可以让 Agent 自主决策),还是 RAG 生成阶段的 search query rewrite
,rerank
还是 dynamic search
等,只要有模型的自主决策过程,那么就可以称为 Agentic RAG
。具体的形式可以参考 Agentic RAG 流程图(将 search 能力变成一个工具,模型可以根据需要调用):
给模型更多的自主决策空间、配备合适的工具,LLM 会给你出乎意料的智能。 好处:更强的适应性(可改写查询/追加搜索)、更深的证据利用(读到再答)、更可归因(引用具体来源)。
如果想了解更多的 Agentic RAG
的工业级别的实现,我觉得可以参考「开源项目 chatbox」的实现,该项目是一个比较早的 LLM Chat 集成的项目,并且算是比较早的实现了 Agentic RAG
。因为作为一个离线的 LLM chat 项目,对于时延等问题可以有更少的考虑,从而更激进的、更早阶段将 naive chat 变成 Agentic Chat。
ReAct 是一个常见的 Agent 实现方式,因此只要给 LLM 配备合适的 Tool
以及适当的引导 Prompt
,就可以将一个 Native RAG
转换成 Agentic RAG
。这里我通过解读 36.8k star
开源企业级项目——chatbox来讲解一个 Agentic RAG 是怎么实现的,以及为什么它在复杂场景下效果好[^1]。
下面是 Chatbox
的整体流程图,可以分为两个部分,左半部分是 Agentic RAG
,右半部分是介于 Native RAG
到 Agentic RAG
之间的 Native RAG
。
因此我们重点来解读 chatbox
到底是怎么设置工具,来实现更好的 Agentic Search
,然后再给出最小示例代码:
包括 Anthropic 的 context engineering 文章中也提到了
Agentic Seach
对于 Agent 应用是非常重要的。
query_knowledge_base
get_files_meta
read_file_chunks
ID + chunkIndex
精读具体片段,用于“取证”。建议一次只读少量最相关的 chunk
,以降低噪声。list_files
这里我想通过一个例子让读者理解什么是 Agentic RAG
。
2025-01-28 03:30:00
本次课一共讲解三个不同版本的 MOE,分别是基础版MOE,大模型训练用的 SparseMoE,还有 DeepSeek 用的比较多的 shared_expert 的 SparseMoE。
输入是一个 Token, 输出是一个 Token Embedding。暂时先不考虑 MOE 得到的 Embedding 怎么使用。
因为 MOE 网络对应着 Expert,这个 Expert 一般是一个 FeadFoward Network,FFN。而为了简化,后续我们都用一层的 Linear 代替,更高级版本的 Expert 留给大家当做课后作业。下面是一个专家的定义。
class BasicExpert(nn.Module):
# 一个 Expert 可以是一个最简单的, linear 层即可
# 也可以是 MLP 层
# 也可以是 更复杂的 MLP 层(active function 设置为 swiglu)
def __init__(self, feature_in, feature_out):
super().__init__()
self.linear = nn.Linear(feature_in, feature_out)
def forward(self, x):
return self.linear(x)
基础版本的 MOE 可以看这个图,非常的简单。
class BasicMOE(nn.Module):
def __init__(self, feature_in, feature_out, expert_number):
super().__init__()
self.experts = nn.ModuleList(
[
BasicExpert(feature_in, feature_out) for _ in range(expert_number)
]
)
# gate 就是选一个 expert
self.gate = nn.Linear(feature_in, expert_number)
def forward(self, x):
# x 的 shape 是 (batch, feature_in)
expert_weight = self.gate(x) # shape 是 (batch, expert_number)
expert_out_list = [
expert(x).unsqueeze(1) for expert in self.experts
] # 里面每一个元素的 shape 是: (batch, ) ??
# concat 起来 (batch, expert_number, feature_out)
expert_output = torch.cat(expert_out_list, dim=1)
# print(expert_output.size())
expert_weight = expert_weight.unsqueeze(1) # (batch, 1, expert_nuber)
# expert_weight * expert_out_list
output = expert_weight @ expert_output # (batch, 1, feature_out)
return output.squeeze()
def test_basic_moe():
x = torch.rand(2, 4)
basic_moe = BasicMOE(4, 3, 2)
out = basic_moe(x)
print(out)
test_basic_moe()
这个一般我们用 switch transformers 这篇文章的图作为演示,详情看:
和 Basic 区别是,MOE 选择 topK 个专家,然后对这 topK 个专家的输出进行加权求和,并且把输入样本变成了大模型中真实的输入 Shape,(batch, seq_len, hidden_dim)
# 主要参考自 mistral MOE 的实现
class MOERouter(nn.Module):
def __init__(self, hidden_dim, expert_number, top_k):
super().__init__()
self.gate = nn.Linear(hidden_dim, expert_number)
self.expert_number = expert_number
self.top_k = top_k
def forward(self, hidden_states):
# 计算路由logits
router_logits = self.gate(hidden_states) # shape is (b * s, expert_number)
# 计算专家经过softmax之后的概率
routing_probs = F.softmax(router_logits, dim=-1, dtype=torch.float)
# 计算topk的专家的输出
router_weights, selected_experts = torch.topk(
routing_probs, self.top_k, dim=-1
) # shape都是 (b * s, top_k)
# 专家权重归一化
router_weights = router_weights / router_weights.sum(dim=-1, keepdim=True)
router_weights = router_weights.to(hidden_states.dtype)
# 生成专家掩码
expert_mask = F.one_hot(
selected_experts,
num_classes=self.expert_number
) # shape是 (b * s, top_k, expert_number)
expert_mask = expert_mask.permute(2, 1, 0) # (expert_number, top_k, b * s)
return router_logits, router_weights, selected_experts, expert_mask
class MOEConfig:
def __init__(
self,
hidden_dim,
expert_number,
top_k,
shared_experts_number=2,
):
self.hidden_dim = hidden_dim
self.expert_number = expert_number
self.top_k = top_k
self.shared_experts_number = shared_experts_number
class SparseMOE(nn.Module):
# 稀疏 MOE 模型,这里每一个 token 都会过 topk 个专家,得到对应token 的 hidden_embeddings
def __init__(self, config):
super().__init__()
self.hidden_dim = config.hidden_dim
self.expert_number = config.expert_number
self.top_k = config.top_k
self.experts = nn.ModuleList(
[
BasicExpert(self.hidden_dim, self.hidden_dim) for _ in range(self.expert_number)
]
)
self.router = MOERouter(self.hidden_dim, self.expert_number, self.top_k)
def forward(self, x):
# x shape is (b, s, hidden_dim)
batch_size, seq_len, hidden_dim = x.size()
# 合并前两个维度,因为不是 Sample 维度了,而是 token 维度
hidden_states = x.view(-1, hidden_dim) # shape is(b * s, hidden_dim)
router_logits, router_weights, selected_experts_indices, expert_mask = self.router(hidden_states)
# 其中 selected_experts_indices shape 是 (b * s, top_k)
# 其中 expert_mask shape 是 (expert_number, top_k, b * s)
final_hidden_states = torch.zeros(
(batch_size * seq_len, hidden_dim),
dtype=hidden_states.dtype,
device=hidden_states.device
)
for expert_idx in range(self.expert_number):
expert_layer = self.experts[expert_idx]
# expert_mask[expert_idx] shape 是 (top_k, b * s)
idx, top_x = torch.where(expert_mask[expert_idx])
# idx 和 top_x 都是一维 tensor
# idx 的值是 0 或 1, 表示这个 token 是作为当前专家的 top1 还是 top2
# top_x 的值是 token 在 batch*seq_len 中的位置索引
# 例如对于 batch_size=2, seq_len=4 的输入:
# top_x 的值范围是 0-7, 表示在展平后的 8 个 token 中的位置
# idx 的值是 0/1, 表示这个 token 把当前专家作为其 top1/top2 专家
# hidden_states 的 shape 是 (b * s, hidden_dim)
# 需要取到 top_x 对应的 hidden_states
current_state = hidden_states.unsqueeze(
0
)[:, top_x, :].reshape(-1, hidden_dim) # (selected_token_number, hidden_dim)
# router_weight 的 shape 是 (b * s, top_k)
current_hidden_states = expert_layer(
current_state
) * router_weights[top_x, idx].unsqueeze(-1) # (selected_token_number, 1) 这里有广播
# 把当前专家的输出加到 final_hidden_states 中
# 方式1 的写法性能更好,并且方式1容易出现
final_hidden_states.index_add_(0, top_x, current_hidden_states.to(hidden_states.dtype))
# 方式2
# final_hidden_states[top_x] += current_hidden_states.to(hidden_states.dtype)
# 方式2 的写法性能更差,并且方式2容易出现错误,+= 操作在处理重复索引时需要多次读写内存,可能会导致竞争条件
# 把 final_hidden_states 还原到原来的 shape
final_hidden_states = final_hidden_states.reshape(batch_size, seq_len, hidden_dim)
return final_hidden_states, router_logits # shape 是 (b * s, expert_number)
def test_token_level_moe():
x = torch.rand(2, 4, 16)
config = MOEConfig(16, 2, 2)
token_level_moe = SparseMOE(config)
out = token_level_moe(x)
print(out[0].shape, out[1].shape)
test_token_level_moe()
备注:这里是参考 deepseek moe 思想,写的一个共享 expert 的 MOE 网络,有一定的简化,但是可以方便理解训练过程。
和 版本2 的 SparseMOE 区别是,这里多了一个 shared experts 的模型,这个模型是所有 token 共享的,也就是说,所有 token 都过这个 shared experts 模型,然后每个 token 会用计算的 Router 权重,来选择 topK 个专家,然后和共享的专家的输出一起加权求和。
具体结构图为:
class ShareExpertMOE(nn.Module):
def __init__(self, config):
super().__init__()
self.moe_model = SparseMOE(config)
self.shared_experts = nn.ModuleList(
[
BasicExpert(
config.hidden_dim, config.hidden_dim
) for _ in range(config.shared_experts_number)
]
)
def forward(self, x):
# x shape 是 (b, s, hidden_dim)
# 首先过 moe 模型
sparse_moe_out, router_logits = self.moe_model(x)
# 针对的还是 x 的每一个
# 然后过 shared experts
shared_experts_out = [
expert(x) for expert in self.shared_experts
] # 每一个 expert 的输出 shape 是 (b, s, hidden_dim)
shared_experts_out = torch.stack(
shared_experts_out, dim=0
).sum(dim=0)
# 把 sparse_moe_out 和 shared_experts_out 加起来
return sparse_moe_out + shared_experts_out, router_logits
def test_share_expert_moe():
x = torch.rand(2, 4, 16)
config = MOEConfig(16, 2, 2)
share_expert_moe = ShareExpertMOE(config)
out = share_expert_moe(x)
print(out[0].shape, out[1].shape)
test_share_expert_moe()
用于测试上面的代码是否可以跑通?
def switch_load_balancing_loss(router_logits: torch.Tensor, num_experts: int) -> torch.Tensor:
"""
计算 Switch Transformers 的负载均衡损失
Args:
router_logits: shape [batch_size * sequence_length, num_experts]
num_experts: 专家数量
Returns:
total_loss: 总损失 = auxiliary_loss + z_loss
"""
# 计算路由概率
router_probs = torch.softmax(router_logits, dim=-1) # [b*s, num_experts]
# 获取每个token的最优专家
_, selected_experts = torch.topk(router_probs, k=2, dim=-1)
# 创建one-hot矩阵表示选中的专家
mask = torch.nn.functional.one_hot(selected_experts, num_experts).float()
# 计算每个专家的期望负载 (理想情况下应该是 1/num_experts)
expected_load = torch.ones_like(router_probs) / num_experts
# 计算实际负载 (每个专家处理的token数量除以总token数量)
# 在batch维度上计算平均值
actual_load = mask.mean(dim=0)
# 计算auxiliary loss
# 这会惩罚负载分布与期望负载的差异
aux_loss = torch.sum(actual_load * router_probs.mean(dim=0)) * num_experts
# 计算z_loss (可选)
# 这会惩罚过大的路由logits
z_loss = torch.mean(torch.square(router_logits))
z_loss_weight = 0.001 # 可调整的超参数
# 总损失
total_loss = aux_loss + z_loss * z_loss_weight
return total_loss
def test_moe_training():
# Create a simple dataset
batch_size = 32
seq_len = 16
hidden_dim = 32
num_batches = 100
# Initialize model and optimizer
config = MOEConfig(hidden_dim=hidden_dim,
expert_number=4,
top_k=2,
shared_experts_number=2)
model = ShareExpertMOE(config)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Training loop
model.train()
for batch in range(num_batches):
# Generate random input data
x = torch.randn(batch_size, seq_len, hidden_dim)
target = torch.randn(batch_size, seq_len, hidden_dim)
# Forward pass
output, router_logits = model(x)
# Compute losses
# MSE loss for prediction
mse_loss = F.mse_loss(output, target)
aux_loss = switch_load_balancing_loss(router_logits, config.expert_number)
# Combined loss
total_loss = mse_loss + 0.01 * aux_loss
# Backward pass and optimize
optimizer.zero_grad()
total_loss.backward()
optimizer.step()
if batch % 10 == 0:
print(f"Batch {batch}, Loss: {total_loss.item():.4f} "
f"(MSE: {mse_loss.item():.4f}, Aux: {aux_loss.item():.4f})")
# Run the training test
test_moe_training()
最后欢迎关注我,基本全网同名 chaofa用代码打点酱油
2025-01-28 02:58:00
自 chatGPT 22年底问世以来,大模型(Large Language Model, LLM)一般使用 Causal Language Model 的形式,属于 Transformers 中的 Decoder 部分,其中在 Decoder 的 Block 中有一个 FFN(FeadForward) 层,一般认为这部分参数用于存储知识。而标准的 FFN 一般有一个升维度和降维度的过程,一共有两个权重矩阵,用公式表示为
其中 x shape 是 ,w1 shape 是 ,w2 shape 是 , w1 是升维(up),w2 是降维(down)
激活函数主要是为了实现神经网络学习输入和输出之间的复杂非线性关系而使用的一个函数。在公式 (1) 中,ReLU
是一个激活函数(Transfromers原版),可以替换成其他的激活函数,比如 BERT 开始用 Gaussian Error Linear Unit,GELU
比较多,随后就成了激活函数的主流选择,但是随着大模型的爆火以及 PaLM 模型的发布,大家开始慢慢使用 swishGLU 作为激活函数,并且作为一个主要的优化点。
具体可以看下面一段代码即可清楚的理解 FFN 模型是什么实现的。
class FeedForward(nn.Module):
# 实际上就是 MLP
def __init__(self, config):
super().__init__()
self.net = nn.Sequential(
nn.Linear(config.n_embd, 4 * config.n_embd),
# 激活函数
nn.ReLU(),
# 可以替换成 nn.GELU(),
# 但是 如果是 SwishGLU 则实现方式有所不同,接下来就会介绍 swishGLU 是怎么实现的
nn.Linear(4 * config.n_embd, config.n_embd),
nn.Dropout(config.dropout)
)
def forward(self, x):
return self.net(x)
ReLU 深度学习以来最常用的激活函数,其公式非常的简单。
从 GPT、BERT 以来,GELU 似乎成了新时代取代 ReLU 的激活函数,具体形式如下:
其中 是标准正态分布的累计分布函数,定义为
这里的 erf
是误差函数
但是这个函数由于计算成本较高,因此有两个初等函数作为近似计算(但目前【2025年1月27日】其实很多框架已经可以精确计算 erf 函数)。
近似计算分析详细可以参见苏神的文章,GELU的两个初等函数近似是怎么来的
SwiGLU(或者swishGLU,以下可能混用) 是 swish 激活函数和 GLU 门控单元的结合体,因此需要分别介绍两者的不同。
其中需要注意的是:在 T5 开始,很多模型(比如 PaLM )在FFN层都不用 bias 了,也就是说 FFN的公式变成了
注意公式 6 和公式 1 的区别,一共没有 bias 一个有 bias,但具体得看不同模型的实现,并不能一概而论。
swish 是一个非线性函数(激活函数都是如此,笑🤣),具体公式为:
其中 是一个超参数,当 时,Swish 就变成了 SiLU (Sigmoid Linear Unit),大多数框架的默认实现(如 PyTorch、TensorFlow 的 nn.SiLU()
)使用的是 的固定版本。
因此如果采用 swish 激活函数,FFN 的公式变成了
共有两个可学习的矩阵,其中 是升维矩阵, 是降低维度的矩阵。
GLU,Gated Linear Units,是一种门控结构(有参数,因此相对于普通的激活函数多了一个 gate
矩阵),通过 sigmoid 控制不同维度的激活。公式如下[1]:
这里是不是熟悉 LSTM, GRU 的同学一下就理解,其中需要注意的是,b, c
对应的 bias 不是必须的。
对比公式 7 和公式 9,公式 9 中的 对应 公式 7 中的 ,而 对应公式 7 中的 矩阵。
而 SwiGLU
就是把门控函数替换成了 swish
,并且去除掉了 bias
部分,以及把 FFN 层的一个 Linear 层替换成了 GLU 层,因此一共有三个可训练的参数矩阵, w1, w2, w3。
因此最终的公式表达为,
而我们都知道 FFN 是一个升高维度,然后降低维度的过程,因此可以写成,W2 是一个降低维度的参数,W1 是升高维度的过程,而 W3 是一个 Gate 需要用到的参数矩阵。
通过这个公式整体就非常的清晰理解使用 swiGLU 的 FFN。
而我们都知道在 basic 版本的 FFN,见公式(1), 只有 和 分别是 (h, 4h) 和(4h, h),因此整体参数是 。
而公式9 中,一共有三个矩阵,如果想要实现总参数 ,那么每一个参数矩阵的大小应该是 ,因此 的shape应该是 , 的 shape 是 。
假设输入的 hidden_dim
大小是 hidden_dim
,那么中间层(up 后的维度)大小是 mid_dim
, 具体计算逻辑如下:
mid_dim = int(8 * hidden_dim / 3)
# multiple_of:make SwiGLU hidden layer size multiple of large power of 2
mid_dim = multiple_of * ((mid_dim + multiple_of - 1) // multiple_of)
# multiple_of 一般设置为 256, LLaMA 和 GPT等模型
注意,在 LLM (大语言模型) 架构中,multiple_of 是一个用于优化计算效率的参数,通常设置为 256 或其他 2 的幂次方数(如 128、512 等),最终让 mid_dim
调整为 multiple_of
的整数倍。这样做有几个原因:
class FFNExpert(nn.Module):
def __init__(self, hidden_dim, dropout): # LLM 进化之路, FFN 激活函数从 GELU -> SwiGLU
super().__init__()
# 有一个 magic number 叫做 8/3
hidden_dim = hidden_dim
# 这里可以自己去优化成 multiple_of 的倍数
mid_dim = hidden_dim * 8 // 3
self.up = nn.Linear(hidden_dim, mid_dim, bias=False)
self.down = nn.Linear(mid_dim, hidden_dim, bias=False)
self.gate = nn.Linear(hidden_dim, mid_dim, bias=False)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
out = self.dropout(
self.down(
# up 之后的 Shape 是(b, s, mid_dim)
# gate 和 up 之后的Shape都是 (b, s, mid_dim)
# 两者是 element-wise 相乘
F.silu(
self.gate(x)
) * self.up(x)
)
)
return out
最后欢迎关注我,基本全网同名 chaofa用代码打点酱油
https://zhuanlan.zhihu.com/p/693332639 ↩︎
2024-12-29 05:00:00
如果今年要挑一个最重大的事情,那只能是点点(我妻子)怀孕了,我明年就要当爹了。这事情的影响是非常巨大的,不仅涉及到家庭,而且是一个需要长期付出不可逆的过程。有无孩子一定会是两个截然不同的世界,所以我是有些恐慌的(教育孩子的难度不言而喻)。
由于点点想生两个孩子,因此在去年结婚之后,就开始盘算着什么时候生娃,不然到时候得做高龄产妇。左思右想后决定从 4 月份开始备孕,并且一定要让孩子在次年 9 月之前出生。之所以有这么荒诞的想法是因为我国的入学政策是:“凡年满6周岁的儿童,其父母或者其他法定监护人应当送其入学接受并完成义务教育。”
这里有一个很有意思的内容,关于点点的高度不自洽,一方面天天讲以后孩子打螺丝能养活自己就好了,一方面又很焦虑孩子的入学时间,不然可能导致她/他以后读博可能有年龄压力(就半年也不至于吧,而且孩子一定想读博吗🤣)。所以未来会怎么样呢,这种极大的不确定真是让人又担心又期待。
经过半年的时间,周末各种跑医院,所幸预产期会是在 25年的 9 月前,缓解了点点下半年最大的烦恼,但随之而来的孕早期一系列的孕反,嗳气、呕吐、尿频等,十分不易。我也在反思自己,我的关心不够,好像真的只会抖机灵逗她开心。。。
如果要说今年最有意义的是一件事情,那就是【从十月份开始时不时录制一些技术视频】,并把它分享在 B站上。正是因为这样的公开表达,最终收获了一些朋友的关注,尤其部分比较热心的人甚至会私信我表示感谢,这里面充满了正反馈,也让我感受到了一点点意义。这里简单讲一下相关的数据(以后全网基本都叫【chaofa用代码打点酱油】了)
我写博客其实还挺早的,但是根本没有人在看,没有什么反馈,基本属于自嗨。第一次自建博客是 17年,那个时候写了一个关于自己学习 React 的一些记录,但是后面读研之后不做前端了就把对应的内容删除了。后面受到【极客兔兔】在 V站发帖自建博客过程的影响,又开始第二次写自己的博客,这时候是 19年的 6 月份,也就是当前的博客:chaofa用代码打点酱油,改过很多次名字,但唯一值得高兴的时候,这个博客持续存在了 5年,里面记录了自己很多的碎碎念。
那么为什么今年我却把它称为【公开表达元年】,因为这一次不一样了。以前我想过写公众号,想过做视频,想过回答知乎问题,但是大多数都没有坚持下去。为此我今年反思了一下为什么以前没有持续下去?
大多数人都知道“公开表达是难却正确的事情”,但是真正的领悟者却不多,李笑来在《把时间当做朋友》一书中提到,互联网用户行为遵循 "90-9-1原则"。
不是说因为创作者特别稀疏我们采取成为创作者的,而是这其中有巨大的好处。
那么后面我应该怎么做呢?
去年换工作之后,高强度的工作了一段时间,加上和岗位、老板的风格不是很适应,没干多久就感觉天天精疲力尽的,很快就想要辞职,但是迫于职业生涯的延续性,我自然是不敢真的就裸辞,因此在苦苦坚持,想要寻求一些方法延续自己的职业生涯,比如:自我鼓励——《工作,再坚持坚持》,理性分析——《如何在大厂工作六个月以上且保持一定的心理健康?》,只能说收效甚微。毕竟饿了就想吃饭,累了就会想休息,天经地义。
差不多待一年之后,就开始考虑活水转岗 or 换工作,不过深圳的就业机会还是较少,思来想去还是觉得活水合适一些。然后开始内部看一些机会,这个时候又涉及到去干什么业务的问题,所以很纠结到底干什么?继续和在腾讯一样做广告相关的业务,还是去做搜索,还是去做推荐,还是去做纯AIGC的业务,还是去做NLP相关的业务,最后兜兜转转又到了与我最有缘分的客服。
活水可能也算是一次跳槽吧,毕竟也要经过3轮技术面试,因此我基本把它当作全新的工作,工作方式也适当地做出一些改变。工作上一直在向表现好的大佬学习,希望明年工作上有一些突破。
2024-12-09 06:00:00
nums_key_value_head
参数就可nums_key_value_head
设置等于 1 就是 MQAnums_key_value_head
设置等于 nums_head
就是 MHA如果不喜欢看文字的同学可以查看 B站 或者 YouTube 视频。
备注:也可以直接由 GQA 中修改参数得到。但是本代码更完整一些
import math
import torch
import torch.nn as nn
class MultiHeadAttention(nn.Module):
def __init__(self, hidden_dim, nums_head) -> None:
super().__init__()
self.nums_head = nums_head
# 一般来说,
self.head_dim = hidden_dim // nums_head
self.hidden_dim = hidden_dim
# 一般默认有 bias,需要时刻主意,hidden_dim = head_dim * nums_head,所以最终是可以算成是 n 个矩阵
self.q_proj = nn.Linear(hidden_dim, hidden_dim)
self.k_proj = nn.Linear(hidden_dim, hidden_dim)
self.v_proj = nn.Linear(hidden_dim, hidden_dim)
# gpt2 和 bert 类都有,但是 llama 其实没有
self.att_dropout = nn.Dropout(0.1)
# 输出时候的 proj
self.o_proj = nn.Linear(hidden_dim, hidden_dim)
def forward(self, X, attention_mask=None):
# 需要在 mask 之前 masked_fill
# X shape is (batch, seq, hidden_dim)
# attention_mask shape is (batch, seq)
batch_size, seq_len, _ = X.size()
Q = self.q_proj(X)
K = self.k_proj(X)
V = self.v_proj(X)
# shape 变成 (batch_size, num_head, seq_len, head_dim)
q_state = Q.view(batch_size, seq_len, self.nums_head, self.head_dim).permute(
0, 2, 1, 3
)
k_state = K.view(batch_size, seq_len, self.nums_head, self.head_dim).transpose(
1, 2
)
v_state = V.view(batch_size, seq_len, self.nums_head, self.head_dim).transpose(
1, 2
)
# 主意这里需要用 head_dim,而不是 hidden_dim
attention_weight = (
q_state @ k_state.transpose(-1, -2) / math.sqrt(self.head_dim)
)
print(type(attention_mask))
if attention_mask is not None:
attention_weight = attention_weight.masked_fill(
attention_mask == 0, float("-1e20")
)
# 第四个维度 softmax
attention_weight = torch.softmax(attention_weight, dim=3)
print(attention_weight)
attention_weight = self.att_dropout(attention_weight)
output_mid = attention_weight @ v_state
# 重新变成 (batch, seq_len, num_head, head_dim)
# 这里的 contiguous() 是相当于返回一个连续内存的 tensor,一般用了 permute/tranpose 都要这么操作
# 如果后面用 Reshape 就可以不用这个 contiguous(),因为 view 只能在连续内存中操作
output_mid = output_mid.transpose(1, 2).contiguous()
# 变成 (batch, seq, hidden_dim),
output = output_mid.view(batch_size, seq_len, -1)
output = self.o_proj(output)
return output
attention_mask = (
torch.tensor(
[
[0, 1],
[0, 0],
[1, 0],
]
)
.unsqueeze(1)
.unsqueeze(2)
.expand(3, 8, 2, 2)
)
x = torch.rand(3, 2, 128)
net = MultiHeadAttention(128, 8)
net(x, attention_mask).shape
备注:以下代码省略了 attention_dropout attention_mask等情况的处理,真实实现过程中需要考虑。
import torch
import torch.nn as nn
import math
# 忽略了 attention_mask, attention_dropout;
class GroupQueryAttention(nn.Module):
def __init__(self, hidden_dim, nums_head, nums_key_value_head):
super().__init__()
assert hidden_dim % nums_head == 0 # 可以整除
assert nums_head % nums_key_value_head == 0 # N 个 query head 为一组
self.hidden_dim = hidden_dim
self.nums_head = nums_head
self.nums_key_value_head = nums_key_value_head
self.head_dim = hidden_dim // nums_head
# 初始化 qkv o
self.q_proj = nn.Linear(hidden_dim, nums_head * self.head_dim) # out feature_size (nums_head * head_dim)
# k v out shape (nums_key_value_head * head_dim)
self.k_proj = nn.Linear(hidden_dim, nums_key_value_head * self.head_dim)
self.v_proj = nn.Linear(hidden_dim, nums_key_value_head * self.head_dim)
self.o_proj = nn.Linear(hidden_dim, hidden_dim) # input_size nums_head * head_dim
def forward(self, X, attention_mask=None):
# X shape (batch, seq, hidden_dim)
batch_size, seq, _ = X.size()
# qkv projection
q = self.q_proj(X) # (batch, seq, hidden_dim)
k = self.k_proj(X)
v = self.v_proj(X)
# attention_weight 目标shape 是 (batch, nums_head, seq, seq)
q = q.view(batch_size, seq, self.nums_head, self.head_dim)
k = k.view(batch_size, seq, self.nums_key_value_head, self.head_dim)
v = v.view(batch_size, seq, self.nums_key_value_head, self.head_dim)
# 关注: nums_head 和 nums_key_value_head 的关系
q = q.transpose(1, 2) # (b, nums_head, seq, head_dim)
k = k.transpose(1, 2) # (b, nums_key_value_head, seq, head_dim)
v = v.transpose(1, 2) # (b, nums_key_value_head, seq, head_dim)
# k v repeat; (广播操作)
k = k.repeat_interleave(self.nums_head // self.nums_key_value_head, dim=1)
v = v.repeat_interleave(self.nums_head // self.nums_key_value_head, dim=1)
attention_score = (q @ k.transpose(2, 3)) / math.sqrt(self.head_dim)
attention_weight = torch.softmax(attention_score, dim=-1)
# (attention_mask 忽略) # 可以看前面的视频
output = attention_weight @ v # (b, nums_head, seq, head_dim)
# output projection 变成 (b, seq, hidden_dim)
output = output.transpose(1, 2).contiguous()
final_output = self.o_proj(output.view(batch_size, seq, -1))
return final_output
# 测试
x = torch.rand(3, 2, 128)
net = GroupQueryAttention(128, 8, 4)
net(x).shape
由于 MQA 是 GQA 的一种特殊形式,因此只要在参数设置的时候将 nums_key_value_head = 1 就是 Multi Query Self-Attention。
最后欢迎关注我,基本全网同名 chaofa用代码打点酱油