MoreRSS

site iconJiaJe | 杰哥修改

清华大学计算机系博士生。
请复制 RSS 到你的阅读器,或快速订阅到 :

Inoreader Feedly Follow Feedbin Local Reader

JiaJe | 杰哥的 RSS 预览

如何进行条件分支预测器实验

2025-04-10 08:00:00

如何进行条件分支预测器实验

背景

最近针对各种条件分支预测器(Conditional Branch Predictor)做了在各种 benchmark 上的实验,在此记录一下做这个实验的流程。

流程

说到做条件分支预测器实验,到底是做什么呢?其实就是针对未来的处理器中的条件分支预测器的设计,在提前准备好的一些 benchmark 上进行模拟,观察它的预测准确性。既然是未来的处理器,那么硬件肯定是没有的,如果直接用 RTL 去实现新的预测器,再用 RTL 仿真,结果固然准确,但这还是太复杂并且太慢了。所以在前期的时候,首先会构建一个单独的条件分支预测器的实验环境,在只考虑条件分支指令、不考虑其他指令的情况下,单纯来观察预测的效果,从而可以实现比较快速的设计迭代。

为了达成这个目的,需要:

  1. 提前准备好一些 benchmark,提取出这个 benchmark 中所涉及到的条件分支 trace,以及条件分支预测器所需要的其他信息
  2. 为了进一步缩短模拟的时间和 trace 的大小,利用 SimPoint 等技术来减少要模拟的指令条数
  3. 搭建一个条件分支预测器模拟器,在上一步提取出来的 trace 中模拟条件分支预测器的执行,从而得到结果

下面按照这个顺序,分别来讨论一下这个流程。

benchmark 准备

比较常见的 benchmark 就是 SPEC INT 2017,当然现在很多论文也会自己去寻找一些其他的 benchmark,不同的 benchmark 它的程序的特性也是不一样的,未来也可能会有新的 benchmark 出来,所以有必要了解从 benchmark 到 trace 的过程。选好了 benchmark 以后,我们需要思考怎么去生成一个 trace:为了减少后续模拟的负担,我们需要从 benchmark 中提取条件分支的执行历史,作为 groundtruth,喂给条件分支预测器,这样才能知道每次预测是否准确。当然了,网上已经有很多现成的 trace,比如 CBP Championship 比赛也都有提供自己的 benchmark trace(P.S. 2025 年的 CBP Championship 正在火热进行中),但读完本文,你应该可以尝试自己完成这个从 benchmark 到 trace 的过程。

那么第一个问题就是,怎么获取 benchmark 中分支指令执行的信息呢?首先来看一组数据,在 amd64 上,用 -O3 编译 SPEC INT 2017 的 benchmark,一共 10 个子 benchmark,加起来运行的指令数大约是 1.6e13 条,其中有大约 2.9e12 条分支指令(包括了有条件和无条件),这个数量是非常巨大的,无论是保存这些执行信息的性能开销,还是需要的存储空间,都是比较巨大的。

考虑到条件分支预测器只需要分支指令的信息,所以只考虑 2.9e12 条分支指令的部分,而不去考虑完整的 1.6e13 条指令,首先可以减少一个数量级。接着,考虑每个分支指令需要记录哪些信息:

  1. 知道每一条分支指令的地址、每一条直接分支指令的目的地址
  2. 对于执行的每一条条件分支指令,要记录它跳转与否
  3. 对于执行的每一条间接分支指令,需要记录它跳转的目的地址

其中第一点,由于条件分支指令本身是不变的(不考虑 JIT),所以只需要存一份就行。而 SPEC INT 2017 所有程序的分支指令加起来大概只有 5e4 的量级,相比 2.9e12 的执行的分支指令数可以忽略不计。第三点,由于间接分支指令通常也是比较少的,而且同一条间接分支指令的目的地址通常来说不会特别多,也有压缩的空间。那么最主要的空间来自于:

  1. 虽然条件分支指令数量不多,但是执行的条件分支指令次数很多,每一次执行的可能是不同的条件分支指令,如果要记录当前这次执行的是哪一条条件分支指令,那么这个指令的地址或者一个 id 所占用的空间会很大;如果不记录当前执行的是哪一条分支分支指令,就需要在后续处理的时候,结合可执行程序的汇编来推断,当前执行的是哪一条条件分支指令
  2. 其次就是要记录条件分支跳转与否,这一个的开销相对会小一些,只需要一个 bit

由此可以推导出不同的 trace 记录方式:

第一种方式是,遇到条件分支指令时,只记录跳转(Taken)还是不跳转(Not Taken),这种方式保存的数据量最小(平均每个分支只需要比 1 bit 略多的空间),但是后续需要结合汇编,恢复出执行的过程,更进一步还可以压缩那些 return 的目的地址等于对应的 call 指令的下一条指令的地址的情况(Indirect Transfer Compression for Returns)。Intel PT 采用的是这种方法。

第二种方式是,遇到条件分支指令时,不仅记录跳转与否,还记录它执行的是哪一条分支指令。这种方式保存的数据量稍多,假如要支持 5e4 条不同的条件分支指令,为了保存这个 id,就需要 16 位。类似地,也可以只记录跳转了的条件分支指令,那么那些没有跳转的条件分支指令,就需要后续结合汇编或者完整的条件分支指令表来恢复出来。CBP Championship 的 trace 采用的是这种方法。

第一种方法明显空间会更小,以 2.9e12 条执行的分支指令数,大概需要 300GB 的磁盘空间;第二种方法,同样的分支指令数,就需要大概 5.8TB 的磁盘空间。当然了,第二种方法存的数据可以经过无损压缩进一步缩小空间,实测压缩后大概是每分支 0.16 字节(这个数字与所跑的 benchmark 有关系,分支容易被预测的 benchmark 对应更好的压缩率,因为某种意义来说分支预测也是一种无损压缩),只比第一种方法大概每分支 0.14 字节略大。同理,第一种方法存的数据也可以经过无损压缩进一步缩小空间,达到每分支大约 0.018 字节的程度(压缩率也和分支预测准确率有关)。

在评估条件分支预测器的时候,除了知道分支本身,还需要知道执行的指令数,用于计算 MPKI 等,这个可以通过 PMU 单独统计出来,或者直接根据控制流推算出执行的指令数,例如在等长指令的 ISA 上直接用地址差除以指令长度来计算指令数,在变长指令的 ISA 上 Parse ELF 去解析控制流经过的指令:

  1. 解析 ELF,用反汇编器得到每条指令的地址,从小到大排序放到数组中
  2. 对于每个分支地址和目的地址,查询它对应的指令在指令数组中的下标,记录下来
  3. 统计指令数时,每遇到一个跳转的分支,就用当前跳转的分支的分支地址在指令数组中的下标,减去上一个跳转的分支的目的地址在指令数组中的下标,加上一,累加到指令数中

当然了,这里有一些细节,例如如果程序是 PIE,那么需要知道它加载的基地址,从而把运行时的指令地址和 ELF 对应起来;类似地,如果程序加载了 libc 等动态库,也需要知道它们的加载地址。这些信息可以在抓取指令 trace 的同时,顺带记录下来。如果想规避这个麻烦,可以使用静态编译,不过 vdso 依然会动态加载,但 vdso 内指令很少,通常可以忽略不计,可以特判忽略掉。

此外,如果分支预测器需要知道分支指令的 fallthrough 地址(例如 Path History Register),且使用的是变长指令集,还需要记录分支指令的长度。这些需求实现起来都并不复杂,也只需要占用很小的空间。

TB 级别的规模,无论是保存这些数据,还是生成这些数据,或者更进一步在这些数据上模拟条件分支预测器,都会带来很大的负担。因此,需要一个办法来减少要模拟的 trace 长度。

SimPoint

SimPoint 是解决这个问题的一个很重要的方法:它观察到了一个很重要的现象,就是这些 benchmark 其实大多数时候是在重复做相同的事情,只不过涉及到的数据不同。这也很好理解,因为很多程序里面都是循环,而循环是很有规律的,我们可以预期程序的行为在时间尺度上也会有一定的周期性。下面是 SimPoint 论文中的一个图,它记录了 gzip-graphic benchmark 的 IPC(每周期指令数,图中的实线)和 L1 数据缓存缺失率(图中的虚线)随着执行过程的变化:

可以看到比较明显的周期性,而涉及到周期性,就会想到利用周期的性质:如果在一个周期上评估它的 IPC 或者分支预测器的准确率,然后外推到其他的周期,是不是大大缩小了执行时间?SimPoint 利用这个思想,设计了如下的步骤:

  1. 首先把整个执行过程按照执行的指令数切分成很多个 slice
  2. 接着对 slice 进行聚类,使得每一个类内的 slice 的行为类似,这个类就叫做一个 phase
  3. 之后做实验的时候,只需要对每个 phase 内的一个 slice 进行实验,评估出它的 IPC 或者其他性能指标,再按照 phase 内的 slice 数量加权平均,就可以得到完整执行过程的性能指标了

这里比较核心的步骤,就是怎么对 slice 聚类?SimPoint 论文采用了机器学习的方法:针对每个 slice,统计它在不同 Basic Block 内执行的时间的比例,把这个统计数据记为 Basic Block Vector;那么聚类,就是针对那些 Basic Block Vector 相近的 Slice,进行 K-Means 算法。

由于 K-Means 算法执行的时候,需要首先知道聚出来多少个类,所以 SimPoint 枚举了若干个不同的类的个数,对每个 K-Means 聚类结果进行打分:BIC(Bayesian Information Criterion),根据打分找到一个聚类效果足够好,但是类又不是特别多的结果。

进一步为了提升聚类的性能,SimPoint 还进行了一次降维操作,把很长的 Basic Block Vector 线性映射到一个比较小的 15 维的向量上。

SimPoint 论文中展示了聚类的效果,还是很可观的:

完成聚类以后,SimPoint 的输出就是若干个 phase,每一个类对应一个 phase,每个 phase 包括:

  1. 权重:权重就是这个类中 slice 的个数
  2. 代表这个 phase 的一个 slice 的信息,例如它是从第几条指令开始到第几条指令

完成 SimPoint 算法后,得到的 trace 长度大大减小,例如一段原始的长为 1e10 条指令的 trace,以 3e7 条指令为一个 slice,聚类以后,只剩下 10 个 phase,那么需要模拟和保存的 trace 长度只剩下了 3e8 条指令。SimPoint 聚类整个流程的性能大概在每秒 2e8 条分支指令的量级。

回顾前面提到的完整的 SPEC INT 2017 的量级:2.9e12 条执行的分支指令数,经过 SimPoint 处理后,假如每个子 benchmark 拆分成 15 个长度为 1e8 条指令的 phase,那么最终可能只需要 3e10 条指令,这就是一个比较好处理的大小了,以单核每秒模拟 1e7 条分支指令的速度,完整跑一次条件分支预测器实验,可能只需要几十分钟的时间,再加上多核,可以进一步缩短到几分钟。

trace 抓取

刚才讨论了很多 trace 的大小以及如何用 SimPoint 压缩空间,那么这个 trace 到底怎么抓取呢?主要有两种方法:

  1. 基于硬件已有的 trace,比如 Intel PT,但需要注意,Intel PT 是可能丢失历史的,虽然比例比较小;为了避免丢失历史,建议设置 sysctl kernel.perf_event_paranoid=-1(或者用 root 权限来运行 perf record,即绕过 mlock limit after perf_event_mlock_kb 的限制)来扩大 Intel PT 使用的 buffer 大小,从 32KB 扩大到 1MB(参考 pt_perf),在大小核机器上还要绑定到一个大核上
  2. 基于软件的 Binary Instrumentation,即针对分支指令插桩,比如 Pin、DynamoRIO 甚至 QEMU

第一种方法性能是最好的,运行开销比较小,耗费 1.4x 的时间,但是后续处理也比较费劲一些,此外比较依赖平台,ARM 上虽然也有 SPE,但是支持的平台比较少。其他平台就不好说了。

第二种方法性能会差一些,大概会有 30-50x 的性能开销,但是一天一夜也能够把 SPEC INT 2017 跑完。实现的时候,需要注意在遇到分支的时候,首先把信息保存在内存的 buffer 中,buffer 满了再写盘;此外,为了减少磁盘空间以及写盘所耗费的 I/O 时间,可以在内存中一边生成数据一边压缩,直接把压缩好的数据写入到文件中。

实践中,可以先用 Intel PT 抓取 trace,再把 trace 转换为第二种格式,最终的抓取 + 转换的性能开销大概是 15x。大致算法如下:

  1. 遍历程序中所有的分支,按照地址从小到大保存起来在数组当中,针对那些直接分支,提前计算好从它的目的地址开始遇到的第一个分支在数组的下标
  2. 解析 perf.data 中的 Intel PT packet,提取出其中的 TNT 和 TIP packet,从程序的 entrypoint 开始,沿着 Intel PT 的 trace 重建控制流:条件分支从 TNT packet 获取方向,间接分支从 TIP packet 获取目的地址,
  3. 如果分支跳转了,就根据目的地址找到从目的地址开始遇到的下一个分支(二分查找);如果没有跳转,就直接访问数组的下一个分支
  4. 注意 RET compression 的处理:维护 call stack,如果遇到 return 的时候刚好在 TNT packet 中,且对应的 bit 是 Taken,则从 call stack 取出目的地址;一个优化是 call stack 不仅记录地址,还记录从这个地址开始遇到的下一个分支在数组的下标
  5. 重建控制流的同时,输出第二种格式的 trace,在内存中完成流式压缩

以上的这些性能开销只是在一个程序上测得的结果,不同的程序上,其性能开销也有很大的不同。

对于动态链接,perf.data 会记录 mmap event;Pin 和 DynamoRIO 都可以对 module load 事件进行插桩。动态库可以从文件系统中访问,vdso 可以从内存中导出

条件分支预测器模拟

在完成了前面的大部分步骤以后,最终就是搭建一个条件分支预测器的模拟器了。其实这一点倒是并不复杂,例如 CBP Championship 或者 ChampSim 都有现成的框架,它们也都提供了一些经典的分支预测器的实现代码,例如 TAGE-SC-L。在它们的基础上进行开发,就可以评估各种条件分支预测器的预测效果了。

实际上,除了条件分支预测器,还有很多其他的实验也可以用类似的方法构建 trace 然后运行。但条件分支预测器有个比较好的特点:它需要的状态比较简单,通常拿之前一段指令做预热即可,不需要 checkpoint;而如果要完整模拟整个处理器的执行,通常需要得到系统的整个状态,比如内存和寄存器,才能继续执行,这时候就可能需要提前把 slice 开始的状态保存下来(checkpoint),或者用一个简单的不精确的模拟器快速计算出 slice 开始的状态(fast forwarding)。

实验数据

Trace 抓取

在这里列出最终使用的 trace 格式和实验数据:

  1. trace 格式:使用第二种 trace 记录方法,每次执行 branch 记录 4 字节的信息,包括 branch id 和是否跳转,使用 zstd 进行无损压缩
  2. trace 大小和运行时间统计(GCC 12.2.0,-O3 -static 编译,在 Intel i9-14900K 上实验):
benchmark 子 benchmark 分支执行次数 trace 大小 每分支空间开销 程序直接运行时间 Pin 抓取时间 时间开销
500.perlbench_r checkspam 2.40e11 8.87 GiB 0.32 bit 59s 6334s 107x
500.perlbench_r diffmail 1.49e11 2.78 GiB 0.16 bit 33s 4615s 140x
500.perlbench_r splitmail 1.33e11 1.49 GiB 0.10 bit 31s 3385s 109x
500.perlbench_r Total 5.22e11 13.14 GiB 0.22 bit 123s 14334s 117x
502.gcc_r gcc-pp -O3 4.50e10 3.28 GiB 0.63 bit 17s 1625s 96x
502.gcc_r gcc-pp -O2 5.37e10 3.46 GiB 0.55 bit 20s 1930s 97x
502.gcc_r gcc-smaller 5.51e10 2.84 GiB 0.44 bit 21s 1830s 87x
502.gcc_r ref32 -O5 4.22e10 1.20 GiB 0.24 bit 16s 1369s 86x
502.gcc_r ref32 -O3 4.80e10 1.50 GiB 0.27 bit 24s 2209s 92x
502.gcc_r Total 2.44e11 12.24 GiB 0.43 bit 98s 8963s 91x
505.mcf_r N/A 2.21e11 31.0 GiB 1.20 bit 168s 4800s 29x
520.omnetpp_r N/A 2.15e11 13.3 GiB 0.53 bit 135s 7289s 54x
523.xalancbmk_r N/A 3.27e11 4.45 GiB 0.12 bit 112s 8883s 79x
525.x264_r pass 1 1.44e10 579 MiB 0.34 bit 14s 348s 25x
525.x264_r pass 2 4.42e10 2.30 GiB 0.45 bit 39s 1202s 31x
525.x264_r seek 500 4.78e10 2.77 GiB 0.50 bit 41s 1258s 31x
525.x264_r Total 1.06e11 5.64 GiB 0.46 bit 94s 2808s 30x
531.deepsjeng_r N/A 2.74e11 31.6 GiB 0.99 bit 140s 8093s 58x
541.leela_r N/A 3.38e11 75.6 GiB 1.92 bit 224s 8894s 40x
548.exchange2_r N/A 3.01e11 26.3 GiB 0.75 bit 88s 6753s 77x
557.xz_r cld 5.08e10 9.16 GiB 1.55 bit 60s 1252s 21x
557.xz_r cpu2006docs 1.84e11 7.80 GiB 0.36 bit 65s 3923s 60x
557.xz_r input 7.96e10 10.5 GiB 1.14 bit 55s 1842s 33x
557.xz_r Total 3.14e11 27.5 GiB 0.75 bit 180s 7017s 39x
Total N/A 2.86e12 241 GiB 0.72 bit 1362s 77834s 57x

每分支的空间开销和在 i9-14900K 上测得的 MPKI(Mispredictions Per Kilo Instructions)有比较明显的正相关性:

benchmark MPKI 每分支空间开销
523.xalancbmk_r 0.84 0.12 bit
500.perlbench_r 0.95 0.22 bit
525.x264_r 1.06 0.46 bit
548.exchange2_r 2.66 0.75 bit
502.gcc_r 3.16 0.43 bit
531.deepsjeng_r 4.35 0.99 bit
520.omnetpp_r 4.47 0.53 bit
557.xz_r 5.35 0.75 bit
541.leela_r 12.61 1.92 bit
505.mcf_r 13.24 1.20 bit

由于 LTO 对分支数量的影响较大,额外对比了 -O3-O3 -flto 的区别:

benchmark 子 benchmark O3 分支执行次数 O3 trace 大小 O3+LTO 分支执行次数 O3+LTO trace 大小
500.perlbench_r checkspam 2.40e11 8.87 GiB 2.32e11 8.77 GiB
500.perlbench_r diffmail 1.49e11 2.78 GiB 1.45e11 2.70 GiB
500.perlbench_r splitmail 1.33e11 1.49 GiB 1.31e11 1.48 GiB
500.perlbench_r Total 5.22e11 13.14 GiB 5.08e11 12.95 GiB
502.gcc_r gcc-pp -O3 4.50e10 3.28 GiB 4.27e10 3.20 GiB
502.gcc_r gcc-pp -O2 5.37e10 3.46 GiB 5.10e10 3.38 GiB
502.gcc_r gcc-smaller 5.51e10 2.84 GiB 5.31e10 2.78 GiB
502.gcc_r ref32 -O5 4.22e10 1.20 GiB 4.05e10 1.17 GiB
502.gcc_r ref32 -O3 4.80e10 1.50 GiB 4.58e10 1.45 GiB
502.gcc_r Total 2.44e11 12.24 GiB 2.33e11 11.98 GiB
505.mcf_r N/A 2.21e11 31.0 GiB 1.62e11 26.6 GiB
520.omnetpp_r N/A 2.15e11 13.3 GiB 1.97e11 13.2 GiB
523.xalancbmk_r N/A 3.27e11 4.45 GiB 3.16e11 4.37 GiB
525.x264_r pass 1 1.44e10 579 MiB 1.43e10 575 MiB
525.x264_r pass 2 4.42e10 2.30 GiB 4.42e10 2.29 GiB
525.x264_r seek 500 4.78e10 2.77 GiB 4.77e10 2.76 GiB
525.x264_r Total 1.06e11 5.64 GiB 1.06e11 5.61 GiB
531.deepsjeng_r N/A 2.74e11 31.6 GiB 2.13e11 29.1 GiB
541.leela_r N/A 3.38e11 75.6 GiB 2.61e11 72.3 GiB
548.exchange2_r N/A 3.01e11 26.3 GiB 3.02e11 26.2 GiB
557.xz_r cld 5.08e10 9.16 GiB 5.07e10 9.12 GiB
557.xz_r cpu2006docs 1.84e11 7.80 GiB 1.84e11 7.78 GiB
557.xz_r input 7.96e10 10.5 GiB 7.94e10 10.5 GiB
557.xz_r Total 3.14e11 27.5 GiB 3.14e11 27.4 GiB
Total N/A 2.86e12 241 GiB 2.61e12 230 GiB

LTO 去掉了 9% 的分支指令,对整体性能的影响还是蛮大的,MPKI 的数值也有明显的变化。

时间开销

各个步骤需要耗费的时间(单线程):

  1. 抓取 trace:约 20 小时
  2. 运行 SimPoint: 约 4 小时
  3. 模拟分支预测:约 1 小时

SimPoint

考虑到(子)benchmark 之间没有依赖关系,可以同时进行多个 trace/simpoint/simulate 操作,不过考虑到内存占用和硬盘 I/O 压力,实际的并行性也没有那么高。

跑出来的 SimPoint 聚类可视化中效果比较好的,图中横轴是按执行顺序的 SimPoint slice,纵轴每一个 y 值对应一个 SimPoint phase,图中的点代表哪个 slice 被归到了哪个 phase 上:

500.perlbench_r diffmail:

520.omnetpp_r:

557.xz_r input:

以 1e8 条指令的粒度切分 SimPoint,把 241 GB 的 trace 缩减到 415 MB 的规模。

分支预测器模拟

使用 CBP 2016 的 Andre Seznec TAGE-SC-L 8KB/64KB 的分支预测器在 SimPoint 上模拟 SPEC INT 2017 Rate-1,只需要 9-10 分钟。

使用 CBP 2016 的 Andre Seznec TAGE-SC-L 8KB/64KB 的分支预测器在 SimPoint 上模拟的 CMPKI(只考虑了方向分支),和 Intel i9-14900K 的 MPKI(考虑了所有分支)在 SPEC INT 2017 Rate-1(AMD64,-O3)的比较:

benchmark TAGE-SC-L 8KB TAGE-SC-L 64KB i9-14900K
500.perlbench_r 1.00 0.72 0.95
502.gcc_r 4.69 3.36 3.16
505.mcf_r 13.02 12.23 13.24
520.omnetpp_r 4.09 3.49 4.47
523.xalancbmk_r 0.85 0.68 0.84
525.x264_r 0.76 0.59 1.06
531.deepsjeng_r 4.59 3.45 4.35
541.leela_r 11.79 9.42 12.61
548.exchange2_r 2.96 1.25 2.66
557.xz_r 4.68 4.06 5.35
average 4.84 3.925 4.87

不同编译器、编译选项和指令集下 SPEC INT 2017 的指令数和分支指令数规模

ISA Flags Compiler # Inst # Branch Inst % Branch Inst
AMD64 O3 LLVM 20.1.5 2.10e+13 3.11e+12 14.8%
AMD64 O3 LLVM 18.1.8 1.99e+13 3.17e+12 14.8%
AMD64 O3 LLVM 19.1.4 1.99e+13 3.13e+12 15.7%
AMD64 O3 GCC 11.3 1.69e+13 2.88e+12 17.0%
AMD64 O3 GCC 12.2 1.67e+13 2.88e+12 17.2%
AMD64 O3+LTO GCC 12.2 1.57e+13 2.63e+12 16.8%
AMD64 O3+LTO+JEMALLOC GCC 12.2 1.57e+13 2.62e+12 16.7%
ARM64 O3 GCC 12.2 1.62e+13 2.83e+12 17.5%
ARM64 O3+LTO GCC 12.2 1.53e+13 2.59e+12 16.9%
ARM64 O3+LTO+JEMALLOC GCC 12.2 1.52e+13 2.57e+12 16.9%
LoongArch O3 GCC 14.2 1.75e+13 2.86e+12 16.3%
LoongArch O3+LTO GCC 14.2 1.66e+13 2.61e+12 15.7%
LoongArch O3+LTO+JEMALLOC GCC 14.2 1.65e+13 2.61e+12 15.8%

基于 Pin 开发 branch trace 工具

下面给出如何用 Pin 实现一个 branch trace 工具的过程:

  1. 参考 Pin 的样例代码,Instrument 每一条指令,如果它是分支指令,就记录它的指令地址、目的地址、指令长度、分支类型以及是否跳转的信息:

    VOID Instruction(INS ins, VOID *v) { if (INS_IsControlFlow(ins)) { UINT32 size = INS_Size(ins); enum branch_type type = /* omitted */; INS_InsertCall(ins, IPOINT_BEFORE, (AFUNPTR)RecordBranch, IARG_INST_PTR, IARG_BRANCH_TARGET_ADDR, IARG_UINT32, size, IARG_UINT32, type, IARG_BRANCH_TAKEN, IARG_END); }}int main(int argc, char *argv[]) { if (PIN_Init(argc, argv)) return Usage(); INS_AddInstrumentFunction(Instruction, 0); PIN_StartProgram(); return 0;}
  2. 记录分支的时候,分别维护分支的数组和分支执行事件的数组,然后对于每次执行的分支,记录分支的 id 以及是否跳转的信息,到分支执行事件的数组当中:

    VOID RecordBranch(VOID *inst_addr, VOID *targ_addr, UINT32 inst_length, UINT32 type, BOOL taken) { struct branch br; br.inst_addr = (uint64_t)inst_addr; br.targ_addr = (uint64_t)targ_addr; br.inst_length = inst_length; br.type = (branch_type)type; struct entry e; e.taken = taken; // insert branch if not exists auto it = br_map.find(br); if (it == br_map.end()) { assert(num_brs < MAX_BRS); br_map[br] = num_brs; e.br_index = num_brs; brs[num_brs++] = br; } else { e.br_index = it->second; } if (buffer_size == BUFFER_SIZE) { // omitted } write_buffer[buffer_size++] = e; num_entries++;}
  3. 为了减少磁盘空间,当缓冲区满的时候,首先经过 zstd 的流压缩,再把压缩后的内容写入到文件中:

    // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.cZSTD_EndDirective mode = ZSTD_e_continue;ZSTD_inBuffer input = {write_buffer, sizeof(write_buffer), 0};int finished;do { ZSTD_outBuffer output = {zstd_output_buffer, zstd_output_buffer_size, 0}; size_t remaining = ZSTD_compressStream2(zstd_cctx, &output, &input, mode); assert(!ZSTD_isError(remaining)); assert(fwrite(zstd_output_buffer, 1, output.pos, trace) == output.pos); finished = input.pos == input.size;} while (!finished);
  4. 最后在程序结束时,把分支数组写入到文件并记录元数据:

    VOID Fini(INT32 code, VOID *v) { // 1. compress the remaining data in write buffer and write to file // 2. save metadata, including: // 1. number of branch executions // 2. number of branches // 3. branches array}int main(int argc, char *argv[]) { // omitted PIN_AddFiniFunction(Fini, 0); PIN_StartProgram(); // omitted}
  5. 针对动态链接,可以利用 Pin 已有的 API IMG_AddInstrumentFunction 来跟踪,方便后续找到 trace 中各个地址对应的指令

这样就完成了 Branch Trace 的抓取。

Thread Local Storage (TLS) 实现探究

2025-04-07 08:00:00

Thread Local Storage (TLS) 实现探究

背景

TLS 是 thread local storage 的缩写,可以很方便地存储一些 per-thread 的数据,但它内部是怎么实现的呢?本文对 glibc 2.31 版本的 TLS 实现进行探究。

__thread

首先来看 TLS 在 C 中是怎么使用的:用 __thread 标记一个全局变量(注:进入 C11/C++11 标准的用法是用 thread_local 来标记),那么它就会保存在 TLS 当中,每个线程都有一份:

__thread int data;

那么编译器在生成访问这个 TLS 全局变量时,生成的指令也不同。以下面的代码为例:

int global_data;__thread int tls_data;void global(int val) { global_data = val; }void tls(int val) { tls_data = val; }

生成的 amd64 汇编如下:

 .textglobal: movl %edi, global_data(%rip) rettls: movl %edi, %fs:tls_data@tpoff ret .section .tbss,"awT",@nobitstls_data: .zero 4 .bssglobal_data: .zero 4

访问全局变量的时候,采用的是典型的 PC-relative 方式来找到全局变量 global_data 的地址;访问 thread local 变量的时候,可以看到它采用了一个比较少见的写法:%fs:tls_data@tpoff,它的意思是由链接器计算出 tls_data 相对 %fs 段寄存器的偏移,然后直接写到指令的偏移里。链接以上程序,可以看到最终的二进制是:

0000000000001140 <global>: 1140: 89 3d ce 2e 00 00 mov %edi,0x2ece(%rip) # 4014 <global_data> 1146: c3 ret 1147: 66 0f 1f 84 00 00 00 nopw 0x0(%rax,%rax,1) 114e: 00 000000000000001150 <tls>: 1150: 64 89 3c 25 fc ff ff mov %edi,%fs:0xfffffffffffffffc 1157: ff 1158: c3 ret

可见最终 global_data 被放到了相对二进制开头 0x4014 的地方,而 tls_data 被放到了 %fs:-0x4 的位置。那么这个 %fs 是怎么得到的,-0x4 的偏移又是怎么计算的呢?下面来进一步研究背后的实现。

TLS 的组织方式

首先 TLS 是 per-thread 的存储,意味着每个新线程,都有一个 buffer 需要保存 TLS 的数据。那么这个数据所在的位置,也需要一些 per-thread 的高效方式来访问,在 amd64 上,它是通过 %fs 段寄存器来维护的。那么 TLS 可能有哪些来源呢?首先可执行程序自己可能会用一些,它通过 DT_NEEDED 由动态链接器在启动时加载的动态库也有一些(比如 glibc 的 tcache),此外运行时 dlopen 了一些动态库也会有 TLS 的需求。为了满足这些需求,需要设计一个 TLS 的结构,既能满足这些在启动时已知的可执行程序和动态库的需求,又能满足运行时动态加载的新动态库的需求。

这里面可执行程序和启动时加载的动态库的需求是明确的,不会变的,因此可以由动态链接器在加载的时候,直接给可执行程序和动态库分配 TLS 空间:

  1. 比如可执行程序本身需要 0x10 字节的 TLS 空间,它启动时加载两个动态库 libc.so.6 和 libstdc++.so.6,期中 libc.so.6 需要 0x20 字节的 TLS 空间,libstdc++.so.6 需要 0x30 字节的 TLS 空间
  2. 加起来一共需要 0x60 字节的 TLS 空间,那么在创建线程的时候,创建好 0x60 字节的 TLS 空间,按照顺序进行分配:
    1. 0x00-0x10: 属于可执行程序
    2. 0x10-0x30: 属于 libc.so.6
    3. 0x30-0x60: 属于 libstdc++.so.6
  3. 分配好这个空间以后,因为 libc.so.6 无法提前预知它会被分配到哪个位置,所以需要一次重定位,把 libc.so.6 里的 TLS 空间的使用重定位到分配后的位置,例如 libc.so.6 的 0x20 的 TLS 空间内的开头 8 字节,现在在整个 TLS 空间内的偏移就是 0x20 + 8 = 0x28

但是 dlopen 动态加载进来的动态库怎么办呢?这些动态库的数量可以动态变化,可以加载也可以卸载,再这么线性分配就不合适了,这时候就需要给每个 dlopen 得到的动态库分配独立的 TLS 空间。既然是动态分配的空间,那么这些独立的 TLS 空间的地址,不同线程不同,不能通过一个基地址加固定偏移的方式来计算,就需要提供一个机制来找到各个动态库的 TLS 空间的地址。

glibc 的实现中,它把各个动态库的 TLS 空间的起始地址记录在一个 dtv 数组中,并且提供一个 __tls_get_addr 函数来查询动态库的 TLS 空间内指定 offset 的实际地址:

typedef struct dl_tls_index{ uint64_t ti_module; uint64_t ti_offset;} tls_index;void *__tls_get_addr (tls_index *ti){ dtv_t *dtv = THREAD_DTV (); /* omitted */ void *p = dtv[ti->ti_module].pointer.val; /* omitted */ return (char *) p + ti->ti_offset;}

其中 dtv 数组的指针保存在 struct pthread(即 Thread Control Block (TCB))中,而这个 struct pthread 就保存在 %fs 段寄存器指向的地址上:

struct pthread{ tcbhead_t header; /* omitted */};typedef struct{ /* omitted */ dtv_t *dtv; /* omitted */ uintptr_t stack_guard; /* omitted */} tcbhead_t;# define THREAD_DTV() \ ({ struct pthread *__pd; \ THREAD_GETMEM (__pd, header.dtv); })

P.S. stack protector 所使用的 canary 的值就保存在 pthread.header.stack_guard 字段中,也就是在 %fs:40 位置。

而之前提到的可执行程序本身的 TLS 空间以及程序启动时加载的动态库的 TLS 空间,实际上是保存在 struct thread 也就是 TCB 前面的部分,从高地址往低地址分配(图片来源:ELF Handling For Thread-Local Storage):

图中 \(tp_t\) 在 amd64 下就是 %fs 段寄存器,它直接指向的就是 struct thread 也就是 TCB;从 %fs 开始往低地址,先分配可执行程序本身的 TLS 空间(图中 \(tlsoffset_1\)\(tp_t\) 的范围),后分配程序启动时加载的动态库的 TLS 空间(图中 \(tlsoffset_1\)\(tlsoffset_2\) 以及 \(tlsoffset_3\)\(tlsoffset_2\) 的范围)。注意这些偏移对于每个线程都是相同的,只是不同线程的 %fs 寄存器不同。

而对于 dlopen 动态加载的动态库,则 TLS 空间需要动态分配,然后通过 dtv 数组来索引(图中 \(dtv_{t,4}\)\(dtv_{t,5}\)),因此无法通过重定位修正,而是要在运行时通过 __tls_get_addr 函数获取地址。为了让 __tls_get_addr 更具有通用性,dtv 数组也记录了分配在 %fs 指向的 TCB 更低地址的可执行程序和程序启动时加载的动态库的 TLS 空间,此时 __tls_get_addr 可以查到所有 TLS 变量的地址。每个动态库在 dtv 数组中都记录了信息,那么这个动态库在 dtv 数组中的下标,记为这个动态库的编号(module id),后面会多次出现这个概念。

知道了 TLS 的组织方式后,接下来观察编译器、链接器和动态链接器是如何配合着让代码可以找到正确的 TLS 变量的地址。

可执行程序

首先来看一个简单的场景:可执行程序直接访问自己定义的 TLS 变量。前面提到,可执行程序的 TLS 空间直接保存到 %fs 往下的地址,因此可执行程序的 TLS 变量相对 %fs 的偏移,是可以提前计算得到的。下面看一个例子:

__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }int main() {}

编译得到的汇编:

read_tls_data1: movl %fs:tls_data1@tpoff, %eax retread_tls_data2: movl %fs:tls_data2@tpoff, %eax ret

这就是在本文一开头就看到的语法:%fs:symbol@tpoff,它会对应一个 R_X86_64_TPOFF32 类型的重定位,告诉链接器,这是一个 TLS 变量,并且它的偏移在静态链接的时候就可以计算出来,并且这个偏移会直接写到 mov 指令的立即数内:

$ gcc -O2 -c tls.c -o tls.o$ objdump -S -r tls.o# omitted0000000000000000 <read_tls_data1>: 0: 64 8b 04 25 00 00 00 mov %fs:0x0,%eax 7: 00 4: R_X86_64_TPOFF32 tls_data1 8: c3 ret 9: 0f 1f 80 00 00 00 00 nopl 0x0(%rax)0000000000000010 <read_tls_data2>: 10: 64 8b 04 25 00 00 00 mov %fs:0x0,%eax 17: 00 14: R_X86_64_TPOFF32 tls_data2 18: c3 ret$ gcc tls.o -o tls$ objdump -S tls# omitted0000000000001140 <read_tls_data1>: 1140: 64 8b 04 25 fc ff ff mov %fs:0xfffffffffffffffc,%eax 1147: ff 1148: c3 ret 1149: 0f 1f 80 00 00 00 00 nopl 0x0(%rax)0000000000001150 <read_tls_data2>: 1150: 64 8b 04 25 f8 ff ff mov %fs:0xfffffffffffffff8,%eax 1157: ff 1158: c3 ret$ objdump -t tls# omitted0000000000000000 g .tbss 0000000000000004 tls_data20000000000000004 g .tbss 0000000000000004 tls_data1

根据以上输出可以看到,可执行程序自己使用了 8 字节的 TLS 空间,其中低 4 字节对应 tls_data2,高 4 字节对应 tls_data1;根据这个信息,链接器就可以推断出 tls_data2 保存在 %fs-0x8 的位置,tls_data1 保存在 %fs-0x4 的位置,直接把这个偏移编码到 mov 指令内。这样,运行时开销是最小的。

这一种访问 TLS 的情况,也叫做 local exec TLS model:它只用于可执行程序访问可执行程序自己的 TLS 变量的场景。可执行程序的 TLS 空间总是紧贴着 %fs 分配,不会受到动态库的影响,因此可以提前计算出它自己的 TLS 变量的偏移。

动态库

接下来观察另一种情况:动态库使用动态库自己的 TLS 变量。按照前面的分析,有两种情况:

  1. 第一种情况是,动态库是在程序启动时被动态链接器加载,那么它会被分配在 %fs 往低地址的空间。虽然相对 %fs 的偏移无法在链接阶段就提前得知,但是动态链接器会给它分配连续的 TLS 空间,从而可以计算出它的 TLS 空间相对 %fs 的偏移,于是动态链接器可以帮助完成剩下的重定位。
  2. 第二种情况是,动态库是由 dlopen 被加载,那么它被分配的 TLS 空间的地址就无法从 %fs 直接计算得出,此时就需要借助 __tls_get_addr 函数的帮助。

initial exec TLS model

首先来看第一种情况,它也被叫做 initial exec TLS model。还是从例子开始看起:

__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }

首先观察编译出来的汇编:

$ gcc -ftls-model=initial-exec -fPIC -O2 -S tls.c$ cat tls.sread_tls_data1: movq tls_data1@gottpoff(%rip), %rax movl %fs:(%rax), %eax retread_tls_data2: movq tls_data2@gottpoff(%rip), %rax movl %fs:(%rax), %eax ret

可以看到,这次生成的汇编不同了:它首先从 symbol@gottpoff(%rip) 读取一个 offset 到 %rax 寄存器,再从 %fs:(%rax) 地址读取 TLS 变量的值。上面提到,在 initial exec TLS model 下,TLS 空间是可以相对 %fs 寻址的,但是 offset 无法提前得知,需要由动态链接器完成重定位。

回忆之前在《开发一个链接器(4)》一文中,当动态库想要获得某个只有动态链接器才知道的地址,就会把它预留好位置放到 .got 表当中,并且输出一个 dynamic relocation,告诉动态链接器如何把地址计算出来并填进去。在这里,原理也是类似的,只不过是在 .got 表中预留了一个空间来保存 TLS 变量相对 %fs 的偏移。下面观察对象文件内是怎么记录这个信息的:

$ as tls.s -o tls.o$ objdump -S -r tls.o# omitted0000000000000000 <read_tls_data1>: 0: 48 8b 05 00 00 00 00 mov 0x0(%rip),%rax # 7 <read_tls_data1+0x7> 3: R_X86_64_GOTTPOFF tls_data1-0x4 7: 64 8b 00 mov %fs:(%rax),%eax a: c3 ret b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000000010 <read_tls_data2>: 10: 48 8b 05 00 00 00 00 mov 0x0(%rip),%rax # 17 <read_tls_data2+0x7> 13: R_X86_64_GOTTPOFF tls_data2-0x4 17: 64 8b 00 mov %fs:(%rax),%eax 1a: c3 ret

可以看到,这时候它在 mov 指令的立即数位置创建了一个 R_X86_64_GOTTPOFF 类型的重定位,这是告诉链接器:创建一个 .got entry,里面由动态链接器填写对应 symbol 在运行时相对 %fs 的偏移,然后链接器把 .got entry 相对 mov 指令的偏移写到 mov 指令的立即数内。

至于为啥是 symbol-0x4 而不是 symbol,原因在之前《开发一个链接器(2)》 已经出现过:x86 指令的立即数偏移是基于指令结尾的,而 relocation 指向的是立即数的起始地址,也就是指令结尾地址减去 4,那么立即数也要做相应的修正。

最后,观察链接器做的事情:

$ gcc -shared tls.o -o libtls.so$ objdump -D -S -R libtls.so# omittedDisassembly of section .text:0000000000001100 <read_tls_data1>: 1100: 48 8b 05 d1 2e 00 00 mov 0x2ed1(%rip),%rax # 3fd8 <tls_data1+0x3fd4> 1107: 64 8b 00 mov %fs:(%rax),%eax 110a: c3 ret 110b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001110 <read_tls_data2>: 1110: 48 8b 05 a9 2e 00 00 mov 0x2ea9(%rip),%rax # 3fc0 <tls_data2+0x3fc0> 1117: 64 8b 00 mov %fs:(%rax),%eax 111a: c3 retDisassembly of section .got:0000000000003fb8 <.got>: ... 3fc0: R_X86_64_TPOFF64 tls_data2 3fd8: R_X86_64_TPOFF64 tls_data1

可以看到:

  1. 链接器为两个 TLS 变量分别创建了一个 .got entry,tls_data1 对应 0x3fd8tls_data2 对应 0x3fc0
  2. 链接器在这两个 .got entry 处创建了 dynamic relocation R_X86_64_TPOFF64,告诉动态链接器:给动态库分配空间后,把 tls_data1tls_data2 相对 %fs 的偏移写入到这两个 .got entry 内
  3. 链接器计算出了 mov 指令和 .got entry 的相对偏移,直接写到了 mov 指令的立即数当中:

    0000000000001100 <read_tls_data1>: 1100: 48 8b 05 d1 2e 00 00 mov 0x2ed1(%rip),%rax # 3fd8 <tls_data1+0x3fd4> 1107: 64 8b 00 mov %fs:(%rax),%eax 110a: c3 ret 110b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001110 <read_tls_data2>: 1110: 48 8b 05 a9 2e 00 00 mov 0x2ea9(%rip),%rax # 3fc0 <tls_data2+0x3fc0> 1117: 64 8b 00 mov %fs:(%rax),%eax 111a: c3 ret
    4. 那么在运行时,为了读取 TLS 变量,首先从 .got 表读取 TLS 变量相对 %fs 的偏移写到 %rax 寄存器,再通过 %fs:(%rax) 访问 TLS 变量即可

那么这就是 initial exec TLS model 的实现方法了:它利用了动态库会在程序启动时加载的性质,保证 TLS 变量都保存在相对 %fs 的运行时可知且不变的偏移上,把偏移记录在 .got 表中,由动态链接器去计算,那么访问的时候就很简单了,直接读取 offset 从 %fs 访问即可。

local/global dynamic TLS model

接下来看动态库的第二种情况:它可能由 dlopen 加载,因此 TLS 变量相对 %fs 的位置可能会变化,此时需要通过 __tls_get_addr 函数来得到 TLS 变量的地址。回顾前面提到的 __tls_get_addr 的声明:

typedef struct dl_tls_index{ uint64_t ti_module; uint64_t ti_offset;} tls_index;void *__tls_get_addr (tls_index *ti);

即它需要两个信息,一个是 TLS 变量所在的动态库的编号(这个编号是动态生成的一个 id,实际上是这个动态库在 dtv 数组中的下标),另外是这个 TLS 变量在动态库内的偏移。这时候,又分为两种情况:

  1. 第一种情况是,这个 TLS 变量就在这个动态库本身内部定义,此时 TLS 变量在动态库内的偏移在链接期间已知,只是不知道 TLS 空间的起始地址,需要通过 __tls_get_addr 函数获取,这种情景叫做 local dynamic TLS model
  2. 第二种情况是,这个 TLS 变量不知道在哪个动态库定义,此时只知道这个 TLS 变量的名字,不知道它属于哪个动态库,也不知道它在动态库内的偏移,这种情况叫做 global dynamic TLS model,是最通用的情况,对 TLS 变量所在的位置没有任何假设

local dynamic TLS model

接下来分析 local dynamic TLS model,它面向的场景是一个可能被 dlopen 加载的动态库,需要访问自己的 TLS 变量,此时需要用 __tls_get_addr 读取自己的 TLS 空间的起始地址,根据链接时已知的偏移,计算出 TLS 变量在运行时的地址。由于 __tls_get_addr 需要知道动态库的编号,而这个编号只有动态链接器才知道,因此需要生成一个 dynamic relocation,让动态链接器把这个动态库自己的编号写入到 .got entry 中,之后才能拿这个 .got entry 的值调用 __tls_get_addr,进而得到 TLS 变量的地址。下面来观察这个过程,源码和之前一样:

__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }

首先查看生成的汇编:

$ gcc -ftls-model=local-dynamic -fPIC -O2 -S tls.c$ cat tls.sread_tls_data1:.LFB0: subq $8, %rsp leaq tls_data1@tlsld(%rip), %rdi call __tls_get_addr@PLT movl tls_data1@dtpoff(%rax), %eax addq $8, %rsp retread_tls_data2: subq $8, %rsp leaq tls_data2@tlsld(%rip), %rdi call __tls_get_addr@PLT movl tls_data2@dtpoff(%rax), %eax addq $8, %rsp ret

首先可以看到的是一个新的语法:symbol@tlsld(%rip),生成一个 R_X86_64_TLSLD 类型的 relocation,它的意思是在 .got 表中生成一个 entry,这个 entry 会保存当前动态库对应的编号,然后在这里通过 lea 指令把这个 .got entry 的地址作为 tls_index * 类型的参数传给 __tls_get_addr,那么它就会去寻找这个动态库的 TLS 空间的起始地址,把结果写入到 %rax 寄存器内。

得到 TLS 空间的起始地址后,再利用 symbol@dtpoff(%rax) 的语法,生成 R_X86_64_DTPOFF32 类型的 relocation,在链接的时候直接把 symbol 相对自己的 TLS 空间的起始地址的偏移写到 movl 指令内,从而实现了 TLS 变量的访问。

下面观察生成的对象文件:

$ as tls.s -o tls.o$ objdump -D -r tls.oDisassembly of section .text:0000000000000000 <read_tls_data1>: 0: 48 83 ec 08 sub $0x8,%rsp 4: 48 8d 3d 00 00 00 00 lea 0x0(%rip),%rdi # b <read_tls_data1+0xb> 7: R_X86_64_TLSLD tls_data1-0x4 b: e8 00 00 00 00 call 10 <read_tls_data1+0x10> c: R_X86_64_PLT32 __tls_get_addr-0x4 10: 8b 80 00 00 00 00 mov 0x0(%rax),%eax 12: R_X86_64_DTPOFF32 tls_data1 16: 48 83 c4 08 add $0x8,%rsp 1a: c3 ret 1b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000000020 <read_tls_data2>: 20: 48 83 ec 08 sub $0x8,%rsp 24: 48 8d 3d 00 00 00 00 lea 0x0(%rip),%rdi # 2b <read_tls_data2+0xb> 27: R_X86_64_TLSLD tls_data2-0x4 2b: e8 00 00 00 00 call 30 <read_tls_data2+0x10> 2c: R_X86_64_PLT32 __tls_get_addr-0x4 30: 8b 80 00 00 00 00 mov 0x0(%rax),%eax 32: R_X86_64_DTPOFF32 tls_data2 36: 48 83 c4 08 add $0x8,%rsp 3a: c3 ret

可以看到,由于 __tls_get_addr 的运行时地址也是不知道的,所以就和调用其他动态库的函数一样,用已有的 PLT 机制去重定位。

接下来看最后的动态库:

$ gcc -shared tls.o -o libtls.so$ objdump -D -R libtls.so# omittedDisassembly of section .text:0000000000001110 <read_tls_data1>: 1110: 48 83 ec 08 sub $0x8,%rsp 1114: 48 8d 3d 9d 2e 00 00 lea 0x2e9d(%rip),%rdi # 3fb8 <_DYNAMIC+0x1c0> 111b: e8 10 ff ff ff call 1030 <__tls_get_addr@plt> 1120: 8b 80 04 00 00 00 mov 0x4(%rax),%eax 1126: 48 83 c4 08 add $0x8,%rsp 112a: c3 ret 112b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001130 <read_tls_data2>: 1130: 48 83 ec 08 sub $0x8,%rsp 1134: 48 8d 3d 7d 2e 00 00 lea 0x2e7d(%rip),%rdi # 3fb8 <_DYNAMIC+0x1c0> 113b: e8 f0 fe ff ff call 1030 <__tls_get_addr@plt> 1140: 8b 80 00 00 00 00 mov 0x0(%rax),%eax 1146: 48 83 c4 08 add $0x8,%rsp 114a: c3 retDisassembly of section .got:0000000000003fb8 <.got>: ... 3fb8: R_X86_64_DTPMOD64 *ABS*

可以看到,无论是访问 tls_data1 还是 tls_data2,在调用 __tls_get_addr 时,使用的参数都是一样的 0x3fb8,也就是动态链接器把当前动态库的编号写进去的 .got entry。返回值就是当前动态库的 TLS 空间的基地址,把返回值加上对应的 offset(tls_data1 的偏移是 4,tls_data2 的偏移是 0,这个 offset 直接写到了 movl 指令的立即数里),就得到了 TLS 变量的地址。

特别地,如果在一个函数里访问多个当前动态库的 TLS 变量,那么 __tls_get_addr 调用是可以合并的:

__thread int tls_data1;__thread int tls_data2;int read_tls_data() { return tls_data1 + tls_data2; }

会生成如下的汇编:

$ gcc -ftls-model=local-dynamic -fPIC -O2 -S tls.c$ cat tls.sread_tls_data:.LFB0: subq $8, %rsp leaq tls_data1@tlsld(%rip), %rdi call __tls_get_addr@PLT movl tls_data1@dtpoff(%rax), %edx addl tls_data2@dtpoff(%rax), %edx addq $8, %rsp movl %edx, %eax ret

这样就减少了一次 __tls_get_addr 的调用。

global dynamic TLS model

再来介绍最后一种情况:对于一个 dlopen 的动态库,如果它要访问的 TLS 变量,只知道名字,不知道来自哪一个动态库,不知道偏移是多少。这时候,只能把全部工作交给动态链接器去做:让动态链接器根据符号,去查找符号表,找到对应的动态库和偏移,记录下来;由于涉及到动态库的编号和偏移,所以需要两个连续的 .got entry,正好对应 tls_index 结构体的两项成员:

typedef struct dl_tls_index{ uint64_t ti_module; uint64_t ti_offset;} tls_index;

继续上面的例子,这次采用 global dynamic TLS model 进行编译:

$ cat tls.c__thread int tls_data1;__thread int tls_data2;int read_tls_data1() { return tls_data1; }int read_tls_data2() { return tls_data2; }$ gcc -ftls-model=global-dynamic -fPIC -O2 -S tls.c$ cat tls.sread_tls_data1: subq $8, %rsp data16 leaq tls_data1@tlsgd(%rip), %rdi .value 0x6666 rex64 call __tls_get_addr@PLT movl (%rax), %eax addq $8, %rsp retread_tls_data2: subq $8, %rsp data16 leaq tls_data2@tlsgd(%rip), %rdi .value 0x6666 rex64 call __tls_get_addr@PLT movl (%rax), %eax addq $8, %rsp ret

这次出现了一些不一样的内容:data16.value 0x6666rex64;实际上,这些是无用的指令前缀,不影响指令的语义,但是保证了这段代码有足够的长度,方便后续链接器进行优化。除了这些奇怪的前缀,核心就是 symbol@tlsgd(%rip) 语法,它会创建 R_X86_64_TLSGD relocation,它的意思是:创建一对 .got entry,第一个 entry 对应 symbol 所在动态库的编号,第二个 entry 对应 symbol 在动态库的 TLS 空间内的偏移,这两个 entry 组成一个 tls_index 结构体;通过 leaq 指令得到这个结构体的指针,调用 __tls_get_addr,就得到了这个 TLS 变量的地址。

接下来看生成的对象文件:

$ as tls.s -o tls.o$ objdump -D -r tls.oDisassembly of section .text:0000000000000000 <read_tls_data1>: 0: 48 83 ec 08 sub $0x8,%rsp 4: 66 48 8d 3d 00 00 00 data16 lea 0x0(%rip),%rdi # c <read_tls_data1+0xc> b: 00 8: R_X86_64_TLSGD tls_data1-0x4 c: 66 66 48 e8 00 00 00 data16 data16 rex.W call 14 <read_tls_data1+0x14> 13: 00 10: R_X86_64_PLT32 __tls_get_addr-0x4 14: 8b 00 mov (%rax),%eax 16: 48 83 c4 08 add $0x8,%rsp 1a: c3 ret 1b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000000020 <read_tls_data2>: 20: 48 83 ec 08 sub $0x8,%rsp 24: 66 48 8d 3d 00 00 00 data16 lea 0x0(%rip),%rdi # 2c <read_tls_data2+0xc> 2b: 00 28: R_X86_64_TLSGD tls_data2-0x4 2c: 66 66 48 e8 00 00 00 data16 data16 rex.W call 34 <read_tls_data2+0x14> 33: 00 30: R_X86_64_PLT32 __tls_get_addr-0x4 34: 8b 00 mov (%rax),%eax 36: 48 83 c4 08 add $0x8,%rsp 3a: c3 ret

基本符合预期,通过 R_X86_64_TLSGD relocation 来表示意图,通过反汇编也可以看到,多余的那些修饰符是没有用的,语义上就是一条 leaq 加一条 call 指令。和之前 local dynamic TLS model 类似,__tls_get_addr 也是用已有的 PLT 机制来寻址。

最后来看生成的动态库:

$ gcc -shared tls.o -o libtls.so$ objdump -D -R libtls.so# omittedDisassembly of section .text:0000000000001110 <read_tls_data1>: 1110: 48 83 ec 08 sub $0x8,%rsp 1114: 66 48 8d 3d b4 2e 00 data16 lea 0x2eb4(%rip),%rdi # 3fd0 <tls_data1@@Base+0x3fcc> 111b: 00 111c: 66 66 48 e8 0c ff ff data16 data16 rex.W call 1030 <__tls_get_addr@plt> 1123: ff 1124: 8b 00 mov (%rax),%eax 1126: 48 83 c4 08 add $0x8,%rsp 112a: c3 ret 112b: 0f 1f 44 00 00 nopl 0x0(%rax,%rax,1)0000000000001130 <read_tls_data2>: 1130: 48 83 ec 08 sub $0x8,%rsp 1134: 66 48 8d 3d 74 2e 00 data16 lea 0x2e74(%rip),%rdi # 3fb0 <tls_data2@@Base+0x3fb0> 113b: 00 113c: 66 66 48 e8 ec fe ff data16 data16 rex.W call 1030 <__tls_get_addr@plt> 1143: ff 1144: 8b 00 mov (%rax),%eax 1146: 48 83 c4 08 add $0x8,%rsp 114a: c3 retDisassembly of section .got:0000000000003fa8 <.got>: ... 3fb0: R_X86_64_DTPMOD64 tls_data2@@Base 3fb8: R_X86_64_DTPOFF64 tls_data2@@Base 3fd0: R_X86_64_DTPMOD64 tls_data1@@Base 3fd8: R_X86_64_DTPOFF64 tls_data1@@Base

观察 .got,可以看到对于每个 TLS 变量,都生成了两个 entry:tls_data2 占用了 0x3fb00x3fb8 两个 entry,第一个对应动态库的下标(MOD 表示 Module),第二个对应偏移(OFF 表示 Offset);tls_data1 也是类似的,占用了 0x3fd00x3fd8。当动态链接器在 .got 表中准备好 tls_index 结构体后,在访问 TLS 变量时,只需要 lea + call 就可以找到 TLS 变量的地址了。

四种 TLS model 的对比

接下来进行四种 TLS model 的对比:

  1. local exec TLS model: 用于可执行程序访问自身的 TLS 变量,由于可执行程序的 TLS 空间总是紧挨着 %fs,所以自身的 TLS 变量相对 %fs 的偏移在链接时已知,可以直接计算出来,运行时开销最小
  2. initial exec TLS model: 用于在程序启动时由动态链接器自动加载的动态库访问自身的 TLS 变量,由于它的 TLS 空间相对 %fs 的偏移在加载后就是固定的,所以由动态链接器计算出各个 TLS 变量相对 %fs 的偏移,写到 .got 表中,运行时只需要读取 .got 表中记录的 offset,和 %fs 做加法就得到了变量的地址
  3. local dynamic TLS model: 用于可能被 dlopen 的动态库访问自身的 TLS 变量,由于它的 TLS 空间相对 %fs 的偏移是不确定的,所以需要用 __tls_get_addr 调用来获取自身的 TLS 空间的起始地址;为了给 __tls_get_addr 传递正确的参数,告诉这个函数自己的动态库编号是多少,在 .got 表中预留了一个 entry 让动态链接器把该动态库的编号写进去;那么运行时只需要读取 .got 表中记录的动态库编号,调用 __tls_get_addr,再和链接时已知的 offset 做加法就得到了变量的地址
  4. global dynamic TLS model: 用于通用情况下,不知道 TLS 变量属于哪个动态库,也不知道 TLS 变量在 TLS 空间内的偏移是多少,所以需要动态链接器去查询 TLS 变量属于哪个动态库,放在哪个偏移上,并且动态链接器要把这两个信息写到 .got 表中;那么运行时就要用 __tls_get_addr 调用来根据 .got 表中记录的动态库编号以及偏移来找到变量的地址

下面是一个对比表格:

Instructions GOT
local exec movq N/A
initial exec movq + addq offset
local dynamic leaq + call + leaq self module index
global dynamic leaq + call module index + offset

特别地,local dynamic TLS model 的 leaq + call 是可以复用的,所以整体来说,还是越通用的 TLS model,运行时的开销越大。

实际编程中的 TLS model

看到这里,你可能会疑惑:在编程的时候,大多数时候并没有去管 TLS model 的事情,也就是说在编译的时候并没有指定,那么这个时候会采用什么 TLS model 呢?

答案是取决于编译器和链接器会根据所能了解到的情况,选择一个最优的实现方法。在前面的例子中,都是直接定义了一个全局的 __thread 变量然后去访问它,但如果它是 static 的,会发生什么呢?如果编译的时候,没有开 -fPIC,也就是说生成的代码不会出现在动态库中,又会发生什么呢?

首先来看从编译器到汇编的这一个阶段,会采用什么样的 TLS model:

  1. 如果在编译源码的时候,没有开 -fPIC,那么生成的代码只出现在可执行程序中,这个时候编译器会直接使用 local exec TLS model,即生成 movl %fs:symbol@tpoff, %rax 的指令
  2. 如果在编译源码的时候,开了 -fPIC,那么生成的代码既可能出现在可执行程序中,也可能出现在动态库中,这时会首先默认为 global dynamic TLS model,即生成 data16 leaq symbol@tlsgd(%rip), %rdi; .value 0x6666; rex64; call __tls_get_addr@PLT; movl (%rax), %eax 指令
  3. 但如果 __thread 变量设置了 static,即使打开了 -fPIC,也保证了这个 TLS 变量一定是访问自己 TLS 空间中的,不会访问别人的,那么编译器会自动选择 local dynamic TLS model,即生成 leaq symbol@tlsld(%rip), %rdi; call__tls_get_addr@PLT; movl %symbol@dtpoff(%rax), %eax 指令

接下来观察链接的时候,会发生什么事情:

  1. 如果编译源码的时候,打开了 -fPIC 且没有用 static,如前所述,编译器会使用 global dynamic TLS model;但如果这个对象文件最后被链接到了可执行程序当中,那么链接器知道这个时候用 local exec TLS model 是性能更好的,那么它会对指令进行改写,此时之前预留的无用的指令前缀 data 16; .value 0x6666; rex64 起了作用,保证改写前后的指令序列的长度不变:

    # before linker optimizations: global dynamicdata16 leaq symbol@tlsgd(%rip), %rdi.value 0x6666rex64call __tls_get_addr@PLT# after linker optimizations: local exec# the symbol@tpoff(%rax) relocation is resolved by the linker immediatelymovq %fs:0, %raxleaq symbol@tpoff(%rax), %rax
  2. 类似地,如果编译源码的时候,打开了 -fPIC 且用了 static,如前所述,编译器会使用 local dynamic TLS model;但如果这个对象文件最后被链接到了可执行程序当中,那么链接器知道这个时候用 local exec TLS model 是性能更好的,那么它会对指令进行改写,为了保证改写前后的指令序列的长度不变,这次是在生成的汇编里加入无用的指令前缀:

    # before linker optimizations: local dynamicleaq symbol@tlsld(%rip), %rdicall __tls_get_addr@PLTmovl symbol@dtpoff(%rax), %eax# after linker optimizations: local exec# the symbol@tpoff(%rax) relocation is resolved by the linker immediately.word 0x6666.byte 0x66movq %fs:0, %raxmovl symbol@tpoff(%rax), %eax
  3. 如果编译源码的时候,打开了 -fPIC 且用了 extern 来标记 TLS 变量,由于编译器不知道这个 TLS 变量属于谁,所以编译器会使用 global dynamic TLS model;但如果这个对象文件最后被链接到了可执行程序当中,并且编译器发现这个 TLS 变量属于一个动态库,这意味着这个 TLS 变量在程序启动时会随着动态库加载而变得可用,适用 initial exec TLS model,于是链接器也会进行改写:

    # before linker optimizations: global dynamicdata16 leaq symbol@tlsgd(%rip), %rdi.value 0x6666rex64call __tls_get_addr@PLT# after linker optimizations: initial execmovq %fs:0, %raxaddq symbol@gottpoff(%rip), %rax
  4. 如果编译源码的时候,没有打开 -fPIC 且用了 extern 来标记 TLS 变量,那么编译器知道,这个对象文件最后只能出现在可执行程序中,那么这个 TLS 变量要么来自于可执行程序自己,要么来自于程序启动时加载的动态库,所以编译器会使用 initial exec TLS model;但如果这个对象文件最后被链接到了可执行程序当中,并且编译器发现这个 TLS 变量属于可执行程序自己,适用 local exec TLS model,于是链接器也会进行改写:

    # before linker optimizations: initial execmovq %fs:0, %raxaddq symbol@gottpoff(%rip), %rax# after linker optimizations: local exec# the symbol@tpoff(%rax) relocation is resolved by the linker immediatelymovq %fs:0, %raxleaq symbol@tpoff(%rax), %rax

可见通过两阶段的处理,在编译器和链接器的协同下,尝试优化到一个开销更小的 TLS model,转化的几种情况如下:

  1. global dynamic -> initial exec:编译的时候开了 -fPIC 和 extern,然后链接到可执行程序内,TLS 变量来自动态库
  2. global dynamic -> local exec:编译的时候开了 -fPIC,然后链接到可执行程序内,TLS 变量来自程序自己
  3. local dynamic -> local exec:编译的时候开了 -fPIC 和 -static,然后链接到可执行程序内,TLS 变量来自程序自己
  4. initial exec -> local exec:编译的时候没开 -fPIC,然后链接到可执行程序内,TLS 变量来自程序自己

TLSDESC

前面提到,在 global dynamic 和 local dynamic 两种 TLS model 下,要访问 TLS 变量的时候,需要调用 __tls_get_addr 函数,这是比较慢的。为了优化它,让人想到了 PLT 机制:

  1. 初始情况下,PLT 会生成一个 stub,从 .got 读取一个函数指针并跳转,这个函数指针初始情况下是执行了 stub 的下一条指令
  2. 对于第一次执行这个 stub,它会把这个函数的编号 push 到栈上,然后调用动态链接器提供的 _dl_runtime_resolve 函数来寻找这个函数的实际地址;此时 _dl_runtime_resolve 会把找到的函数地址写回到 .got 的函数指针
  3. 此后再次执行 stub 的时候,就会从 .got 读取计算好的的函数指针,直接跳转到实际的函数地址

由此可以类比得到一个针对 TLS 的类似机制,称为 TLSDESC:

  1. TLSDESC 占用 16 字节空间,前面 8 字节是一个函数指针,后面 8 字节用来保存 offset,保存在 .got 表中
  2. 把原来 local/global dynamic TLS model 对 __tls_get_addr 的调用,改成调用 TLSDESC 中的函数指针,调用时 %rax 寄存器指向了 TLSDESC 的地址,它的返回结果是 TLS 变量相对 %fs 的偏移,后续指令根据这个偏移计算出实际的地址
  3. 动态链接器在加载的时候,它会去判断目标 TLS 变量相对 %fs 的偏移是否是常量:对于可执行程序以及随着程序启动而自动加载的动态库,它们的 TLS 变量相对 %fs 的偏移是常量
  4. 如果目标 TLS 变量相对 %fs 的偏移是常量,则把这个常量写入到 .got 表中 TLSDESC 变量的 offset 的位置,然后把函数指针改写成 _dl_tlsdesc_return,它是一个很简单的实现,因为在调用这个函数时,%rax 寄存器指向了 TLSDESC 的地址,所以直接从 %rax+8 地址把 offset 读出来然后返回就可以:

    _dl_tlsdesc_return: movq 8(%rax), %rax ret
  5. 如果目标 TLS 变量相对 %fs 的偏移不是常量,则把函数指针改写成 _dl_tlsdesc_dynamic 函数,再走和之前的 __tls_get_addr 类似的逻辑,完成剩下的查找;由于返回值是 TLS 变量相对 %fs 的偏移,所以返回之前还要减去 %fs 的地址:

    /* %rax points to the TLS descriptor, such that 0(%rax) points to _dl_tlsdesc_dynamic itself, and 8(%rax) points to a struct tlsdesc_dynamic_arg object. It must return in %rax the offset between the thread pointer and the object denoted by the argument, without clobbering any registers. The assembly code that follows is a rendition of the following C code, hand-optimized a little bit.ptrdiff_t_dl_tlsdesc_dynamic (register struct tlsdesc *tdp asm ("%rax")){struct tlsdesc_dynamic_arg *td = tdp->arg;dtv_t *dtv = *(dtv_t **)((char *)__thread_pointer + DTV_OFFSET);if (__builtin_expect (td->gen_count <= dtv[0].counter && (dtv[td->tlsinfo.ti_module].pointer.val != TLS_DTV_UNALLOCATED), 1)) return dtv[td->tlsinfo.ti_module].pointer.val + td->tlsinfo.ti_offset - __thread_pointer;return __tls_get_addr_internal (&td->tlsinfo) - __thread_pointer;}*/

它利用的也是在内存中保存函数指针,通过运行时替换函数指针的方式,实现 slow path 到 fast path 的动态替换。

dtv 维护

最后再来深入分析一下 dtv 的维护方式。前面提到,dtv 的指针是保存在 struct pthread 内的,而 struct pthread 又是保存在 %fs 寄存器指向的位置:

struct pthread{ tcbhead_t header; /* omitted */};typedef struct{ /* omitted */ dtv_t *dtv; /* omitted */} tcbhead_t;

所以要访问 dtv 也很简单,直接从 %fs 加它在 struct pthread 结构体内的偏移即可。

前面提到,在调用 __tls_get_addr 时,需要提供一个动态库的 ID 来查询得到这个动态库的 TLS 空间的起始地址,再加上在这个 TLS 空间内的偏移。而这个动态库的 ID,正好就是 dtv 数组的下标,所以 __tls_get_addr 做的事情大概是:

  1. 找到 dtv 的地址:mov %fs:DTV_OFFSET, %RDX_LP
  2. __tls_get_addr 函数的参数里读取 ti_module 字段:mov TI_MODULE_OFFSET(%rdi), %RAX_LP
  3. 读取 dtv[ti->ti_module].val,也就是这个模块的 TLS 空间的起始地址:salq $4, %rax; movq (%rdx, %rax), %rax,这里左移 4 位是因为 dtv 数组的每个元素的类型是 dtv_t,其定义如下:

    struct dtv_pointer{ void *val; /* Pointer to data, or TLS_DTV_UNALLOCATED. */ void *to_free; /* Unaligned pointer, for deallocation. */};/* Type for the dtv. */typedef union dtv{ size_t counter; struct dtv_pointer pointer;} dtv_t;
    4. 把起始地址加上偏移,然后返回:add TI_OFFSET_OFFSET(%rdi), %RAX_LP; ret

但实际情况会比这个更复杂:dlopen 可能会动态引入新的动态库,此时 dtv 数组可能需要扩张;此外,如果一个动态库有 TLS 变量但是从来不用,也可以 lazy 分配它的 TLS 空间,只有在第一次访问的时候,才去分配。

首先来考虑第一个需求,处理 dlopen 导致 dtv 元素个数变化,它的实现方法是这样的:

  1. dtv[0] 不用来保存 TLS 空间的信息,而是记录一个 counter,这个 counter 记录的是当前 dtv 的版本号(generation),另外在全局变量 dl_tls_generation 中记录当前最新的版本号;当 dlopen 导致 dtv 结构发生变化时,更新 dl_tls_generation 版本,然后在 __tls_get_addr 里检查版本号,不一致则进入 slow path:

    ENTRY (__tls_get_addr) mov %fs:DTV_OFFSET, %RDX_LP mov GL_TLS_GENERATION_OFFSET+_rtld_local(%rip), %RAX_LP /* GL(dl_tls_generation) == dtv[0].counter */ cmp %RAX_LP, (%rdx) jne 1f mov TI_MODULE_OFFSET(%rdi), %RAX_LP /* dtv[ti->ti_module] */ salq $4, %rax movq (%rdx,%rax), %rax /* omitted */ add TI_OFFSET_OFFSET(%rdi), %RAX_LP ret1: /* slow path, stack alignment omitted */ call __tls_get_addr_slow ret
    2. 在 __tls_get_addr_slow 中,如果发现当前 dtv 的版本号和最新的版本号 dl_tls_generation 不一致,就调用 update_get_addr 来重新分配内存:
    void *__tls_get_addr_slow (tls_index *ti){ dtv_t *dtv = THREAD_DTV (); if (__glibc_unlikely (dtv[0].counter != GL(dl_tls_generation))) return update_get_addr (ti); return tls_get_addr_tail (ti, dtv, NULL);}

具体的 dtv 更新逻辑比较复杂,有兴趣的读者可以翻阅 glibc 的源码中 update_get_addr 函数的实现。

接下来考虑第二个需求,也就是 lazy 分配,只有在第一次访问 TLS 空间的时候,才给 dlopen 的动态库分配 TLS 空间。为了区分已分配和未分配的 TLS 空间,未分配的 TLS 空间的 val 字段的值是 TLS_DTV_UNALLOCATED,当 __tls_get_addr 检测到 TLS 空间尚未分配时,也会进入 slow path:

ENTRY (__tls_get_addr) mov %fs:DTV_OFFSET, %RDX_LP mov GL_TLS_GENERATION_OFFSET+_rtld_local(%rip), %RAX_LP /* GL(dl_tls_generation) == dtv[0].counter */ cmp %RAX_LP, (%rdx) jne 1f mov TI_MODULE_OFFSET(%rdi), %RAX_LP /* dtv[ti->ti_module] */ salq $4, %rax movq (%rdx,%rax), %rax /* branch if val == TLS_DTV_UNALLOCATED */ cmp $-1, %RAX_LP je 1f add TI_OFFSET_OFFSET(%rdi), %RAX_LP ret1: /* slow path, stack alignment omitted */ call __tls_get_addr_slow ret

在 slow path 中,最终由 allocate_dtv_entry 函数来分配这片空间,注意到 TLS 空间可能有对齐的要求,所以它实际上记录了两个地址,一个是 malloc 得到的地址(用于后续的 free 调用),一个是经过对齐后的地址:

/* Allocate one DTV entry. */static struct dtv_pointerallocate_dtv_entry (size_t alignment, size_t size){ if (powerof2 (alignment) && alignment <= _Alignof (max_align_t)) { /* The alignment is supported by malloc. */ void *ptr = malloc (size); return (struct dtv_pointer) { ptr, ptr }; } /* Emulate memalign to by manually aligning a pointer returned by malloc. First compute the size with an overflow check. */ size_t alloc_size = size + alignment; if (alloc_size < size) return (struct dtv_pointer) {}; /* Perform the allocation. This is the pointer we need to free later. */ void *start = malloc (alloc_size); if (start == NULL) return (struct dtv_pointer) {}; /* Find the aligned position within the larger allocation. */ void *aligned = (void *) roundup ((uintptr_t) start, alignment); return (struct dtv_pointer) { .val = aligned, .to_free = start };}

可见这些 lazy 分配的 TLS 空间都是放在堆上的,由 malloc 进行动态分配。而可执行程序和随着程序启动而自动加载的动态库的 TLS 空间,是随着 TCB 也就是 struct pthread 一起分配的。对于新创建的线程来说,TCB 放置在栈的顶部,而不是在堆上,所以要求大小不能动态变化,只有 dtv 数组的指针保存在 struct pthread 中,dtv 数组本身是放在堆上的,根据需要进行 malloc/realloc(见 _dl_resize_dtv 函数)。对于初始线程来说,TCB 是通过 malloc 或者 sbrk 动态分配的。

参考

glibc 内存分配器实现探究

2025-03-30 08:00:00

glibc 内存分配器实现探究

背景

malloc 和 free 日常用的很多,但它内部是怎么实现的呢?本文对 glibc 2.31 版本的内存分配器的实现进行探究。

本文的完整版内容已经整合到知识库中。

malloc

glibc 2.31 是 ubuntu 20.04 所使用的 libc 版本,首先来分析它的实现,源码可以从 glibc-2.31 tag 中找到。

首先来看 malloc 函数,它实现在 malloc/malloc.c__libc_malloc 函数当中,忽略 __malloc_hook 和一些检查,首先可以看到它有一段代码,使用了一个叫做 tcache 的数据结构:

/* int_free also calls request2size, be careful to not pad twice. */size_t tbytes;if (!checked_request2size (bytes, &tbytes)) { __set_errno (ENOMEM); return NULL; }size_t tc_idx = csize2tidx (tbytes);MAYBE_INIT_TCACHE ();DIAG_PUSH_NEEDS_COMMENT;if (tc_idx < mp_.tcache_bins && tcache && tcache->counts[tc_idx] > 0) { return tcache_get (tc_idx); }DIAG_POP_NEEDS_COMMENT;

tcache (Thread Local Cache)

接下来仔细地研究 tcache 的结构。首先,它是一个 per-thread 的数据结构,意味着每个线程都有自己的一份 tcache,不需要上锁就可以访问:

static __thread tcache_perthread_struct *tcache = NULL;

接下来看它具体保存了什么:

/* We overlay this structure on the user-data portion of a chunk when the chunk is stored in the per-thread cache. */typedef struct tcache_entry{ struct tcache_entry *next; /* This field exists to detect double frees. */ struct tcache_perthread_struct *key;} tcache_entry;/* There is one of these for each thread, which contains the per-thread cache (hence "tcache_perthread_struct"). Keeping overall size low is mildly important. Note that COUNTS and ENTRIES are redundant (we could have just counted the linked list each time), this is for performance reasons. */typedef struct tcache_perthread_struct{ uint16_t counts[TCACHE_MAX_BINS]; tcache_entry *entries[TCACHE_MAX_BINS];} tcache_perthread_struct;/* Caller must ensure that we know tc_idx is valid and there's room for more chunks. */static __always_inline voidtcache_put (mchunkptr chunk, size_t tc_idx){ tcache_entry *e = (tcache_entry *) chunk2mem (chunk); /* Mark this chunk as "in the tcache" so the test in _int_free will detect a double free. */ e->key = tcache; e->next = tcache->entries[tc_idx]; tcache->entries[tc_idx] = e; ++(tcache->counts[tc_idx]);}/* Caller must ensure that we know tc_idx is valid and there's available chunks to remove. */static __always_inline void *tcache_get (size_t tc_idx){ tcache_entry *e = tcache->entries[tc_idx]; tcache->entries[tc_idx] = e->next; --(tcache->counts[tc_idx]); e->key = NULL; return (void *) e;}

可以看到它有两个成员,把 tcache 分为 TCACHE_MAX_BINS 这么多个 bin,每个 bin 分别有一个:

  1. counts[bin]:记录了这个 bin 中空闲块的数量,tcache_put 的时候加一,tcache_get 的时候减一
  2. entries[bin]: 每个 bin 用一个链表保存了空闲块,链表的节点类型是 tcache_entry,那么 entries[bin] 保存了链表头的指针

bin 是内存分配器的一个常见做法,把要分配的块的大小分 bin,从而保证拿到的空闲块足够大。接下来看 tcache_put 是如何把空闲块放到 tcache 中的:

  1. 把空闲块强制转换为 tcache_entry 结构体类型
  2. 把它的 key 字段指向 tcache,用来表示这个空闲块当前在 tcache 当中,后续用它来检测 double free
  3. 以新的 tcache_entry 作为链表头,插入到 tcache 的对应的 bin 当中:entries[tc_idx]
  4. 更新这个 bin 的空闲块个数到 count[tc_idx] 当中

反过来,tcache_get 则是从 tcache 中拿出一个空闲块:

  1. 从链表头 entries[tc_idx] 取出一个空闲块,把它从链表中删除:entries[tc_idx] = e->next
  2. 更新这个 bin 的空闲块个数到 count[tc_idx] 当中
  3. 把它的 key 字段指向 NULL,用来表示这个空闲块当前不在 tcache 当中
  4. 返回这个空闲块的地址

malloc

接下来回到 malloc 的实现,它首先根据用户要分配的空间大小(bytes),计算出实际需要分配的大小(tbytes),和对应的 bin(tc_idx):

/* int_free also calls request2size, be careful to not pad twice. */size_t tbytes;if (!checked_request2size (bytes, &tbytes)) { __set_errno (ENOMEM); return NULL; }size_t tc_idx = csize2tidx (tbytes);

其中 checked_request2size 实现如下:

#define request2size(req) \ (((req) + SIZE_SZ + MALLOC_ALIGN_MASK < MINSIZE) ? \ MINSIZE : \ ((req) + SIZE_SZ + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK)/* Check if REQ overflows when padded and aligned and if the resulting value is less than PTRDIFF_T. Returns TRUE and the requested size or MINSIZE in case the value is less than MINSIZE on SZ or false if any of the previous check fail. */static inline boolchecked_request2size (size_t req, size_t *sz) __nonnull (1){ if (__glibc_unlikely (req > PTRDIFF_MAX)) return false; *sz = request2size (req); return true;}

它实现的实际上是把用户请求的内存大小,加上 SIZE_SZ(即 sizeof(size_t)),向上取整到 MALLOC_ALIGN_MASK 对应的 alignment(MALLOC_ALIGNMENT,通常是 2 * SIZE_SZ)的整数倍数,再和 MINSIZE 取 max。这里要加 SIZE_SZ,是因为 malloc 会维护被分配的块的一些信息,包括块的大小和一些 flag,后续会详细讨论,简单来说就是分配的实际空间会比用户请求的空间要更大。

接着,看它是如何计算出 tcache index 的:

/* When "x" is from chunksize(). */# define csize2tidx(x) (((x) - MINSIZE + MALLOC_ALIGNMENT - 1) / MALLOC_ALIGNMENT)

可以看到,从 MINSIZE 开始,以 MALLOC_ALIGNMENT 为单位,每个 bin 对应一个经过 align 以后的可能的内存块大小。得到 tcache index 后,检查对应的 bin 是否有空闲块,如果有,则直接分配:

if (tc_idx < mp_.tcache_bins && tcache && tcache->counts[tc_idx] > 0) { return tcache_get (tc_idx); }

可以看到,tcache 相当于是一个 per thread 的小缓存,记录了最近释放的内存块,可供 malloc 使用。由于 bin 的数量有限,所以比较大的内存分配不会经过 tcache。

P.S. calloc 不会使用 tcache,而是用后面提到的 _int_malloc 进行各种分配。

free

既然 malloc 用到了 tcache,自然 free 就要往里面放空闲块了,相关的代码在 _int_free 函数当中:

size_t tc_idx = csize2tidx (size);if (tcache != NULL && tc_idx < mp_.tcache_bins) { /* Check to see if it's already in the tcache. */ tcache_entry *e = (tcache_entry *) chunk2mem (p); /* This test succeeds on double free. However, we don't 100% trust it (it also matches random payload data at a 1 in 2^<size_t> chance), so verify it's not an unlikely coincidence before aborting. */ if (__glibc_unlikely (e->key == tcache)) { tcache_entry *tmp; LIBC_PROBE (memory_tcache_double_free, 2, e, tc_idx); for (tmp = tcache->entries[tc_idx]; tmp; tmp = tmp->next) if (tmp == e) malloc_printerr ("free(): double free detected in tcache 2"); /* If we get here, it was a coincidence. We've wasted a few cycles, but don't abort. */ } if (tcache->counts[tc_idx] < mp_.tcache_count) { tcache_put (p, tc_idx); return; } }

它的逻辑也不复杂:

  1. 计算 tcache index,找到对应的 bin
  2. 检查它是不是已经被 free 过了,即 double free:free 过的指针,它的 key 字段应当指向 tcache,如果实际检测到是这样,那就去 tcache 里遍历链表,检查是不是真的在里面,如果是,说明 double free 了,报错
  3. 如果对应的 bin 的链表长度不是很长(阈值是 mp_.tcache_count,取值见后),则添加到链表头部,完成 free 的过程

那么 tcache 默认情况下有多大呢:

/* We want 64 entries. This is an arbitrary limit, which tunables can reduce. */# define TCACHE_MAX_BINS 64/* This is another arbitrary limit, which tunables can change. Each tcache bin will hold at most this number of chunks. */# define TCACHE_FILL_COUNT 7

也就是说,它有 64 个 bin,每个 bin 的链表最多 7 个空闲块。

在 64 位下,这 64 个 bin 对应的块大小是从 32 字节到 1040 字节,每 16 字节一个 bin((1040 - 32) / 16 + 1 = 64)。那么,malloc(1032) 或更小的分配会经过 tcache,而 malloc(1033) 或更大的分配则不会。

实验

下面来写一段程序来观察 tcache 的行为,考虑到从链表头部插入和删除是后进先出(LIFO),相当于是一个栈,所以分配两个大小相同的块,释放后再分配相同大小的块,得到的指针的顺序应该是反过来的:

#include <stdio.h>#include <stdlib.h>int main() { void *p1 = malloc(32); void *p2 = malloc(32); free(p1); free(p2); void *p3 = malloc(32); void *p4 = malloc(32); printf("p1=%p p2=%p p3=%p p4=%p\n", p1, p2, p3, p4);}

输出如下:

p1=0x55fb2f9732a0 p2=0x55fb2f9732d0 p3=0x55fb2f9732d0 p4=0x55fb2f9732a0

结果符合预期,tcache 的内部状态变化过程如下:

  1. free(p1):p1 变成链表的头部
  2. free(p2):p2 变成链表的头部,next 指针指向 p1
  3. p3 = malloc(32): p2 是链表的头部,所以被分配给 p3,之后 p1 成为链表的头部
  4. p4 = malloc(32): p1 是链表的头部,所以被分配给 p4

如果修改分配的大小,让它们被放到不同的 bin,就不会出现顺序颠倒的情况:

#include <stdio.h>#include <stdlib.h>int main() { void *p1 = malloc(32); void *p2 = malloc(48); free(p1); free(p2); void *p3 = malloc(32); void *p4 = malloc(48); printf("p1=%p p2=%p p3=%p p4=%p\n", p1, p2, p3, p4);}

输出如下:

p1=0x5638e68db2a0 p2=0x5638e68db2d0 p3=0x5638e68db2a0 p4=0x5638e68db2d0

可以看到 p3 等于 p1,p4 等于 p2。此时 p1 和 p3 属于同一个 bin,而 p2 和 p4 属于另一个 bin。

既然我们知道了 tcache 的内部构造,我们可以写一个程序,首先得到 tcache 的地址,再打印出每次 malloc/free 之后的状态:

#include <stdint.h>#include <stdio.h>#include <stdlib.h>#define TCACHE_MAX_BINS 64typedef struct tcache_entry { struct tcache_entry *next; struct tcache_perthread_struct *key;} tcache_entry;typedef struct tcache_perthread_struct { uint16_t counts[TCACHE_MAX_BINS]; tcache_entry *entries[TCACHE_MAX_BINS];} tcache_perthread_struct;void dump_tcache(tcache_perthread_struct *tcache) { for (int i = 0; i < TCACHE_MAX_BINS; i++) { if (tcache->counts[i]) { tcache_entry *p = tcache->entries[i]; printf("tcache bin #%d: %p", i, p); p = p->next; while (p) { printf(" -> %p", p); p = p->next; } printf("\n"); } }}int main() { // leak tcache address void *p0 = malloc(128); free(p0); tcache_entry *entry = p0; tcache_perthread_struct *tcache = entry->key; printf("tcache is at %p\n", tcache); // clear tcache p0 = malloc(128); void *p1 = malloc(32); void *p2 = malloc(32); free(p1); printf("after free(p1):\n"); dump_tcache(tcache); free(p2); printf("after free(p2):\n"); dump_tcache(tcache); void *p3 = malloc(32); printf("after malloc(p3):\n"); dump_tcache(tcache); void *p4 = malloc(32); printf("after malloc(p4):\n"); dump_tcache(tcache); printf("p1=%p p2=%p p3=%p p4=%p\n", p1, p2, p3, p4);}

运行结果如下:

tcache is at 0x558f39310010after free(p1):tcache bin #1: 0x558f39310740after free(p2):tcache bin #1: 0x558f39310770 -> 0x558f39310740after malloc(p3):tcache bin #1: 0x558f39310740after malloc(p4):p1=0x558f39310740 p2=0x558f39310770 p3=0x558f39310770 p4=0x558f39310740

打印出来的结果和预期一致。

接下来继续分析 malloc 的后续代码。

回到 __libc_malloc

如果 malloc 没有命中 tcache,或者 free 没有把空闲块放到 tcache 当中,会发生什么事情呢?接下来往后看,首先是 __libc_malloc 的后续实现:

if (SINGLE_THREAD_P) { victim = _int_malloc (&main_arena, bytes); // omitted return victim; }arena_get (ar_ptr, bytes);victim = _int_malloc (ar_ptr, bytes);

这里出现了 arena 的概念:多线程情况下,为了提升性能,同时用多个 arena,每个 arena 用一把锁来保证多线程安全,从而使得多个线程可以同时从不同的 arena 中分配内存。这里先不讨论多线程的情况,先假设在单线程程序下,全局只用一个 arena:main_arena,然后从里面分配内存。接下来看 _int_malloc 的内部实现,可以看到它根据要分配的块的大小进入了不同的处理:

// in _int_mallocif ((unsigned long) (nb) <= (unsigned long) (get_max_fast ())) { // fast bin handling }if (in_smallbin_range (nb)) { // small bin handling }else { // consolidate fast bins to unsorted bins }for (;; ) { // process unsorted bins }

malloc 把空闲的块分成四种类型来保存:

  1. fast bin: 类似前面的 tcache bin,把大小相同的空闲块放到链表中,再维护多个对应不同大小的空闲块的链表头指针,采用单向链表维护
  2. small bin:small bin 也会把相同的空闲块放在链表中,但相邻的空闲块会被合并为更大的空闲块,采用双向链表维护
  3. large bin:large bin 可能保存不同大小的空闲块,采用双向链表维护
  4. unsorted bin:近期被 free 的空闲块,如果没有保存到 tcache,会被放到 unsorted bin 当中,留待后续的处理

在讨论这些 bin 的维护方式之前,首先要知道 glibc 是怎么维护块的:空闲的时候是什么布局,被分配的时候又是什么布局?

块布局

glibc 每个空闲块(chunk)对应了下面的结构体 malloc_chunk

struct malloc_chunk { INTERNAL_SIZE_T mchunk_prev_size; /* Size of previous chunk (if free). */ INTERNAL_SIZE_T mchunk_size; /* Size in bytes, including overhead. */ struct malloc_chunk* fd; /* double links -- used only if free. */ struct malloc_chunk* bk; /* Only used for large blocks: pointer to next larger size. */ struct malloc_chunk* fd_nextsize; /* double links -- used only if free. */ struct malloc_chunk* bk_nextsize;};

它的字段如下:

  1. 相邻的前一个空闲块的大小 mchunk_prev_size,记录它是为了方便找到前一个空闲块的开头,这样合并相邻的空闲块就很简单
  2. 当前空闲块的大小 mchunk_size,由于块的大小是对齐的,所以它的低位被用来记录 flag
  3. fdbk:small bin 和 large bin 需要用双向链表维护空闲块,指针就保存在这里
  4. fd_nextsizebk_next_size:large bin 需要用双向链表维护不同大小的空闲块,方便找到合适大小的空闲块

这是空闲块的内存布局,那么被分配的内存呢?被分配的内存,相当于是如下的结构:

struct { INTERNAL_SIZE_T mchunk_size; /* Size in bytes, including overhead. */ char payload[]; /* malloc() returns pointer to payload */};

也就是说,malloc() 返回的地址,等于空闲块里 fd 所在的位置。被分配的块,除了用户请求的空间以外,只有前面的 sizeof(size_t) 大小的空间是内存分配器带来的空间开销。块被释放以后,它被重新解释成 malloc_chunk 结构体(注意它们的起始地址不同,malloc_chunk 的地址是 malloc 返回的 payload 地址减去 2 * sizeof(size_t),对应 mchunk_prev_sizemchunk_size 两个字段)。事实上,mchunk_prev_size 保存在用户请求的空间的最后几个字节。内存布局如下:

 in-use chunk free chunk+-------------+ +------------------+| mchunk_size | | mchunk_size |+-------------+ +------------------+| payload | | fd || | +------------------+| | | bk || | +------------------+| | | fd_nextsize || | ---> +------------------+| | | bk_nextsize || | +------------------+| | | unused || | | || | | || | | || | +------------------+| | | mchunk_prev_size |+-------------+ +------------------+

因此为了在 payload 和 malloc_chunk 指针之间转换,代码中设计了两个宏来简化指针运算:

/* conversion from malloc headers to user pointers, and back */#define chunk2mem(p) ((void*)((char*)(p) + 2*SIZE_SZ))#define mem2chunk(mem) ((mchunkptr)((char*)(mem) - 2*SIZE_SZ))

知道了空闲块的维护方式,由于各个 bin 维护的就是这些空闲块,所以接下来分别看这几种 bin 的维护方式。

fast bin

fast bin 的维护方式和 tcache 类似,它把不同大小的空闲块按照大小分成多个 bin,每个 bin 记录在一个单向链表当中,然后用一个数组记录各种 bin 大小的链表头,这里直接用的就是 malloc_chunk 指针数组:

typedef struct malloc_chunk *mfastbinptr;struct malloc_state{ /* other fields are omitted */ /* Fastbins */ mfastbinptr fastbinsY[NFASTBINS];}

在 64 位下,默认 NFASTBINS 等于 10,计算方式如下:

  1. 最大的由 fast bin 管理的块大小等于 80 * sizeof(size_t) / 4 + sizeof(size_t) 向上取整到 16 的倍数,在 64 位机器上等于 176 字节
  2. 分配粒度从最小的 32 字节到最大的 176 字节,每 16 字节一个 bin,一共有 10 个 bin((176 - 32) / 16 + 1 = 10

不过默认情况下,fast bin 管理的块大小通过 set_max_fast(DEFAULT_MXFAST) 被限制在 DEFAULT_MXFAST 附近,这个值等于 64 * sizeof(size_t) / 4,加上 sizeof(size_t) 再向下取整到 16 的倍数,就是 128 字节。此时,只有前 7 个 bin 可以被用到(32 字节到 128 字节,每 16 字节一个 bin,(128 - 32) / 16 + 1 = 7),即 malloc(120) 或更小的分配会保存到 fast bin 中,malloc(121) 或更大的分配则不会。

malloc

分配的时候,和 tcache 类似,也是计算出 fastbin 的 index,然后去找对应的链表,如果链表非空,则从链表头取出空闲块用于分配:

#define fastbin(ar_ptr, idx) ((ar_ptr)->fastbinsY[idx])/* offset 2 to use otherwise unindexable first 2 bins */#define fastbin_index(sz) \ ((((unsigned int) (sz)) >> (SIZE_SZ == 8 ? 4 : 3)) - 2)// in _int_malloc, allocate using fastbinidx = fastbin_index (nb);mfastbinptr *fb = &fastbin (av, idx);mchunkptr pp;victim = *fb;if (victim != NULL) { if (SINGLE_THREAD_P) *fb = victim->fd; else REMOVE_FB (fb, pp, victim); if (__glibc_likely (victim != NULL)) { size_t victim_idx = fastbin_index (chunksize (victim)); if (__builtin_expect (victim_idx != idx, 0)) malloc_printerr ("malloc(): memory corruption (fast)"); check_remalloced_chunk (av, victim, nb); /* While we're here, if we see other chunks of the same size, stash them in the tcache. */ size_t tc_idx = csize2tidx (nb); if (tcache && tc_idx < mp_.tcache_bins) { mchunkptr tc_victim; /* While bin not empty and tcache not full, copy chunks. */ while (tcache->counts[tc_idx] < mp_.tcache_count && (tc_victim = *fb) != NULL) { if (SINGLE_THREAD_P) *fb = tc_victim->fd; else { REMOVE_FB (fb, pp, tc_victim); if (__glibc_unlikely (tc_victim == NULL)) break; } tcache_put (tc_victim, tc_idx); } } void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }

它的过程如下:

  1. 使用 fastbin_index (nb) 根据块的大小计算出 fast bin 的 index,然后 fastbin (av, idx) 对应 fast bin 的链表头指针
  2. 如果链表非空,说明可以从 fast bin 分配空闲块,此时就把链表头的结点弹出:*fb = victim->fd(单线程)或 REMOVE_FB (fb, pp, victim)(多线程);只用到了单向链表的 fd 指针,其余的字段没有用到
  3. 进行一系列的安全检查:__builtin_expectcheck_remalloced_chunk
  4. 检查 tcache 对应的 bin,如果它还没有满,就把 fast bin 链表中的元素挪到 tcache 当中
  5. 把 payload 地址通过 chunk2mem 计算出来,返回给 malloc 调用者
  6. 调用 alloc_perturb 往新分配的空间内写入垃圾数据(可选),避免泄露之前的数据

可以看到,这个过程比较简单,和 tcache 类似,只不过它从 thread local 的 tcache 改成了支持多线程的版本,同时为了支持多线程访问,使用 CAS 原子指令来更新链表头部:

#define REMOVE_FB(fb, victim, pp) \ do \ { \ victim = pp; \ if (victim == NULL) \ break; \ } \ while ((pp = catomic_compare_and_exchange_val_acq (fb, victim->fd, victim)) != victim);

正因如此,这个分配过程才能做到比较快,所以这样的分配方法叫做 fast bin。

free

接下来分析一下 free 的时候,空闲块是如何进入 fast bin 的:

// in _int_free, after tcache handlingif ((unsigned long)(size) <= (unsigned long)(get_max_fast ())) { /* check omitted */ free_perturb (chunk2mem(p), size - 2 * SIZE_SZ); atomic_store_relaxed (&av->have_fastchunks, true); unsigned int idx = fastbin_index(size); fb = &fastbin (av, idx); /* Atomically link P to its fastbin: P->FD = *FB; *FB = P; */ mchunkptr old = *fb, old2; if (SINGLE_THREAD_P) { /* Check that the top of the bin is not the record we are going to add (i.e., double free). */ if (__builtin_expect (old == p, 0)) malloc_printerr ("double free or corruption (fasttop)"); p->fd = old; *fb = p; } else do { /* Check that the top of the bin is not the record we are going to add (i.e., double free). */ if (__builtin_expect (old == p, 0)) malloc_printerr ("double free or corruption (fasttop)"); p->fd = old2 = old; } while ((old = catomic_compare_and_exchange_val_rel (fb, p, old2)) != old2); /* check omitted */ }

可以看到,它的逻辑很简单:如果大小合适,就直接添加到 fast bin 的链表头里,没有 tcache 那样的长度限制,多线程场景下依然是用 CAS 来实现原子的链表插入。

相比 tcache,fast bin 的 double free 检查更加简陋:它只能防护连续两次 free 同一个块,只判断了要插入链表的块是否在链表头,而不会检查是否在链表中间。

实验

接下来写一段代码来观察 fast bin 的更新过程:

  1. 由于 fastbin 保存在 main_arena 中,所以我们需要找到 main_arena 的运行时地址
  2. main_arena 不在 libc 符号表中,不能直接找到它的地址,此时可以通过 libc 的调试符号,找到它相对 image base 的 offset 是 0x1ecb80
  3. 再找一个在符号表中的符号 _IO_2_1_stdout_,它相对 image base 的 offset 是 0x1ed6a0
  4. 根据以上信息,就可以在运行时找到 libc 的 image base 地址,从而推断 main_arena 的地址,进而找到所有的 fast bin
  5. 下面写一段代码,观察空闲块进入 fast bin 的过程
#include <stddef.h>#include <stdint.h>#include <stdio.h>#include <stdlib.h>struct malloc_chunk { size_t mchunk_prev_size; /* Size of previous chunk (if free). */ size_t mchunk_size; /* Size in bytes, including overhead. */ struct malloc_chunk *fd; /* double links -- used only if free. */ struct malloc_chunk *bk; /* Only used for large blocks: pointer to next larger size. */ struct malloc_chunk *fd_nextsize; /* double links -- used only if free. */ struct malloc_chunk *bk_nextsize;};/* offset 2 to use otherwise unindexable first 2 bins */#define fastbin_index(sz) ((((unsigned int)(sz)) >> (SIZE_SZ == 8 ? 4 : 3)) - 2)#define INTERNAL_SIZE_T size_t/* MALLOC_ALIGNMENT equals to 16 on 64-bit */#define MALLOC_ALIGNMENT \ (2 * SIZE_SZ < __alignof__(long double) ? __alignof__(long double) \ : 2 * SIZE_SZ)/* The corresponding word size. *//* SIZE_SZ equals to 8 on 64-bit */#define SIZE_SZ (sizeof(INTERNAL_SIZE_T))/* The corresponding bit mask value. *//* MALLOC_ALIGN_MASK equals to 15 on 64-bit */#define MALLOC_ALIGN_MASK (MALLOC_ALIGNMENT - 1)/* The smallest possible chunk *//* MIN_CHUNK_SIZE equals to 32 on 64-bit */#define MIN_CHUNK_SIZE (offsetof(struct malloc_chunk, fd_nextsize))/* The smallest size we can malloc is an aligned minimal chunk *//* MINSIZE equals to 32 on 64-bit */#define MINSIZE \ (unsigned long)(((MIN_CHUNK_SIZE + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK))/* equivalent to max(alignUp(req + SIZE_SZ, MALLOC_ALIGNMENT), MINSIZE) */#define request2size(req) \ (((req) + SIZE_SZ + MALLOC_ALIGN_MASK < MINSIZE) \ ? MINSIZE \ : ((req) + SIZE_SZ + MALLOC_ALIGN_MASK) & ~MALLOC_ALIGN_MASK)/* MAX_FAST_SIZE equals to 160 on 64-bit */#define MAX_FAST_SIZE (80 * SIZE_SZ / 4)/* NFASTBINS equals to 10 on 64-bit */#define NFASTBINS (fastbin_index(request2size(MAX_FAST_SIZE)) + 1)struct malloc_state { /* Serialize access. */ int mutex; /* Flags (formerly in max_fast). */ int flags; /* Set if the fastbin chunks contain recently inserted free blocks. */ /* Note this is a bool but not all targets support atomics on booleans. */ int have_fastchunks; /* Fastbins */ struct malloc_chunk *fastbinsY[NFASTBINS];};void dump_fastbin() { void *libc_base = (char *)stdout - 0x1ed6a0; // offset of _IO_2_1_stdout_ struct malloc_state *main_arena = libc_base + 0x1ecb80; // offset of main_arena for (int i = 0; i < NFASTBINS; i++) { if (main_arena->fastbinsY[i]) { struct malloc_chunk *p = main_arena->fastbinsY[i]; printf("fastbin #%d: %p", i, p); p = p->fd; while (p) { printf(" -> %p", p); p = p->fd; } printf("\n"); } }}int main() { // use 10 malloc + free, the first 7 blocks will be saved in tcache, the rest // ones will go to fastbin void *ptrs[10]; printf("allocate 10 pointers:"); for (int i = 0; i < 10; i++) { ptrs[i] = malloc(32); printf(" %p", ptrs[i]); } printf("\n"); // now one ptr goes to fastbin for (int i = 0; i < 8; i++) { free(ptrs[i]); } printf("fastbins after 8 pointers freed:\n"); dump_fastbin(); // free the 9th one free(ptrs[8]); // two pointers in the fastbin printf("fastbins after 9 pointers freed:\n"); dump_fastbin(); // free the 10th one free(ptrs[9]); // three pointers in the fastbin printf("fastbins after 10 pointers freed:\n"); dump_fastbin(); return 0;}

输出如下:

allocate 10 pointers: 0x563bd918d6b0 0x563bd918d6e0 0x563bd918d710 0x563bd918d740 0x563bd918d770 0x563bd918d7a0 0x563bd918d7d0 0x563bd918d800 0x563bd918d830 0x563bd918d860fastbins after 8 pointers freed:fastbin #1: 0x563bd918d7f0fastbins after 9 pointers freed:fastbin #1: 0x563bd918d820 -> 0x563bd918d7f0fastbins after 10 pointers freed:fastbin #1: 0x563bd918d850 -> 0x563bd918d820 -> 0x563bd918d7f0

可以看到,代码先分配了十个块,再按顺序释放,那么前七个块会进入 tcache,剩下的三个块则进入了同一个 fast bin,并且后释放的会在链表的开头。注意 fast bin 链表里的地址打印的是 chunk 地址,而用 malloc 分配的地址指向的是 payload 部分,二者差了 16 字节,最终 fast bin 就是把十个块里最后三个块用链表串起来。由于总是往链表的头部插入空闲块,所以后释放的块出现在靠前的位置。

small bin

分析完 fast bin,接下来来看 small bin。small bin 每个 bin 内空闲块的大小是相同的,并且也是以链表的方式组织,只不过用的是双向链表。

malloc

接下来观察 _int_malloc 是怎么使用 small bin 的。前面提到,_int_malloc 首先会尝试在 fast bin 中分配,如果分配失败,或者大小超出了 fast bin 的范围,接下来会尝试在 small bin 中分配:

// in _int_mallocif (in_smallbin_range (nb)) { idx = smallbin_index (nb); bin = bin_at (av, idx); if ((victim = last (bin)) != bin) { bck = victim->bk; if (__glibc_unlikely (bck->fd != victim)) malloc_printerr ("malloc(): smallbin double linked list corrupted"); set_inuse_bit_at_offset (victim, nb); bin->bk = bck; bck->fd = bin; if (av != &main_arena) set_non_main_arena (victim); check_malloced_chunk (av, victim, nb); /* While we're here, if we see other chunks of the same size, stash them in the tcache. */ size_t tc_idx = csize2tidx (nb); if (tcache && tc_idx < mp_.tcache_bins) { mchunkptr tc_victim; /* While bin not empty and tcache not full, copy chunks over. */ while (tcache->counts[tc_idx] < mp_.tcache_count && (tc_victim = last (bin)) != bin) { if (tc_victim != 0) { bck = tc_victim->bk; set_inuse_bit_at_offset (tc_victim, nb); if (av != &main_arena) set_non_main_arena (tc_victim); bin->bk = bck; bck->fd = bin; tcache_put (tc_victim, tc_idx); } } } void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }

它的过程如下:

  1. 使用 in_smallbin_range (nb) 检查块的大小是否应该放到 small bin 当中
  2. 使用 smallbin_index (nb) 根据块的大小计算出 small bin 的 index,然后 bin_at (av, idx) 对应 small bin 的链表尾部的哨兵,这个双向链表有且只有一个哨兵,这个哨兵就放在 small bin 数组当中
  3. 找到哨兵结点的前驱结点 last (bin),如果链表为空,那么哨兵的前驱结点就是它自己;如果链表非空,那么哨兵的前驱结点就是链表里的最后一个结点,把它赋值给 victim
  4. 把这个空闲块标记为正在使用:set_inuse_bit_at_offset (victim, nb)
  5. victim 从链表里删除:bck = victim->bk; bin->bk = bck; bck->fd = bin;,典型的双向链表的结点删除过程,维护 victim 前驱结点的后继指针,维护哨兵 bin 的前驱指针
  6. 进行一系列的安全检查:check_malloced_chunk
  7. 检查 tcache 对应的 bin,如果它还没有满,就把 small bin 链表中的元素挪到 tcache 当中
  8. 把 payload 地址通过 chunk2mem 计算出来,返回给 malloc 调用者
  9. 调用 alloc_perturb 往新分配的空间内写入垃圾数据(可选),避免泄露之前的数据

其实现过程和 fast bin 很类似,只不过把单向链表改成了双向,并且引入了哨兵结点,这个哨兵结点保存在 malloc_state 结构的 bins 数组当中:

#define NSMALLBINS 64/* SMALLBIN_WIDTH equals to 16 on 64-bit */#define SMALLBIN_WIDTH MALLOC_ALIGNMENT/* SMALLBIN_CORRECTION equals to 0 on 64-bit */#define SMALLBIN_CORRECTION (MALLOC_ALIGNMENT > 2 * SIZE_SZ)/* MIN_LARGE_SIZE equals to 1024 on 64-bit */#define MIN_LARGE_SIZE ((NSMALLBINS - SMALLBIN_CORRECTION) * SMALLBIN_WIDTH)/* equivalent to (sz < 1024) on 64-bit */#define in_smallbin_range(sz) \ ((unsigned long) (sz) < (unsigned long) MIN_LARGE_SIZE)/* equivalent to (sz >> 4) on 64-bit */#define smallbin_index(sz) \ ((SMALLBIN_WIDTH == 16 ? (((unsigned) (sz)) >> 4) : (((unsigned) (sz)) >> 3))\ + SMALLBIN_CORRECTION)typedef struct malloc_chunk* mchunkptr;/* addressing -- note that bin_at(0) does not exist */#define bin_at(m, i) \ (mbinptr) (((char *) &((m)->bins[((i) - 1) * 2])) \ - offsetof (struct malloc_chunk, fd))#define NBINS 128struct malloc_state{ /* omitted */ /* Normal bins packed as described above */ mchunkptr bins[NBINS * 2 - 2];}

乍一看会觉得很奇怪,这里 NBINS * 2 - 2 是什么意思?mchunkptr 是个指针类型,那它指向的数据存在哪?其实这里用了一个小的 trick:

  1. 不去看 bins 元素的类型,只考虑它的元素的大小,每个元素大小是 sizeof(size_t),一共有 NBINS * 2 - 2 个元素
  2. 而每个 bin 对应一个链表的哨兵结点,由于是双向链表,哨兵结点也没有数据,只需要保存前驱和后继两个指针,即每个 bin 只需要存两个指针的空间,也就是 2 * sizeof(size_t)
  3. 正好 bins 数组给每个 bin 留出了 2 * sizeof(size_t) 的空间(bin 0 除外,这个 bin 不存在),所以实际上这些哨兵结点的前驱和后继指针就保存在 bins 数组里,按顺序保存,首先是 bin 1 的前驱,然后是 bin 1 的后继,接着是 bin 2 的前驱,依此类推
  4. 虽然空间对上了,但是为了方便使用,代码里用 bin_at 宏来计算出一个 malloc_chunk 结构体的指针,而已知 bins 数组只保存了 fdbk 两个指针,并且 bin 的下标从 1 开始,所以 bin i 的 fd 指针地址就是 (char *) &((m)->bins[((i) - 1) * 2]),再减去 malloc_chunk 结构体中 fd 成员的偏移,就得到了一个 malloc_chunk 结构体的指针,当然了,这个结构体只有 fdbk 两个字段是合法的,其他字段如果访问了,就会访问到其他 bin 那里去

抛开这些 trick,其实就等价于用一个数组保存了每个 bin 的 fdbk 指针,至于为什么要强行转换成 malloc_chunk 类型的指针,可能是为了方便代码的编写,不需要区分空闲块的结点和哨兵结点。

此外,small bin 的处理里还多了一次 set_inuse_bit_at_offset (victim, nb),它的定义如下:

#define set_inuse_bit_at_offset(p, s) \ (((mchunkptr) (((char *) (p)) + (s)))->mchunk_size |= PREV_INUSE)

乍一看会觉得很奇怪,这个访问不是越界了吗?其实这个就是跨过当前的 chunk,访问相邻的下一个 chunk,在它的 mchunk_size 字段上打标记,表示它的前一个 chunk 已经被占用。前面提到过,mchunk_size 同时保存了 chunk 的大小和一些 flag,由于 chunk 的大小至少是 8 字节对齐的(32 位系统上),所以最低的 3 位就被拿来保存如下的 flag:

  1. PREV_INUSE(0x1): 前一个 chunk 已经被分配
  2. IS_MAPPED(0x2):当前 chunk 的内存来自于 mmap
  3. NON_MAIN_ARENA(0x4):当前 chunk 来自于 main arena 以外的其他 arena

在这里,就是设置了 PREV_INUSE flag,方便后续的相邻块的合并。

可以注意到,small bin 的分配范围是 nb < MIN_LARGE_SIZE,因此在 64 位上,malloc(1000) 或更小的分配会被 small bin 分配,而 malloc(1001) 或更大的分配则不可以。

free

在讲述 small bin 在 free 中的实现之前,先讨论 _int_malloc 的后续逻辑,最后再回过头来看 free 的部分。

consolidate

当要分配的块经过 fast bin 和 small bin 两段逻辑都没能分配成功,并且要分配的块比较大的时候(!in_small_range (nb)),会进行一次 malloc_consolidate 调用,这个函数会尝试对 fast bin 中的空闲块进行合并,然后把新的块插入到 unsorted bin 当中。它的实现如下:

unsorted_bin = unsorted_chunks(av);maxfb = &fastbin (av, NFASTBINS - 1);fb = &fastbin (av, 0);do { p = atomic_exchange_acq (fb, NULL); if (p != 0) { do { /* malloc check omitted */ check_inuse_chunk(av, p); nextp = p->fd; /* Slightly streamlined version of consolidation code in free() */ size = chunksize (p); nextchunk = chunk_at_offset(p, size); nextsize = chunksize(nextchunk); if (!prev_inuse(p)) { prevsize = prev_size (p); size += prevsize; p = chunk_at_offset(p, -((long) prevsize)); /* malloc check omitted */ unlink_chunk (av, p); } if (nextchunk != av->top) { nextinuse = inuse_bit_at_offset(nextchunk, nextsize); if (!nextinuse) { size += nextsize; unlink_chunk (av, nextchunk); } else clear_inuse_bit_at_offset(nextchunk, 0); first_unsorted = unsorted_bin->fd; unsorted_bin->fd = p; first_unsorted->bk = p; if (!in_smallbin_range (size)) { p->fd_nextsize = NULL; p->bk_nextsize = NULL; } set_head(p, size | PREV_INUSE); p->bk = unsorted_bin; p->fd = first_unsorted; set_foot(p, size); } else { size += nextsize; set_head(p, size | PREV_INUSE); av->top = p; } } while ( (p = nextp) != 0); }} while (fb++ != maxfb);
  1. 第一层循环,遍历每个非空的 fast bin,进行下列操作
  2. 第二层循环,每个非空的 fast bin 有一个单向链表,沿着链表进行迭代,遍历链表上的每个空闲块,进行下列操作
  3. 循环内部,检查当前空闲块能否和前后的空闲块合并
  4. 首先检查在它前面(地址更低的)相邻的块是否空闲:如果 PREV_INUSE 没有被设置,可以通过 mchunk_prev_size 找到前面相邻的块的开头,然后把两个块合并起来;如果前面相邻的块已经在某个双向链表当中(例如 small bin),把它从双向链表中删除:unlink_chunk (av, p);;为什么前面要用双向链表,也是为了在这里可以直接从链表中间删除一个结点
  5. 接着检查在它后面(地址更高的)相邻的块是否空闲:根据自己的 size,计算出下一个块的地址,得到下一个块的大小,再读取下一个块的下一个块,根据它的 PREV_INUSE,判断下一个块是否空闲;如果空闲,那就把下一个块也合并进来,同理也要把它从双向链表中删除:unlink_chunk (av, nextchunk);;代码中还有对 top chunk 的特殊处理,这里先略过
  6. 合并完成以后,把当前的空闲块放到 unsorted bin 当中,也是一个简单的双向链表向链表头的插入算法:first_unsorted = unsorted_bin->fd; unsorted_bin->fd = p; first_unsorted->bk = p; p->bk = unsorted_bin; p->fd = first_unsorted;

unlink_chunk 的实现就是经典的双向链表删除结点的算法:

/* Take a chunk off a bin list. */static voidunlink_chunk (mstate av, mchunkptr p){ /* malloc check omitted */ mchunkptr fd = p->fd; mchunkptr bk = p->bk; /* malloc check omitted */ fd->bk = bk; bk->fd = fd; if (!in_smallbin_range (chunksize_nomask (p)) && p->fd_nextsize != NULL) { /* malloc check omitted */ if (fd->fd_nextsize == NULL) { if (p->fd_nextsize == p) fd->fd_nextsize = fd->bk_nextsize = fd; else { fd->fd_nextsize = p->fd_nextsize; fd->bk_nextsize = p->bk_nextsize; p->fd_nextsize->bk_nextsize = fd; p->bk_nextsize->fd_nextsize = fd; } } else { p->fd_nextsize->bk_nextsize = p->bk_nextsize; p->bk_nextsize->fd_nextsize = p->fd_nextsize; } }}

这里的 fd_nextsizebk_nextsize 字段适用于 large bin,后面会讨论这个双向链表的细节。

因此 unsorted bin 保存了一些从 fast bin 合并而来的一些块,由于 unsorted bin 只有一个,所以它里面会保存各种大小的空闲块。实际上,unsorted bin 占用的就是 malloc_state 结构中的 bin 1,因为我们已经知道,块的大小至少是 32,而大小为 32 的块,对应的 small bin index 是 2,说明 1 没有被用到,其实就是留给 unsorted bin 用的。在 64 位系统下,malloc_state 的 127 个 bin 分配如下:

  1. bin 1 是 unsorted bin
  2. bin 2 到 bin 63 是 small bin
  3. bin 64 到 bin 126 是 large bin

bin 127 没有用到。

经过这次合并之后,接下来 _int_malloc 尝试从 unsorted bin 和 large bin 中分配空闲块。

再次回到 __libc_malloc

接下来,_int_malloc 有一大段代码来进行后续的内存分配,大概步骤包括:

  1. 把 unsorted bin 中的空闲块的处理,放到 small bin 或者 large bin 中,同时如果有合适的块,就分配给 malloc 的调用者
  2. 如果还是没有找到合适大小的块,就在 large bin 里寻找空闲块来分配;如果找不到合适大小的块,进行 consolidate,尝试更多的合并,得到更大的块;重复这个过程多次
  3. 如果还是找不到合适的块,就从堆顶分配新的块,如果堆已经满了,还需要去扩大堆,或者直接用 mmap 分配一片内存

现在分步骤观察这个过程,首先观察 unsorted bin 的处理。

unsorted bin

首先是 unsorted bin 的空闲块的处理:

int iters = 0;while ((victim = unsorted_chunks (av)->bk) != unsorted_chunks (av)) { bck = victim->bk; size = chunksize (victim); mchunkptr next = chunk_at_offset (victim, size); /* malloc checks omitted */ /* If a small request, try to use last remainder if it is the only chunk in unsorted bin. This helps promote locality for runs of consecutive small requests. This is the only exception to best-fit, and applies only when there is no exact fit for a small chunk. */ if (in_smallbin_range (nb) && bck == unsorted_chunks (av) && victim == av->last_remainder && (unsigned long) (size) > (unsigned long) (nb + MINSIZE)) { /* split and reattach remainder */ remainder_size = size - nb; remainder = chunk_at_offset (victim, nb); unsorted_chunks (av)->bk = unsorted_chunks (av)->fd = remainder; av->last_remainder = remainder; remainder->bk = remainder->fd = unsorted_chunks (av); if (!in_smallbin_range (remainder_size)) { remainder->fd_nextsize = NULL; remainder->bk_nextsize = NULL; } set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); set_foot (remainder, remainder_size); check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } /* remove from unsorted list */ /* malloc checks omitted */ unsorted_chunks (av)->bk = bck; bck->fd = unsorted_chunks (av); /* Take now instead of binning if exact fit */ if (size == nb) { set_inuse_bit_at_offset (victim, size); if (av != &main_arena) set_non_main_arena (victim); /* Fill cache first, return to user only if cache fills. We may return one of these chunks later. */ if (tcache_nb && tcache->counts[tc_idx] < mp_.tcache_count) { tcache_put (victim, tc_idx); return_cached = 1; continue; } else { check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } } /* place chunk in bin */ if (in_smallbin_range (size)) { victim_index = smallbin_index (size); bck = bin_at (av, victim_index); fwd = bck->fd; } else { victim_index = largebin_index (size); bck = bin_at (av, victim_index); fwd = bck->fd; /* maintain large bins in sorted order */ if (fwd != bck) { /* Or with inuse bit to speed comparisons */ size |= PREV_INUSE; /* if smaller than smallest, bypass loop below */ assert (chunk_main_arena (bck->bk)); if ((unsigned long) (size) < (unsigned long) chunksize_nomask (bck->bk)) { fwd = bck; bck = bck->bk; victim->fd_nextsize = fwd->fd; victim->bk_nextsize = fwd->fd->bk_nextsize; fwd->fd->bk_nextsize = victim->bk_nextsize->fd_nextsize = victim; } else { assert (chunk_main_arena (fwd)); while ((unsigned long) size < chunksize_nomask (fwd)) { fwd = fwd->fd_nextsize; assert (chunk_main_arena (fwd)); } if ((unsigned long) size == (unsigned long) chunksize_nomask (fwd)) /* Always insert in the second position. */ fwd = fwd->fd; else { victim->fd_nextsize = fwd; victim->bk_nextsize = fwd->bk_nextsize; /* malloc checks omitted */ fwd->bk_nextsize = victim; victim->bk_nextsize->fd_nextsize = victim; } bck = fwd->bk; /* malloc checks omitted */ } } else victim->fd_nextsize = victim->bk_nextsize = victim; } mark_bin (av, victim_index); victim->bk = bck; victim->fd = fwd; fwd->bk = victim; bck->fd = victim; /* If we've processed as many chunks as we're allowed while filling the cache, return one of the cached ones. */ ++tcache_unsorted_count; if (return_cached && mp_.tcache_unsorted_limit > 0 && tcache_unsorted_count > mp_.tcache_unsorted_limit) { return tcache_get (tc_idx); }#define MAX_ITERS 10000 if (++iters >= MAX_ITERS) break; }

它的流程如下:

  1. 遍历 unsorted bin 双向链表,从哨兵结点开始,从后往前遍历空闲块
  2. fast path 逻辑:如果要申请的块比当前空闲块小,并且当前空闲块可以拆分,那就拆分当前的空闲块,然后直接分配拆分后的空闲块
  3. 如果要申请的块的大小和当前空闲块的大小相同,把空闲块放到 tcache,或者直接分配这个空闲块
  4. 把当前空闲块根据大小,分发到 small bin 或者 large bin
  5. 如果 tcache 中有合适的空闲块,就分配它

由此可见,unsorted bin 中的空闲块在 malloc 的时候会被分派到对应的 small bin 或者 large bin 当中。small bin 的处理比较简单,因为每个 bin 的块大小都相同,直接加入到双向链表即可。large bin 的处理则比较复杂,下面主要来分析 large bin 的结构。

large bin

large bin 和其他 bin 的不同的地方在于,它每个 bin 的大小不是一个固定的值,而是一个范围。在 64 位下,bin 64 到 bin 127 对应的块大小范围:

  1. bin 64 到 bin 96: 从 1024 字节开始,每个 bin 覆盖 64 字节的长度范围,例如 bin 64 对应 1024-1087 字节范围,bin 96 对应 3072-3135 字节范围
  2. bin 97 到 bin 111: 从 3136 字节开始,每个 bin 覆盖 512 字节的长度范围,例如 bin 97 对应 3136-3583 字节范围(没有涵盖 512 字节是因为对齐问题,其他的都是涵盖 512 字节),bin 111 对应 10240-10751 字节范围
  3. bin 112 到 bin 119: 从 10752 字节开始,每个 bin 覆盖 4096 字节的长度范围,例如 bin 112 对应 10752-12287 字节范围(没有涵盖 4096 字节是因为对齐问题,其他的都是涵盖 4096 字节),bin 119 对应 36864-40959 字节范围
  4. bin 120 到 bin 123: 从 40960 字节开始,每个 bin 覆盖 32768 字节的长度范围,例如 bin 120 对应 40960-65535 字节范围(没有涵盖 32768 字节是因为对齐问题,其他的都是涵盖 32768 字节),bin 123 对应 131072-163839 字节范围
  5. bin 124: 163840-262143 共 98304 个字节的范围
  6. bin 125: 262144-524287 共 262144 个字节的范围
  7. bin 126: 524288 或更长

可以看到,比较短的长度范围给的 bin 也比较多,后面则更加稀疏。上述各个 bin 的大小范围可以通过以下代码打印:

#include <stdio.h>#define largebin_index_64(sz) \ (((((unsigned long)(sz)) >> 6) <= 48) \ ? 48 + (((unsigned long)(sz)) >> 6) \ : ((((unsigned long)(sz)) >> 9) <= 20) \ ? 91 + (((unsigned long)(sz)) >> 9) \ : ((((unsigned long)(sz)) >> 12) <= 10) \ ? 110 + (((unsigned long)(sz)) >> 12) \ : ((((unsigned long)(sz)) >> 15) <= 4) \ ? 119 + (((unsigned long)(sz)) >> 15) \ : ((((unsigned long)(sz)) >> 18) <= 2) \ ? 124 + (((unsigned long)(sz)) >> 18) \ : 126)int main() { int last_bin = largebin_index_64(1024); int last_i = 1024; for (int i = 1024; i < 1000000; i++) { if (largebin_index_64(i) != last_bin) { printf("%d-%d: %d, length %d\n", last_i, i - 1, last_bin, i - last_i); last_i = i; last_bin = largebin_index_64(i); } } printf("%d-inf: %d\n", last_i, last_bin); return 0;}

因此 large bin 里面会有不同 chunk 大小的空闲块。为了快速地寻找想要的大小的空闲块,large bin 中空闲块按照从大到小的顺序组成链表,同时通过 fd_nextsizebk_nextsize 把每种大小出现的第一个块组成双向链表。大致的连接方式如下,参考了 Malloc Internals 给的示例:

 bins[id] chunk A chunk B chunk C+----------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1040 |+----------+ +-----------------+ +-----------------+ +-----------------+| bk = C | | fd = B | | fd = C | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = B | +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = C | | fd_nextsize = A | +-----------------+ +-----------------+ | bk_nextsize = C | | bk_nextsize = A | +-----------------+ +-----------------+

即所有的空闲块加哨兵组成一个双向链表,从大到小按照 A -> B -> C 的顺序连接;然后每种大小的第一个块 AC 单独在一个 nextsize 链表当中。

因此在插入 large bin 的时候,分如下几种情况:

第一种情况,如果新插入的空闲块的大小比目前已有空闲块都小,则直接插入到空闲块和 nextsize 链表的尾部。例如要插入一个 size = 1024 的空闲块 D,插入后状态为:

 bins[id] chunk A chunk B chunk C chunk D +----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1040 | | size = 1024 |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| bk = D | | fd = B | | fd = C | | fd = D | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = B | | bk = C | +-----------------+ +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = C | | fd_nextsize = D | | fd_nextsize = A | +-----------------+ +-----------------+ +-----------------+ | bk_nextsize = D | | bk_nextsize = A | | bk_nextsize = C | +-----------------+ +-----------------+ +-----------------+

第二种情况,在遍历 nextsize 链表过程中,发现已经有大小相同的块,那就把新的块插入到它的后面。例如要插入一个 size = 1072 的空闲块 D,通过遍历 nextsize 链表找到了 A,插入之后的状态为:

 bins[id] chunk A chunk D chunk B chunk C +----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1072 | | size = 1040 |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| bk = C | | fd = D | | fd = B | | fd = C | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = D | | bk = C | +-----------------+ +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = C | | fd_nextsize = A | +-----------------+ +-----------------+ | bk_nextsize = C | | bk_nextsize = A | +-----------------+ +-----------------+

第三种情况,在遍历 nextsize 链表过程中,没有大小相同的块,那就按照从大到小的顺序插入到合适的位置。例如要插入一个 size = 1056 的空闲块 D,通过遍历 nextsize 链表找到了 A 和 C,插入之后的状态为:

 bins[id] chunk A chunk B chunk D chunk C +----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| fd = A | | size = 1072 | | size = 1072 | | size = 1056 | | size = 1040 |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+| bk = D | | fd = B | | fd = D | | fd = C | | fd = bins[id] |+----------+ +-----------------+ +-----------------+ +-----------------+ +-----------------+ | bk = bins[id] | | bk = A | | bk = B | | bk = D | +-----------------+ +-----------------+ +-----------------+ +-----------------+ | fd_nextsize = D | | fd_nextsize = C | | fd_nextsize = A | +-----------------+ +-----------------+ +-----------------+ | bk_nextsize = C | | bk_nextsize = A | | bk_nextsize = D | +-----------------+ +-----------------+ +-----------------+

这样就保证了插入以后的 large bin,依然满足从大到小排序,并且每种大小的第一个块组成 nextsize 链表的性质。

malloc

接着回到 _libc_malloc。前面提到,unsorted bin 中空闲块已经被挪到了 small bin 或者 large bin,并在这个过程中把合适大小的空闲块直接分配。如果还是没有分配成功,接下来就要在 large bin 里寻找一个块来分配:

if (!in_smallbin_range (nb)) { bin = bin_at (av, idx); /* skip scan if empty or largest chunk is too small */ if ((victim = first (bin)) != bin && (unsigned long) chunksize_nomask (victim) >= (unsigned long) (nb)) { victim = victim->bk_nextsize; while (((unsigned long) (size = chunksize (victim)) < (unsigned long) (nb))) victim = victim->bk_nextsize; /* Avoid removing the first entry for a size so that the skip list does not have to be rerouted. */ if (victim != last (bin) && chunksize_nomask (victim) == chunksize_nomask (victim->fd)) victim = victim->fd; remainder_size = size - nb; unlink_chunk (av, victim); /* Exhaust */ if (remainder_size < MINSIZE) { set_inuse_bit_at_offset (victim, size); if (av != &main_arena) set_non_main_arena (victim); } /* Split */ else { remainder = chunk_at_offset (victim, nb); /* We cannot assume the unsorted list is empty and therefore have to perform a complete insert here. */ bck = unsorted_chunks (av); fwd = bck->fd; if (__glibc_unlikely (fwd->bk != bck)) malloc_printerr ("malloc(): corrupted unsorted chunks"); remainder->bk = bck; remainder->fd = fwd; bck->fd = remainder; fwd->bk = remainder; if (!in_smallbin_range (remainder_size)) { remainder->fd_nextsize = NULL; remainder->bk_nextsize = NULL; } set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); set_foot (remainder, remainder_size); } check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }

从 large bin 分配空闲块的过程如下:

  1. 根据大小找到对应的 large bin
  2. 如果 large bin 中最大的空闲块足够大,遍历 nextsize 链表,找到一个比要分配的大小更大的最小的空闲块
  3. 为了避免更新 nextsize 链表,如果当前块大小对应了不止一个空闲块,那就取第二个空闲块,这样就不用更新 nextsize 链表
  4. 计算空闲块大小和要分配的大小的差值,如果差值太小,多余的部分就直接浪费;如果差的空间还能放下一个 chunk,就进行拆分,把拆出来的剩下的部分放到 unsorted bin 中
  5. 计算 payload 地址,进行可选的 perturb,完成分配

寻找更大的 bin

如果在当前 bin 内还是找不到空闲块,那就只能从更大的 bin 里寻找空闲块了:

++idx;bin = bin_at (av, idx);block = idx2block (idx);map = av->binmap[block];bit = idx2bit (idx);for (;; ) { /* Skip rest of block if there are no more set bits in this block. */ if (bit > map || bit == 0) { do { if (++block >= BINMAPSIZE) /* out of bins */ goto use_top; } while ((map = av->binmap[block]) == 0); bin = bin_at (av, (block << BINMAPSHIFT)); bit = 1; } /* Advance to bin with set bit. There must be one. */ while ((bit & map) == 0) { bin = next_bin (bin); bit <<= 1; assert (bit != 0); } /* Inspect the bin. It is likely to be non-empty */ victim = last (bin); /* If a false alarm (empty bin), clear the bit. */ if (victim == bin) { av->binmap[block] = map &= ~bit; /* Write through */ bin = next_bin (bin); bit <<= 1; } else { size = chunksize (victim); /* We know the first chunk in this bin is big enough to use. */ assert ((unsigned long) (size) >= (unsigned long) (nb)); remainder_size = size - nb; /* unlink */ unlink_chunk (av, victim); /* Exhaust */ if (remainder_size < MINSIZE) { set_inuse_bit_at_offset (victim, size); if (av != &main_arena) set_non_main_arena (victim); } /* Split */ else { remainder = chunk_at_offset (victim, nb); /* We cannot assume the unsorted list is empty and therefore have to perform a complete insert here. */ bck = unsorted_chunks (av); fwd = bck->fd; if (__glibc_unlikely (fwd->bk != bck)) malloc_printerr ("malloc(): corrupted unsorted chunks 2"); remainder->bk = bck; remainder->fd = fwd; bck->fd = remainder; fwd->bk = remainder; /* advertise as last remainder */ if (in_smallbin_range (nb)) av->last_remainder = remainder; if (!in_smallbin_range (remainder_size)) { remainder->fd_nextsize = NULL; remainder->bk_nextsize = NULL; } set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); set_foot (remainder, remainder_size); } check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; } }

为了跳过空的 bin,维护了一个 bitmap,记录哪些 bin 会有内容。找到一个非空的 bin 以后,它的大小肯定是足够分配的,接下来就和之前一样,要么舍弃多余的空间,要么把多余的空间做成一个 chunk,插入到 unsorted bin 当中。

如果所有 bin 都空了,说明没有可以分配的可能了,就跳转到 use_top 逻辑。

top chunk 分配

如果已有的 bin 都无法分配了,就尝试拆分 top chunk 来进行分配。top chunk 指的是当前堆里地址最高的那个 chunk,也可以理解为未分配的部分。分配的逻辑如下:

use_top:victim = av->top;size = chunksize (victim);if (__glibc_unlikely (size > av->system_mem)) malloc_printerr ("malloc(): corrupted top size");if ((unsigned long) (size) >= (unsigned long) (nb + MINSIZE)) { remainder_size = size - nb; remainder = chunk_at_offset (victim, nb); av->top = remainder; set_head (victim, nb | PREV_INUSE | (av != &main_arena ? NON_MAIN_ARENA : 0)); set_head (remainder, remainder_size | PREV_INUSE); check_malloced_chunk (av, victim, nb); void *p = chunk2mem (victim); alloc_perturb (p, bytes); return p; }/* When we are using atomic ops to free fast chunks we can get here for all block sizes. */else if (atomic_load_relaxed (&av->have_fastchunks)) { malloc_consolidate (av); /* restore original bin index */ if (in_smallbin_range (nb)) idx = smallbin_index (nb); else idx = largebin_index (nb); }/* Otherwise, relay to handle system-dependent cases */else { void *p = sysmalloc (nb, av); if (p != NULL) alloc_perturb (p, bytes); return p; }

如果 top chunk 的空间够大,那就对 top chunk 进行拆分,低地址的部分分配出去,剩下的部分成为新的 top chunk。如果 top chunk 不够大,并且 fast bin 还有空间,那就再挣扎一下,consolidate 一下,重新分配一次。如果这些方法都失败了,那就调用 sysmalloc,通过 mmap 或者 sbrk 等方式来分配新的空间。

前面在讨论 consolidate 的时候,跳过了对 top chunk 的特殊处理。其实,top chunk 的特殊处理也很简单,如果当前空闲块的下一个块就是 top chunk,那就把当前空闲块合并到 top chunk 即可。

free

前面把 malloc 的流程基本分析完了,接下来分析一下 free 的逻辑,它做的事情包括:

  1. 前面已经分析过,如果 tcache 对应的 bin 存在且非满,则把空闲块插入到 tcache 的链表头
  2. 如果存在对应的 fast bin,则插入空闲块到 fast bin 对应链表的头部
  3. 尝试和它前后的空闲块进行合并,实现和前面 consolidate 类似,合并后进入 unsorted bin
  4. 如果释放的内存比较多,检查 top chunk 大小,如果剩余的空间比较多,则归还一部分内存给操作系统
  5. 对于 mmap 分配的内存,用 munmap 释放掉

由于 free 的实现相对简单,在这里就不详细解析了,比较详细的实现分析见后。

realloc

realloc 的实现在 __libc_realloc 当中,它的实现比较简单:

  1. 如果重新分配的大小是 0,realloc 等价为 free,就调用 free
  2. 如果旧指针是 NULL,realloc 等价为 malloc,就调用 malloc
  3. 如果直接是 mmap 出来的块,利用 mremap 来扩展空间
  4. 如果是要申请更少的内存,把多出来的部分拆成一个单独的块,然后 free 掉它
  5. 如果是要申请更多的内存,尝试从内存更高地址的相邻块获取空间,如果有的话,合并两个块,然后把多余的空间拆成一个单独的块,然后 free 掉它;如果内存更高地址的相邻块已经被占用,就重新 malloc 一个块,用 memcpy 把数据拷贝过去,再 free 掉旧的内存

calloc

calloc 的实现在 __libc_calloc 当中,它的语义相比 malloc 多了一个清零,所以它的实现也不复杂:

  1. 如果 top chunk 还有空间,并且 top chunk 的数据已经被清零,则优先从 top chunk 分配空间,避免了 memset 的开销
  2. fallback 到 _int_malloc 进行内存分配,分配成功后,再 memset 清零

arena 和 heap

前面讨论了各种 chunk 在内存分配器内部流转的情况,但并没有讨论这些空间是怎么从操作系统分配而来的,又是怎么维护的。glibc 内存分配器实际上设计了两个层次:

  1. arena 层次:对应锁的粒度,一个 arena 可以对应多个 heap,有一个特殊的 arena 是 main_arena;arena 的数量有限制,在 64 位系统下默认的数量限制是处理器核心数的 8 倍,避免出现太多的内存碎片
  2. heap 层次:每个 heap 大小有上限:1024 * 1024 字节,也就是 1MB;当 arena 需要更多空间的时候,可以分配新的 heap;arena 自身就保存在 arena 的第一个 heap 内部的空间,同一个 arena 的多个 heap 之间通单向链表连接起来;arena 的 top chunk 指向最后一个创建的 heap 的顶部的空闲块

arena 的结构就是前面看到的 malloc_state,包括如下字段:

struct malloc_state{ /* Serialize access. */ __libc_lock_define (, mutex); /* Flags (formerly in max_fast). */ int flags; /* Set if the fastbin chunks contain recently inserted free blocks. */ /* Note this is a bool but not all targets support atomics on booleans. */ int have_fastchunks; /* Fastbins */ mfastbinptr fastbinsY[NFASTBINS]; /* Base of the topmost chunk -- not otherwise kept in a bin */ mchunkptr top; /* The remainder from the most recent split of a small request */ mchunkptr last_remainder; /* Normal bins packed as described above */ mchunkptr bins[NBINS * 2 - 2]; /* Bitmap of bins */ unsigned int binmap[BINMAPSIZE]; /* Linked list */ struct malloc_state *next; /* Linked list for free arenas. Access to this field is serialized by free_list_lock in arena.c. */ struct malloc_state *next_free; /* Number of threads attached to this arena. 0 if the arena is on the free list. Access to this field is serialized by free_list_lock in arena.c. */ INTERNAL_SIZE_T attached_threads; /* Memory allocated from the system in this arena. */ INTERNAL_SIZE_T system_mem; INTERNAL_SIZE_T max_system_mem;};

这里很多字段在之前已经见过了,比如:

  1. mutex:arena 的互斥锁
  2. have_fastchunks:记录 fast bin 中是否还有空闲块,用于判断是否需要 consolidate
  3. fastbinsY:保存 fast bin 每个 bin 的头指针的数组
  4. top:指向 top chunk
  5. last_remainder:指向最近一次 split 出来的空闲块,用于访存局部性优化
  6. bins:保存 unsorted bin,small bin 和 large bin 各个双向链表的哨兵结点的 fdbk 指针
  7. binmap:记录哪些 small 或 large bin 里面有空闲块,用于加速寻找下一个有空闲块的 bin

之前没有涉及到的字段包括:

  1. flags: 维护 NONCONTIGUOUS_BIT 标记,即 arena 所使用的内存是否是连续的,例如用 sbrk 分配出来的内存是连续的,用 mmap 则不是
  2. next: 维护所有 arena 的单向链表,链表头就是 main_arena
  3. next_free: 维护所有空闲的 arena 的单向链表 free list,链表头保存在 static mstate free_list
  4. attached_threads: 记录有多少个线程会使用这个 arena,类似于一种引用计数,当它减到零的时候,意味着 arena 可以被释放到 free list 了
  5. system_mem: 记录它从操作系统分配了多少的内存的大小
  6. max_system_mem:记录它历史上从操作系统分配最多的内存的大小

可见 arena 的结构还是比较简单的,接下来分析 heap 的结构:

typedef struct _heap_info{ mstate ar_ptr; /* Arena for this heap. */ struct _heap_info *prev; /* Previous heap. */ size_t size; /* Current size in bytes. */ size_t mprotect_size; /* Size in bytes that has been mprotected PROT_READ|PROT_WRITE. */ /* Make sure the following data is properly aligned, particularly that sizeof (heap_info) + 2 * SIZE_SZ is a multiple of MALLOC_ALIGNMENT. */ char pad[-6 * SIZE_SZ & MALLOC_ALIGN_MASK];} heap_info;

字段如下:

  1. ar_ptr:指向 heap 所属的 arena
  2. prev:指向前一个 heap,组成一个 heap 的单向链表,新添加的 heap 放到链表的尾部
  3. size: heap 的大小
  4. mprotect_size: heap 被设置为可读写的部分的内存大小,也就是 heap 的活跃部分大小,对齐到页的边界;默认情况下,heap 的未分配空间被映射为不可读不可写不可执行的属性
  5. pad: 添加 padding,保证它的大小是 MALLOC_ALIGNMENT 的倍数

前面提到过,arena 的空间会复用它的第一个 heap 的空间,紧接着放在 heap_info 结构体的后面。这个 heap_info 结构体就放在 heap 所用空间的开头。

heap 有一个特性,就是它的起始地址,一定是对齐到 HEAP_MAX_SIZE(默认是 64MB)的整数倍数,并且它的大小也不会超过 HEAP_MAX_SIZE,所以如果要知道某个 chunk 属于哪个 heap,只需要向下取整到 HEAP_MAX_SIZE 的倍数即可。如果要知道某个 chunk 属于哪个 arena,就先找到 heap,再从 heap_info 获取 ar_ptr 就可以了。

比较有意思的一个点是,heap 保存了 arena 的指针,但是反过来,arena 并没有保存 heap 的指针,那么怎么从 arena 找到属于这个 arena 的所有 heap 呢?这会用到一个性质:arena 的 top 永远指向最新的一个 heap 的地址最高的空闲块,而这个最新的 heap 正好处于 heap 链表的尾部,所以如果要遍历 arena 里的 heap,只需要:

  1. 获取 arena 的 top 指针
  2. 把 top 指针向下取整到 HEAP_MAX_SIZE 的整倍数,得到 top 所在 heap 的 heap_info 指针
  3. 沿着 heap_info 的 prev 指针向前走,一直遍历,直到 prev 指针为 NULL 为止

所以 top 指针也充当了 heap 链表的尾指针的作用。

接下来观察 arena 和 heap 是怎么初始化的。

main arena 是特殊的,因为它直接保存在 glibc 的 data 段当中,所以不需要动态分配,并且 main arena 的数据是通过 sbrk 从系统分配的,它的维护逻辑在 sysmalloc 函数中实现:当 malloc 尝试各种办法都找不到空间分配时,就会调用 sysmalloc 来扩展 top chunk 并从 top chunk 中分配新的块。当 sysmalloc 遇到 main arena 的时候,就会尝试用 sbrk 扩展堆的大小,从而扩大 top chunk。当然了,sbrk 可能会失败,这个时候 main arena 也会通过 mmap 来分配更多的内存。

其他的 arena 则是通过 _int_new_arena 分配的,它的流程是:

  1. 调用 new_heap 创建一个堆,至少能够放 heap_infomalloc_state 的空间
  2. 这段空间的开头就是 heap_info,紧随其后就是 arena 自己的 malloc_state,然后把 top chunk 指向 malloc_state 后面的空闲空间

new_heap 则是会通过 mmap 向操作系统申请内存。因此除了 main_arena 以外,所有的 arena 的 heap 都会放在 mmap 出来的空间里。

于是 sysmalloc 要做的事情也比较清晰了,它要做的就是,在 top chunk 不够大的时候,分配更多空间给 top chunk:

  1. 如果要分配的块特别大,超出了阈值 mmap_threshold,就直接用 mmap 申请内存
  2. 如果不是 main arena,就尝试扩大 top 所在的 heap:heap 在初始化的时候,虽然会 mmap 一个 HEAP_MAX_SIZE 大小的内存,但大部分空间都被映射为不可读不可写不可执行;所以扩大 heap,实际上就是把要用的部分通过 mprotect 添加可读和可写的权限;如果 heap 达到了大小的上限,那就新建一个 heap,把 top chunk 放到新的 heap 上去
  3. 如果是 main arena,就用 sbrk 扩大 top chunk;如果扩大失败,那就用 mmap 来分配内存

小结

结构

到这里就基本把 glibc 的内存分配器分析得差不多了。glibc 把空闲块放在如下四种 bin 内:

  1. fast bin: 每个 bin 对应固定大小的空闲块,用单向链表维护,链表头指针保存在 malloc_statefastbinsY 成员
  2. unsorted bin: 一个双向链表,维护一些刚被 free 的空闲块,无大小要求,链表的哨兵结点保存在 malloc_statebins 成员刚开头
  3. small bin: 每个 bin 对应固定大小的空闲块,用双向链表维护,链表的哨兵结点保存在 malloc_statebins 成员,紧接在 unsorted bin 后面
  4. large bin: 每个 bin 对应一段大小范围的空闲块,用双向链表维护,按照块大小从大到小排序,每个大小的第一个空闲块在 nextsize 双向链表当中,链表的哨兵结点保存在 malloc_statebins 成员中,紧接在 small bin 后面

除了这四种 bin 以外,还有一个 per thread 的 tcache 机制,结构和 fast bin 类似,每个 bin 对应固定大小的空闲块,用单向链表维护,链表头指针保存在 tcacheentries 成员。

内存在分配器中流转的过程大致如下:

  1. 一开始从 top chunk 当中被分配出来
  2. 被 free 了以后,进入 tcache,或者 fast bin,或者合并后放到 unsorted bin,或者合并到 top chunk
  3. 在 malloc 的时候,从 tcache 或者 fast bin 分配,又或者从 unsorted bin 中取出,放到 small bin 或 large bin,中途可能被分配、拆分或者合并
flowchart TD top_chunk[top chunk] user[user allocated] tcache fast_bin[fast bin] small_bin[small bin] large_bin[large bin] unsorted_bin[unsorted bin] top_chunk -->|malloc| user tcache -->|malloc| user fast_bin -->|malloc| user fast_bin -->|malloc| tcache fast_bin -->|consolidate| unsorted_bin fast_bin -->|consolidate| top_chunk small_bin -->|malloc| user small_bin -->|malloc| tcache large_bin -->|malloc| user large_bin -->|malloc| unsorted_bin unsorted_bin -->|malloc| user unsorted_bin -->|malloc| tcache unsorted_bin -->|malloc| small_bin unsorted_bin -->|malloc| large_bin user -->|free| tcache user -->|free| fast_bin user -->|free| unsorted_bin user -->|free| top_chunk small_bin -->|free| unsorted_bin large_bin -->|free| unsorted_bin

malloc

malloc 的完整流程图如下:

---config: theme: base themeVariables: fontSize: "30px"---flowchart TD malloc[malloc 入口(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3022">malloc.c:3022</a>)] tcache[尝试从 tcache 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3047">malloc.c:3047</a>)] fastbin[尝试从 fast bin 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3577">malloc.c:3577</a>)] fastbin_migrate[如果 fast bin 还有空闲块且 tcache 没有满,则把空闲块从 fast bin 挪到 tcache(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3596">malloc.c:3596</a>)] smallbin[尝试从 small bin 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3635">malloc.c:3635</a>)] smallbin_migrate[如果 small bin 还有空闲块且 tcache 没有满,则把空闲块从 small bin 挪到 tcache(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3652">malloc.c:3652</a>)] malloc_ret[malloc 结束] consolidate[consolidate:遍历 fast bin,把空闲块和相邻的空闲块合并,插入到 unsorted bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4440">malloc.c:4440</a>)] consolidate_2[consolidate:遍历 fast bin,把空闲块和相邻的空闲块合并,插入到 unsorted bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4440">malloc.c:4440</a>)] check_large[块大小是否对应 large bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3695">malloc.c:3695</a>)] loop[开始循环(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3725">malloc.c:3725</a>)] unsorted_bin[遍历 unsorted bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3728">malloc.c:3728</a>)] remainder[内存局部性优化:最近一次 split 出来的 chunk 是否有足够的空间分配(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3756">malloc.c:3756</a>)] remainder_success[拆分这个 chunk 以完成分配(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3761">malloc.c:3761</a>)] remove_unsorted_bin[把当前 chunk 从 unsorted bin 中删除(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3784">malloc.c:3784</a>)] check_same_size[当前空闲块大小和要分配的大小是否相同(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3792">malloc.c:3792</a>)] check_tcache_full[tcache bin 是否已满(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3800">malloc.c:3800</a>)] alloc_now[分配当前空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3807">malloc.c:3807</a>)] move_to_tcache[把当前空闲块挪到 tcache(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3803">malloc.c:3803</a>)] move_to_bin[根据当前空闲块的大小,挪到 small bin 或 large bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3821">malloc.c:3821</a>)] check_tcache[检查 tcache 是否有空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3891">malloc.c:3891</a>)] alloc_tcache[从 tcache 分配空闲块] unsorted_bin_exit[检查 tcache 是否有空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3906">malloc.c:3906</a>)] check_large_alloc[块大小是否对应 large bin(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3917">malloc.c:3917</a>)] alloc_large[尝试从 large bin 中分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3922">malloc.c:3922</a>)] alloc_larger[遍历更大的 bin,尝试分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L3990">malloc.c:3990</a>)] alloc_top[尝试从 top chunk 分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4087">malloc.c:4087</a>)] alloc_system[从系统请求更多内存来分配空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4139">malloc.c:4139</a>)] check_fast[fast bin 是否有空闲块(<a href="https://github.com/bminor/glibc/blob/glibc-2.31/malloc/malloc.c#L4126">malloc.c:4126</a>)] malloc --> tcache tcache -->|分配成功| malloc_ret tcache -->|分配失败| fastbin fastbin -->|分配失败| smallbin fastbin -->|分配成功| fastbin_migrate  fastbin_migrate --> malloc_ret smallbin -->|分配成功| smallbin_migrate smallbin_migrate --> malloc_ret smallbin -->|分配失败| check_large check_large -->|是| consolidate check_large -->|否| loop consolidate --> loop loop --> unsorted_bin unsorted_bin -->|unsorted bin 非空,或者循环次数不足 10000 次| remainder remainder -->|是| remainder_success remainder_success --> malloc_ret remainder -->|否| remove_unsorted_bin remove_unsorted_bin --> check_same_size check_same_size -->|是| check_tcache_full check_tcache_full -->|否| move_to_tcache check_tcache_full --> |是| alloc_now alloc_now --> malloc_ret move_to_tcache --> unsorted_bin check_same_size -->|否| move_to_bin move_to_bin --> check_tcache check_tcache -->|是| alloc_tcache alloc_tcache --> malloc_ret check_tcache -->|否| unsorted_bin unsorted_bin -->|unsorted bin 已空,或者循环次数超过 10000 次| unsorted_bin_exit unsorted_bin_exit -->|是| alloc_tcache unsorted_bin_exit -->|否| check_large_alloc check_large_alloc -->|是| alloc_large alloc_large -->|分配成功| malloc_ret alloc_large -->|分配失败| alloc_larger check_large_alloc -->|否| alloc_larger alloc_larger -->|分配成功| malloc_ret alloc_larger -->|分配失败| alloc_top alloc_top -->|分配成功| malloc_ret alloc_top -->|分配失败| check_fast check_fast -->|是| consolidate_2 consolidate_2 --> loop check_fast -->|否| alloc_system alloc_system --> malloc_ret

前面分段整理了 malloc 的实现,在这里列出完整的 malloc 流程:

  1. malloc 的入口是 __libc_malloc (size_t bytes) 函数
  2. 如果配置了 malloc hook,则调用 malloc hook,直接返回结果
  3. 根据用户传入的 malloc 的字节数(bytes),用 checked_request2size 计算出实际的 chunk 大小,算法是先加上 sizeof(size_t)(给 mchunk_size 预留空间),然后向上对齐到 MALLOC_ALIGNMENT(通常是 2 * sizeof(size_t))的倍数,再和 MINSIZE 取 max,其中 MINSIZE 通常是 4 * sizeof(size_t),因为空闲块至少要保存两个 size 加上双向链表的 fdbk 指针
  4. 如果 tcache 还没初始化,就初始化 tcache
  5. 根据 chunk 大小,计算 tcache index,检查对应的 bin 是否有空闲块;如果有,直接分配空闲块并返回
  6. 接着获取一个 arena,如有必要,获取 arena 的锁;在单线程情况下,只有一个 main_arena;多线程情况下,每个线程有一个默认的 arena 指针(static __thread mstate thread_arena),在遇到 lock contention 的时候可以动态切换
  7. 进入 _int_malloc 从 arena 中分配一个 chunk,分配完成后释放 arena 的锁
  8. 接着分析 _int_malloc 的实现,除 tcache 以外的大部分逻辑都在 _int_malloc 函数中
  9. 判断 chunk size 大小,如果对应 fast bin 的块大小,在对应的 fast bin 的单向链表中寻找空闲块;如果链表非空,则取出链表头的空闲块,作为分配给 malloc 调用者的块,接着把 fast bin 链表上剩余的空闲块挪到 tcache 当中,直到 fast bin 链表空或者 tcache 满为止,然后函数结束
  10. 判断 chunk size 大小,如果对应 small bin 的块大小,在对应的 small bin 的双向链表中寻找空闲块;如果链表非空,则取出链表尾的空闲块,作为分配给 malloc 调用者的块,接着把 small bin 链表上剩余的空闲块挪到 tcache 当中,直到 small bin 链表空或者 tcache 满为止,然后函数结束
  11. 判断 chunk size 大小,如果对应 large bin 的块大小,则进行一次 malloc_consolidate:遍历 fast bin 每一个 bin 的每一个空闲块,尝试把它和内存上相邻的前后空闲块合并,合并后的空闲块放入 unsorted bin;特别地,如果空闲块和 top chunk 相邻,就会直接合并到 top chunk,这样就不需要把空闲块放入 unsorted bin
  12. 开始一个大的无限循环 for (;;),如果后续尝试各种方式都分配不成功,但是 fast bin 还有空闲块,在 malloc_consolidate 后会从这里开始再尝试一次分配
  13. 遍历 unsorted bin,最多处理 10000 个空闲块:
    1. 如果空闲块的大小对应 small bin,并且它是最近刚 split 出来的空闲块,并且可以放得下要分配的块,就原地把这个空闲块进行拆分,前面的部分是分配给 malloc 调用者的块,后面的部分则放回到 unsorted bin,然后函数结束
    2. 把空闲块从 unsorted bin 链表中删除
    3. 如果空闲块的大小正好是要分配的块的大小,判断 tcache 是否还有空间;如果 tcache 已经满了,直接把这个空闲块作为分配给 malloc 调用者的块,然后函数结束;如果 tcache 还没满,则先把空闲块挪到 tcache 当中,继续处理 unsorted bin 的其他空闲块,这样做的目的是尽量把 tcache 填满
    4. 根据空闲块的大小,插入到对应的 small bin 或者 large bin 当中
    5. 记录插入到 small bin 或者 large bin 的空闲块个数,如果超过了阈值,并且之前已经找到一个空闲块的大小正好是要分配的块的大小,同时挪到了 tcache 当中,则立即把这个空闲块从 tcache 中取出并分配给 malloc 调用者,然后函数结束;这样做的目的是避免处理太多无关的 unsorted bin 中的空闲块,导致 malloc 调用时间过长
  14. 如果在遍历 unsorted bin 过程中找到了和要分配的块一样大的空闲块,那么这个空闲块已经在 tcache 当中了,则立即把这个空闲块从 tcache 中取出并分配给 malloc 调用者,然后函数结束
  15. 判断 chunk size 大小,如果属于 large bin 的块大小,则找到对应的 large bin,从小到大通过 nextsize 链表遍历 large bin 中的空闲块,找到一个足够大的空闲块,对它进行拆分,前面的部分是分配给 malloc 调用者的块,后面的部分则放回到 unsorted bin,然后函数结束
  16. 根据 chunk size 大小,找到对应的 small bin 或者 large bin,然后从小到大遍历各个 bin(可能从 small bin 一路遍历到 large bin),通过 bitmap 跳过那些空的 bin,找到第一个非空的 bin 的空闲块,对它进行拆分,前面的部分是分配给 malloc 调用者的块,后面的部分则放回到 unsorted bin,然后函数结束
  17. 如果 top chunk 足够大,则对它进行拆分,前面的部分是分配给 malloc 调用者的块,后面的部分成为新的 top chunk,然后函数结束
  18. 如果此时 fast bin 有空闲块,调用 malloc_consolidate,然后回到无限循环的开头再尝试一次分配
  19. 最后的兜底分配方法:调用 sysmalloc,通过 mmap 或 sbrk 向操作系统申请更多的内存

free

在这里列出完整的 free 流程:

  1. free 的入口是 __libc_free (void *mem) 函数
  2. 如果配置了 free hook,则调用 free hook,直接返回
  3. 如果是调用 free(NULL),直接返回
  4. 检查 mchunk_sizeIS_MAPPED 字段,如果它之前是通过 mmap 分配的,那么对它进行 munmap,然后返回
  5. 如果 tcache 还没初始化,就初始化 tcache
  6. 找到这个 chunk 从哪个 arena 分配的:检查 mchunk_sizeNON_MAIN_ARENA 字段,如果它不是从 main arena 分配的,则根据 chunk 的地址,找到 heap 的地址(heap 的大小是有上限的,并且 heap 的起始地址是对齐到 HEAP_MAX_SIZE 的整倍数边界的),再根据 heap 开头的 heap_info 找到 arena 的地址
  7. 进入 _int_free,接着分析 _int_free 的实现
  8. 根据 chunk size 找到对应的 tcache bin,如果它还没有满,则把空闲块放到 tcache 当中,然后返回
  9. 判断 chunk size 大小,如果对应 fast bin 的块大小,把空闲块放到对应的 fast bin 的单向链表中,然后返回;注意此时没有获取 arena 的锁,所以 fast bin 的操作会用到原子指令,同理 malloc 中对 fast bin 的操作也要用到原子指令,即使 malloc 持有了 arena 的锁
  10. 获取 arena 的锁,尝试把空闲块和在内存中相邻的前后空闲块进行合并,合并后的空闲块放入 unsorted bin;合并时,如果被合并的空闲块已经在 small bin 或者 large bin 当中,利用双向链表的特性,把它从双向链表中删除;如果和 top chunk 相邻,则可以直接合并到 top chunk 上,然后返回
  11. 如果释放的块比较大,超过了阈值,则触发一次 malloc_consolidate

各种常量的默认值

下面给出 glibc 内存分配器各常量在 64 位下的默认值:

  1. MALLOC_ALIGNMENT = max(2 * SIZE_SZ, __alignof__ (long double)) 等于 16
  2. MIN_CHUNK_SIZE = offsetof(struct malloc_chunk, fd_nextsize) 等于 32
  3. MINSIZE = alignUp(MIN_CHUNK_SIZE, MALLOC_ALIGNMENT) 等于 32
  4. MAX_FAST_SIZE = 80 * SIZE_SZ / 4 等于 160
  5. NSMALLBINS = 64
  6. MIN_LARGE_SIZE = (NSMALLBINS - SMALLBIN_CORRECTION) * SMALLBIN_WIDTH 等于 1024
  7. DEFAULT_MMAP_THRESHOLD_MIN = 128 * 1024 即 128KB
  8. DEFAULT_MMAP_THRESHOLD_MAX = 4 * 1024 * 1024 * sizeof(long) 即 32MB
  9. HEAP_MIN_SIZE = 32 * 1024 即 32KB
  10. HEAP_MAX_SIZE = 2 * DEFAULT_MMAP_THRESHOLD_MAX 即 64MB
  11. TCACHE_MAX_BINS = 64
  12. TCACHE_FILL_COUNT = 7
  13. NFASTBINS = fastbin_index(request2size(MAX_FAST_SIZE)) + 1 即 10
  14. NBINS = 128
  15. DEFAULT_MXFAST = 64 * sizeof(size_t) / 4 即 128

默认情况下各个 bin 负责的块大小范围:

  1. tcache: 块大小不超过 1040 字节,对应 malloc(1032) 或更小
  2. fast bin: 块大小不超过 128 字节,对应 malloc(120) 或更小
  3. small bin: 块大小不超过 1008 字节,对应 malloc(1000) 或更小
  4. large bin: 块大小不小于 1024 字节,不超过 131056 字节,对应 malloc(1001)malloc(131048) 的范围,更大的内存分配会直接走 mmap

性能优化

简单总结一下 glibc 内存分配器的各种性能优化特性:

  1. tcache 作为一个 thread local 的结构,不需要锁,性能是最好的,所以尽量把空闲块都丢到 tcache 里面,无论是刚 free 的空闲块,还是在 malloc 过程中,顺带把一些空闲块从 fast bin 或者 small bin 丢到 tcache 里,这样也减少了 lock arena 的次数
  2. fast bin 虽然不再是 thread local,但它在 free 路径上使用原子指令来代替锁,使得 free 在很多时候不需要获取 arena 的锁;而把 fast bin 的空闲块的合并操作挪到 malloc 中进行,此时 arena 的锁是 lock 状态,尽量在一次 lock 的临界区里做更多的事情,减少 lock 的次数
  3. small bin 和 large bin 的区分,主要是考虑到了分配的块的大小分布,越大倾向于越稀疏;代价是 large bin 需要额外维护 nextsize 链表来快速地寻找不同大小的空闲块
  4. 在回收 unsorted bin 的时候,会进行一个内存局部性优化,即倾向于连续地从同一个块中切出小块用于分配,适合在循环中分配内存的场景
  5. 回收 unsorted bin 时,如果遇到了正好和要分配的块大小相同的空闲块时,先不急着分配,而是丢到 tcache 中,然后继续往前回收若干个空闲块,直到 tcache 满了或者遇到了足够多的大小不同的空闲块为止:这是利用了 unsorted bin 中空闲块大小的局部性,有机会把一系列连续的相同大小的空闲块拿到 tcache 当中,并且限制了搜索的长度,避免带来过多额外的延迟
  6. 如果尝试了 unsorted bin、small bin、large bin 和 top chunk 都无法分配,最后再检查一次 fast bin 是否为空,如果是空的,则进行一次 consolidate,把 fast bin 里的空闲块丢到 unsorted bin 中,再重新尝试分配一次:注意这整个过程 malloc 都是持有 arena 锁的,而 fast bin 在 free 中的写入是不需要持有 arena 锁的,而是直接用原子指令更新,所以这是考虑到其他线程在同时往同一个 arena free 的情况
  7. 在合并相邻空闲块的时候,被合并的空闲块可能已经在 unsorted bin、small bin 或者 large bin 当中,为了能够把空闲块从这些 bin 里删除,用双向链表来实现 O(1) 时间的删除

参考

Android Runtime 解释器的实现探究

2025-03-06 08:00:00

Android Runtime 解释器的实现探究

背景

V8 Ignition 解释器的内部实现探究 中探究了 JavaScript 引擎 V8 的解释器的实现,接下来分析一下 Android Runtime (ART) 的解释器,其原理也是类似的。本博客在 ARM64 Ubuntu 24.04 平台上针对 Android Runtime (ART) 15.0.0 r1 版本进行分析。

Dalvik Bytecode

在分析解释器的代码前,需要先了解一下解释器的输入,也就是它执行的字节码格式是什么。Android Runtime 继承和发展了 Dalvik VM 的字节码 Dalvik Bytecode 格式,因此在打包 Android 应用的时候,Java 代码最终会被翻译成 Dalvik Bytecode。

接下来来实践一下这个过程,从 Java 代码到 Dalvik Bytecode:

第一步是编写一个简单的 Java 程序如下,保存到 MainActivity.java

public class MainActivity { public static void main(String[] args) { System.out.println("Hello, world!"); } public static int add(int a, int b) { return a + b; }}

首先用 javac MainActivity.java 命令把源码编译到 Java Bytecode。可以用 javap -c MainActivity.class 查看生成的 Java Bytecode:

Compiled from "MainActivity.java"public class MainActivity { public MainActivity(); Code: 0: aload_0 1: invokespecial #1 // Method java/lang/Object."<init>":()V 4: return public static void main(java.lang.String[]); Code: 0: getstatic #7 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #13 // String Hello, world! 5: invokevirtual #15 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return public static int add(int, int); Code: 0: iload_0 1: iload_1 2: iadd 3: ireturn}

Java Bytecode 是个典型的栈式字节码,因此从 int add(int, int) 函数可以看到,它分别压栈第零个和第一个局部遍变量(即参数 ab),然后用 iadd 指令从栈顶弹出两个元素,求和后再把结果压栈。

接着,用 Android SDK 的 Build Tools 提供的命令 d8 来把它转换为 Dalvik Bytecode。如果你还没有安装 Android SDK,可以按照 sdkmanager 文档 来安装 sdkmanager,再用 sdkmanager 安装较新版本的 build-tools。转换的命令为 $ANDROID_HOME/build-tools/$VERSION/d8 MainActivity.class,结果会保存在当前目录的 classes.dex 文件内。接着可以用 $ANDROID_HOME/build-tools/$VERSION/dexdump -d classes.dex 来查看 Dalvik Bytecode:

Processing 'classes.dex'...Opened 'classes.dex', DEX version '035'Class #0 - Class descriptor : 'LMainActivity;' Access flags : 0x0001 (PUBLIC) Superclass : 'Ljava/lang/Object;' Interfaces - Static fields - Instance fields - Direct methods - #0 : (in LMainActivity;) name : '<init>' type : '()V' access : 0x10001 (PUBLIC CONSTRUCTOR) code - registers : 1 ins : 1 outs : 1 insns size : 4 16-bit code units00016c: |[00016c] MainActivity.<init>:()V00017c: 7010 0400 0000 |0000: invoke-direct {v0}, Ljava/lang/Object;.<init>:()V // method@0004000182: 0e00 |0003: return-void catches : (none) positions : 0x0000 line=1 locals : 0x0000 - 0x0004 reg=0 this LMainActivity; #1 : (in LMainActivity;) name : 'add' type : '(II)I' access : 0x0009 (PUBLIC STATIC) code - registers : 2 ins : 2 outs : 0 insns size : 2 16-bit code units000158: |[000158] MainActivity.add:(II)I000168: b010 |0000: add-int/2addr v0, v100016a: 0f00 |0001: return v0 catches : (none) positions : 0x0000 line=7 locals : 0x0000 - 0x0002 reg=0 (null) I 0x0000 - 0x0002 reg=1 (null) I #2 : (in LMainActivity;) name : 'main' type : '([Ljava/lang/String;)V' access : 0x0009 (PUBLIC STATIC) code - registers : 2 ins : 1 outs : 2 insns size : 8 16-bit code units000184: |[000184] MainActivity.main:([Ljava/lang/String;)V000194: 6201 0000 |0000: sget-object v1, Ljava/lang/System;.out:Ljava/io/PrintStream; // field@0000000198: 1a00 0100 |0002: const-string v0, "Hello, world!" // string@000100019c: 6e20 0300 0100 |0004: invoke-virtual {v1, v0}, Ljava/io/PrintStream;.println:(Ljava/lang/String;)V // method@00030001a2: 0e00 |0007: return-void catches : (none) positions : 0x0000 line=3 0x0007 line=4 locals : 0x0000 - 0x0008 reg=1 (null) [Ljava/lang/String; Virtual methods - source_file_idx : 9 (MainActivity.java)

对比 Java Bytecode,在 Dalvik Bytecode 里的 add 函数的实现就大不相同了:

  1. add-int/2addr v0, v1: 求寄存器 v1 和寄存器 v0 之和,在这里就对应 ab 两个参数,结果写到 v0 寄存器当中
  2. return v0: 以寄存器 v0 为返回值,结束当前函数

可见 Dalvik Bytecode 采用的是类似 V8 的基于寄存器的字节码,不过没有 V8 的 accumulator

Dalvik Bytecode 的完整列表见 Dalvik bytecode format,它的格式基本上是两个字节为一组,每组里第一个字节代表 Op 类型,第二个字节代表参数,有一些 Op 后面还会带有多组参数。

例如上面的 add-int/2addr vA, vB 指令的编码是:

  1. 第一个字节是 0xb0,表示这是一个 add-int/2addr Op
  2. 第二个字节共 8 位,低 4 位编码了 vA 的寄存器编号 A,高 4 位编码了 vB 的寄存器编号 B

所以 add-int/2addr v0, v1 的编码就是 0xb0, 0 | (1 << 4)0xb0, 0x10。因为存得很紧凑,寄存器编号只有 4 位,所以这个 Op 的操作数不能访问 v16 或更高的寄存器。

return vAA 指令的编码类似,不过因为只需要编码一个操作数,所以有 8 位可以编码返回值用哪个寄存器;为了区分是 4 位的编码还是 8 位的编码,这里用 vAA 表示可以用 8 位来记录寄存器编号。return vAA 的第一个字节是 0x0f 表示 Op 类型,第二个字节就是寄存器编号 A。上面出现的 return v0 的编码就是 0x0f, 0x00

一些比较复杂的 Op 会附带更多的参数,此时编码就可能涉及到更多的字节。比如 invoke-virtual {vC, vD, vE, vF, vG}, meth@BBBB,可以携带可变个寄存器参数,在编码的时候,格式如下:

  1. 第一个字节 0x6e 表示这是一个 invoke-virtual Op
  2. 第二个字节的高 4 位记录了参数个数
  3. 第三和第四个字节共 16 位,记录了要调用的函数的 index,这个 index 会被拿来索引 DEX 的 method_ids 表
  4. 第五和第六个字节共 16 位,配合第二个字节的低 4 位,最多可以传递 5 个寄存器参数,每个寄存器参数 4 位

因此在上面的代码中,invoke-virtual {v1, v0}, Ljava/io/PrintStream;.println:(Ljava/lang/String;)V // method@0003 被编码为:0x6e, 0x20, 0x03, 0x00, 0x01, 0x00。另外构造了一个例子,把五个参数都用上:invoke-virtual {v1, v4, v0, v2, v3}, LMainActivity;.add4:(IIII)I // method@0002 被编码为 0x6e, 0x53, 0x02, 0x00, 0x41, 0x20,可以看到五个参数的编码顺序是第五个字节的低 4 位(v1)和高 4 位(v4),第六个字节的低 4 位(v0)和高 4 位(v2),最后是第二个字节的低 4 位(v3)。

了解了 Dalvik Bytecode 的结构,接下来观察它是怎么被解释执行的。

解释器

Android Runtime (ART) 的解释器放在 runtime/interpreter 目录下。如果进行一些考古,可以看到这个解释器的实现是从更早的 Dalvik VM 来的。它有两种不同的解释器实现:

第一个解释器基于 switch-case 的 C++ 代码实现,其逐个遍历 Op,根据 Op 的类型 Opcode 执行相应的操作,类似下面的代码:

for (each op of current function) { switch (op.opcode) { case op_add: // implement add here break; // ... other opcode handlers }}

第二个解释器以 Token threading 的方式实现,每种 Op 对应一段代码。这段代码在完成 Op 的操作后,读取下一个 Op,再间接跳转到下一个 Op 对应的代码。其工作原理类似下面的代码,这里 goto * 是 GNU C 的扩展,对应间接跳转指令,其目的地址取决于 handlers[next_opcode] 的值,意思是根据下一个 op 的 Opcode,找到对应的 handler,直接跳转过去:

 // op handlers array handlers = {&op_add, &op_sub};op_add: // implement add here // read next opcode here goto *handlers[next_opcode];

实际实现的时候更进一步,用汇编实现各个 op handler,并把 handler 放在了 128 字节对齐的位置,保证每个 handler 不超过 128 个字节,从而把读取 handlers 数组再跳转的 goto * 改成了用乘法和加法计算出 handler 的地址再跳转(computed goto):

handlers_begin:op_add: .balign 128 # implement add here # read next opcode here jmp to (handlers_begin + 128 * next_opcode);op_sub: .balign 128 # implement add here # read next opcode here jmp to (handlers_begin + 128 * next_opcode);

下面结合源码来具体分析这两种解释器的实现。

基于 switch-case 的解释器

目前 Android Runtime 包括一个基于 switch-case 的解释器,实现在 runtime/interpreter/interpreter_switch_impl-inl.h 文件当中,它的核心逻辑就是一个循环套 switch-case:

 while (true) { const Instruction* const inst = next; dex_pc = inst->GetDexPc(insns); shadow_frame.SetDexPC(dex_pc); TraceExecution(shadow_frame, inst, dex_pc); uint16_t inst_data = inst->Fetch16(0); bool exit = false; bool success; // Moved outside to keep frames small under asan. if (InstructionHandler<transaction_active, Instruction::kInvalidFormat>( ctx, instrumentation, self, shadow_frame, dex_pc, inst, inst_data, next, exit). Preamble()) { DCHECK_EQ(self->IsExceptionPending(), inst->Opcode(inst_data) == Instruction::MOVE_EXCEPTION); switch (inst->Opcode(inst_data)) {#define OPCODE_CASE(OPCODE, OPCODE_NAME, NAME, FORMAT, i, a, e, v) \ case OPCODE: { \ next = inst->RelativeAt(Instruction::SizeInCodeUnits(Instruction::FORMAT)); \ success = OP_##OPCODE_NAME<transaction_active>( \ ctx, instrumentation, self, shadow_frame, dex_pc, inst, inst_data, next, exit); \ if (success && LIKELY(!interpret_one_instruction)) { \ continue; \ } \ break; \ } DEX_INSTRUCTION_LIST(OPCODE_CASE)#undef OPCODE_CASE } } // exit condition handling omitted }

代码中使用了 X macro 的编程技巧:如果你需要在不同的地方重复出现同一个 list,比如在这里,就是所有可能的 Opcode 类型,你可以在一个头文件中用一个宏,以另一个宏为参数去列出来:

// V(opcode, instruction_code, name, format, index, flags, extended_flags, verifier_flags);#define DEX_INSTRUCTION_LIST(V) \ V(0x00, NOP, "nop", k10x, kIndexNone, kContinue, 0, kVerifyNothing) \ V(0x01, MOVE, "move", k12x, kIndexNone, kContinue, 0, kVerifyRegA | kVerifyRegB) \ // omitted

这个宏定义在 libdexfile/dex/dex_instruction_list.h 头文件当中。在使用的时候,临时定义一个宏,然后把新定义的宏传入 DEX_INSTRUCTION_LIST 的参数即可。例如要生成一个数组,记录所有的 op 名字,可以:

// taken from libdexfile/dex/dex_instruction.ccconst char* const Instruction::kInstructionNames[] = {#define INSTRUCTION_NAME(o, c, pname, f, i, a, e, v) pname,#include "dex_instruction_list.h" DEX_INSTRUCTION_LIST(INSTRUCTION_NAME)#undef DEX_INSTRUCTION_LIST#undef INSTRUCTION_NAME};

这段代码经过 C 预处理器,首先会被展开为:

const char* const Instruction::kInstructionNames[] = {#define INSTRUCTION_NAME(o, c, pname, f, i, a, e, v) pname, INSTRUCTION_NAME(0x00, NOP, "nop", k10x, kIndexNone, kContinue, 0, kVerifyNothing) \ INSTRUCTION_NAME(0x01, MOVE, "move", k12x, kIndexNone, kContinue, 0, kVerifyRegA | kVerifyRegB) \ // omitted#undef INSTRUCTION_NAME};

继续展开,就得到了想要留下的内容:

const char* const Instruction::kInstructionNames[] = { "nop", "move", // omitted};

回到 switch-case 的地方,可以预见到,预处理会生成的代码大概是:

switch (inst->Opcode(inst_data)) { case 0x00: { \ next = inst->RelativeAt(Instruction::SizeInCodeUnits(Instruction::k10x)); \ success = OP_NOP<transaction_active>( \ ctx, instrumentation, self, shadow_frame, dex_pc, inst, inst_data, next, exit); \ if (success && LIKELY(!interpret_one_instruction)) { \ continue; \ } \ break; \ } // omitted}

其中 next = inst->RelativeAt(Instruction::SizeInCodeUnits(Instruction::k10x)); 语句根据当前 Op 类型计算出它会占用多少个字节,从而得到下一个 Op 的地址。之后就是调用 OP_NOP 函数来进行实际的操作了。当然了,这个实际的操作,还是需要开发者去手动实现(OP_NOP 函数会调用下面的 NOP 函数):

HANDLER_ATTRIBUTES bool NOP() { return true;}HANDLER_ATTRIBUTES bool MOVE() { SetVReg(A(), GetVReg(B())); return true;}HANDLER_ATTRIBUTES bool ADD_INT() { SetVReg(A(), SafeAdd(GetVReg(B()), GetVReg(C()))); return true;}

基于 token threading 的解释器 mterp (nterp)

第二个解释器则是基于 token threading 的解释器,它的源码在 runtime/interpreter/mterp 目录下。由于这些代码是用汇编写的,直接写会有很多重复的部分。为了避免重复的代码,目前的解释器 mterp (现在叫 nterp) 用 Python 脚本来生成最终的汇编代码。要生成它,也很简单:

cd runtime/interpreter/mterp./gen_mterp.py mterp_arm64ng.S arm64ng/*.S

这个脚本是平台无关的,例如如果要生成 amd64 平台的汇编,只需要:

cd runtime/interpreter/mterp./gen_mterp.py mterp_x86_64ng.S x86_64ng/*.S

这样就可以看到完整的汇编代码了,后续的分析都会基于这份汇编代码。如果读者对 amd64 汇编比较熟悉,也可以在本地生成一份 amd64 的汇编再结合本文进行理解。

上面提到过 add-int/2addr vA, vB 这个做整数加法的 Op,直接在生成的汇编中,找到它对应的代码:

 .balign NTERP_HANDLER_SIZE.L_op_add_int_2addr: /* 0xb0 */ /* omitted */ /* binop/2addr vA, vB */ lsr w3, wINST, #12 // w3<- B ubfx w9, wINST, #8, #4 // w9<- A GET_VREG w1, w3 // w1<- vB GET_VREG w0, w9 // w0<- vA FETCH_ADVANCE_INST 1 // advance rPC, load rINST add w0, w0, w1 // w0<- op, w0-w3 changed GET_INST_OPCODE ip // extract opcode from rINST SET_VREG w0, w9 // vAA<- w0 GOTO_OPCODE ip // jump to next instruction /* omitted */

其中 wINST 表示当前 Op 的前两个字节的内容,前面提到,add-int/2addr vA, vB 编码为两个字节,第一个字节是固定的 0xb0,第二个字节共 8 位,低 4 位编码了 vA 的寄存器编号 A,高 4 位编码了 vB 的寄存器编号 B。由于这是小端序的处理器,那么解释为 16 位整数,从高位到低位依次是:4 位的 B,4 位的 A 和 8 位的 0xb0。知道这个背景以后,再来分析每条指令做的事情,就很清晰:

  1. lsr w3, wINST, #12:求 wINST 右移动 12 位,得到了 B
  2. ubfx w9, wINST, #8, #4: ubfx 是 Bit Extract 指令,这里的意思是从 wINST 第 8 位开始取 4 位数据出来,也就是 A
  3. GET_VREG w1, w3: 读取寄存器编号为 w3 的值,写到 w1 当中,结合第一条指令,可知此时 w1 等于 B 寄存器的值
  4. GET_VREG w0, w9: 读取寄存器编号为 w9 的值,写到 w0 当中,结合第二条指令,可知此时 w0 等于 A 寄存器的值
  5. FETCH_ADVANCE_INST 1: 把 "PC" 往前移动 1 个单位的距离,也就是两个字节,并读取下一个 Op 到 rINST 当中
  6. add w0, w0, w1: 进行实际的整数加法运算,结果保存在 w0 当中
  7. GET_INST_OPCODE ip: 根据第五条指令读取的下一个 Op 的值 rINST,解析出它的 Opcode
  8. SET_VREG w0, w9: 把整数加法的结果写回到寄存器编号为 w9 的寄存器当中,结合第二条指令,可知写入的是 A 寄存器
  9. GOTO_OPCODE ip: 跳转到下一个 Op 对应的 handler

整体代码还是比较清晰的,只是说把计算 A + B 写入 A 的过程和读取下一个 Op 并跳转的逻辑穿插了起来,手动做了一次寄存器调度。那么这些 GET_REGFETCH_ADVANCE_INST 等等具体又是怎么实现的呢?下面把宏展开后的代码贴出来:

 .balign NTERP_HANDLER_SIZE.L_op_add_int_2addr: /* 0xb0 */ /* omitted */ /* binop/2addr vA, vB */ // wINST is w23, the first 16-bit code unit of current instruction // lsr w3, wINST, #12 // w3<- B lsr w3, w23, #12  // ubfx w9, wINST, #8, #4 // w9<- A ubfx w9, w23, #8, #4  // virtual registers are stored relative to xFP(x29), the interpreted frame pointer, used for accessing locals and args // GET_VREG w1, w3 // w1<- vB ldr w1, [x29, w3, uxtw #2]  // GET_VREG w0, w9 // w0<- vA ldr w0, [x29, w9, uxtw #2]  // xPC(x22) is the interpreted program counter, used for fetching instructions // FETCH_ADVANCE_INST 1 // advance rPC, load rINST // a pre-index load instruction that both reads wINST from memory and increments xPC(x22) by 2 ldrh w23, [x22, #2]! // add w0, w0, w1 // w0<- op, w0-w3 changed add w0, w0, w1  // ip(x16) is a scratch register, used to store the first byte (opcode) of wINST // GET_INST_OPCODE ip // extract opcode from rINST and x16, x23, 0xff // save addition result to virtual register, which is relative to xFP(x29) // also set its object references to zero, which is relative to xREFS(x25) // SET_VREG w0, w9 // vAA<- w0 str w0, [x29, w9, uxtw #2] str wzr, [x25, w9, uxtw #2] // now x16 saves the opcode, and xIBASE(x24) interpreted instruction base pointer, used for computed goto // for opcode #k, the handler address of it would be `xIBASE + k * 128` // GOTO_OPCODE ip // jump to next instruction add x16, x24, x16, lsl #7 br x16 /* omitted */

各个寄存器的含义已经在上面的注释中写出,比如 w23 记录了当前 Op 的前 16 位的内容,x29 记录了当前的 frame pointer,通过它可以访问各个 virtual register;x11 是 PC,记录了正在执行的 Op 的地址;x24 记录了这些 op handler 的起始地址,由于每个 handler 都不超过 128 字节,且都对齐到 128 字节边界(.balign NTERP_HANDLER_SIZE),所以只需要简单的运算 xIBASE + opcode * 128 即可找到下一个 op 的 handler 地址,不需要再进行一次访存。

如果要比较一下 Android Runtime 的 mterp (nterp) 和 V8 的 Ignition 解释器的实现,有如下几点相同与不同:

  1. 两者都采用了 token threading 的方法,即在一个 Op 处理完成以后,计算出下一个 Op 的 handler 的地址,跳转过去
  2. V8 的 op handler 是动态生成的(mksnapshot 阶段),长度没有限制,允许生成比较复杂的汇编,但如果汇编比较短(比如 release 模式下),也可以节省一些内存;代价是需要一次额外的对 dispatch table 的访存,来找到 opcode 对应的 handler
  3. mterp 的 op handler 对齐到 128B 边界,带来的好处是不需要访问 dispatch table,直接根据 opcode 计算地址即可,不过由于很多 handler 很短,可能只有十条指令左右,就会浪费了一些内存
  4. V8 没有 handler 长度的限制,所以针对一些常见的 Op 做了优化(Short Star),可以减少一些跳转的开销
  5. V8 在区分 Smi(Small integer) 和对象的时候,做法是在 LSB 上打标记:0 表示 Smi,1 表示对象;mterp 则不同,它给每个虚拟寄存器维护了两个 32 位的值:一个保存在 xFP 指向的数组当中,记录的是它的实际的值,比如 int 的值,或者对象的引用;另一个保存在 xREFS 指向的数组当中,记录的是它引用的对象,如果不是对象,则记录的是 0

除了以上列举的不同的地方以外,其实整体来看是十分类似的,下面是二者实现把整数加载到寄存器(const/4 vA, #+BLdaSmi)的汇编的对比:

Operation mterp (nterp) Ignition
Extract Dest Register ubfx w0, w23, #8, #4 N/A (destination is always the accumulator)
Extract Const Integer sbfx w1, w23, #12, #4 add x1, x19, #1; ldrsb w1, [x20, x1]
Read Next Op ldrh w23, [x22, #2]! add x19, x19, #2; ldrb w3, [x20, x19]
Save Result str w1, [x29, w0, uxtw #2]; str wzr, [x25, w0, uxtw #2] add w0, w1, w1
Computed Goto and x16, x23, 0xff; add x16, x24, x16, lsl #7; br x16 ldr x2, [x21, x3, lsl #3]; mov x17, x2; br x2

在寄存器的约定和使用上的区别:

Purpose mterp (nterp) Ignition
Intepreter PC base + offset in x22 base in x20, offset in x19
Virtual Register relative to x29 relative to fp
Dispatch Table computed from x24 saved in x21

Lua 解释器

既然已经分析了 V8 和 Android Runtime 的解释器,也来简单看一下 Lua 的解释器实现。它写的非常简单,核心代码就在 lvm.c 当中:

vmdispatch (GET_OPCODE(i)) { vmcase(OP_MOVE) { StkId ra = RA(i); setobjs2s(L, ra, RB(i)); vmbreak; } vmcase(OP_LOADI) { StkId ra = RA(i); lua_Integer b = GETARG_sBx(i); setivalue(s2v(ra), b); vmbreak; } // ...}

可以看到,它把 switchcasebreak 替换成了三个宏 vmdispatchvmcasevmbreak。接下来看它可能的定义,第一种情况是编译器不支持 goto * 语法,此时就直接展开:

#define vmdispatch(o) switch(o)#define vmcase(l) case l:#define vmbreak break

如果编译器支持 goto * 语法,则展开成对应的 computed goto 形式:

#define vmdispatch(x) goto *disptab[x];#define vmcase(l) L_##l:#define vmbreak mfetch(); vmdispatch(GET_OPCODE(i));static const void *const disptab[NUM_OPCODES] = { &&L_OP_MOVE, &&L_OP_LOADI, // ...};

和之前写的解释器的不同实现方法对应,就不多阐述了。

参考

V8 Ignition 解释器的内部实现探究

2025-03-01 08:00:00

V8 Ignition 解释器的内部实现探究

背景

V8 是一个很常见的 JavaScript 引擎,运行在很多的设备上,因此想探究一下它内部的部分实现。本博客在 ARM64 Ubuntu 24.04 平台上针对 V8 12.8.374.31 版本进行分析。本博客主要分析了 V8 的 Ignition 解释器的解释执行部分。

编译 V8

首先简单过一下 v8 的源码获取以及编译流程,主要参考了 Checking out the V8 source codeCompiling on Arm64 Linux:

# setup depot_toolscd ~git clone https://chromium.googlesource.com/chromium/tools/depot_tools.gitexport PATH=$PWD/depot_tools:$PATH# clone v8 reposmkdir ~/v8cd ~/v8fetch v8cd v8# switch to specified taggit checkout 12.8.374.31gclient sync --verbose# install dependencies./build/install-build-deps.sh# install llvm 19wget https://mirrors.tuna.tsinghua.edu.cn/llvm-apt/llvm.shchmod +x llvm.shsudo ./llvm.sh 19 -m https://mirrors.tuna.tsinghua.edu.cn/llvm-aptrm llvm.shsudo apt install -y lld clang-19# fix incompatibilities with system clang 19sed -i "s/-Wno-missing-template-arg-list-after-template-kw//" build/config/compiler/BUILD.gn# compile v8 using system clang 19# for amd64: use x64.optdebug instead of arm64.optdebugtools/dev/gm.py arm64.optdebug --progress=verbose# d8 is compiled successfully at./out/arm64.optdebug/d8

如果不想编译 V8,也可以直接用 Node.JS 来代替 d8。不过 Node.JS 会加载很多 JS 代码,使得输出更加复杂,此时就需要手动过滤一些输出,或者通过命令行设置一些打印日志的过滤器。另外,后面有一些深入的调试信息,需要手动编译 V8 才能打开,因此还是建议读者上手自己编译一个 V8。

在 AMD64 上,默认会使用 V8 自带的 LLVM 版本来编译,此时就不需要额外安装 LLVM 19,也不需要修改 v8/build/config/compiler/BUILD.gn

解释器和编译器

通过 V8 的文档可以看到,V8 一共有这些解释器或编译器,按照其优化等级从小到大的顺序:

  1. Ignition: 解释器
  2. SparkPlug: 不优化的快速编译器,追求快的编译速度
  3. Maglev:做优化的编译器,寻求编译速度和编译质量的平衡
  4. TurboFan:做优化的编译器,寻求更好的编译质量

在 JS 的使用场景,不同代码被调用的次数以及对及时性的需求差别很大,为了适应不同的场景,V8 设计了这些解释器和编译器来提升整体的性能:执行次数少的代码,倾向于用更低优化等级的解释器或编译器,追求更低的优化开销;执行次数多的代码,当编译优化时间不再成为瓶颈,则倾向于用更高优化等级的编译器,追求更高的执行性能。

Ignition 解释器

分析样例 JS 代码

首先来观察一下 Ignition 解释器的工作流程。写一段简单的 JS 代码:

function add(a, b) { return a + b;}add(1, 2);

保存为 test.js,运行 ./out/arm64.optdebug/d8 --print-ast --print-bytecode test.js 以打印它的 AST 以及 Bytecode:

首先开始的是 top level 的 AST 以及 Bytecode,它做的事情就是:声明一个函数 add,然后以参数 (1, 2) 来调用它。

top level AST:

[generating bytecode for function: ]--- AST ---FUNC at 0. KIND 0. LITERAL ID 0. SUSPEND COUNT 0. NAME "". INFERRED NAME "". DECLS. . FUNCTION "add" = function add. EXPRESSION STATEMENT at 42. . kAssign at -1. . . VAR PROXY local[0] (0xc28698556308) (mode = TEMPORARY, assigned = true) ".result". . . CALL. . . . VAR PROXY unallocated (0xc28698556200) (mode = VAR, assigned = true) "add". . . . LITERAL 1. . . . LITERAL 2. RETURN at -1. . VAR PROXY local[0] (0xc28698556308) (mode = TEMPORARY, assigned = true) ".result"

首先声明了一个 add 函数,然后以 12 两个参数调用 add 函数,把结果绑定给局部变量 .result,最后以 .result 为结果返回。接下来看它会被翻译成什么字节码:

[generated bytecode for function: (0x1f8e002988f5 <SharedFunctionInfo>)]Bytecode length: 28Parameter count 1Register count 4Frame size 32 0x304700040048 @ 0 : 13 00 LdaConstant [0] 0x30470004004a @ 2 : c9 Star1 0x30470004004b @ 3 : 19 fe f7 Mov <closure>, r2 0x30470004004e @ 6 : 68 63 01 f8 02 CallRuntime [DeclareGlobals], r1-r2 0x304700040053 @ 11 : 21 01 00 LdaGlobal [1], [0] 0x304700040056 @ 14 : c9 Star1 0x304700040057 @ 15 : 0d 01 LdaSmi [1] 0x304700040059 @ 17 : c8 Star2 0x30470004005a @ 18 : 0d 02 LdaSmi [2] 0x30470004005c @ 20 : c7 Star3 0x30470004005d @ 21 : 66 f8 f7 f6 02 CallUndefinedReceiver2 r1, r2, r3, [2] 0x304700040062 @ 26 : ca Star0 0x304700040063 @ 27 : af ReturnConstant pool (size = 2)0x304700040011: [TrustedFixedArray] - map: 0x1f8e00000595 <Map(TRUSTED_FIXED_ARRAY_TYPE)> - length: 2 0: 0x1f8e00298945 <FixedArray[2]> 1: 0x1f8e000041dd <String[3]: #add>Handler Table (size = 0)Source Position Table (size = 0)

V8 的字节码采用的是基于寄存器的执行模型,而非其他很多字节码会采用的栈式。换句话说,每个函数有自己的若干个寄存器可供操作。每条字节码分为 Opcode(表示这条字节码要进行的操作)和操作数两部分。函数开头的 Register count 4 表明该函数有四个寄存器:r0-r3,此外还有一个特殊的 accumulator 寄存器,它一般不会出现在操作数列表中,而是隐含在 Opcode 内(Lda/Sta)。

完整的 Opcode 列表可以在 v8/src/interpreter/bytecodes.h 中找到,对应的实现可以在 v8/src/interpreter/interpreter-generator.cc 中找到。

上述字节码分为两部分,第一部分是声明 add 函数:

  1. LdaConstant [0]: 把 Constant Pool 的第 0 项也就是 FixedArray[2] 写入 accumulator 寄存器当中
  2. Star1: 把 accumulator 寄存器的值拷贝到 r1 寄存器,结合上一条字节码,就是设置 r1 = FixedArray[2]
  3. Mov <closure>, r2: 把 <closure> 拷贝到 r2 寄存器,猜测这里的 <closure> 对应的是 add 函数
  4. CallRuntime [DeclareGlobals], r1-r2: 调用运行时的 DeclareGlobals 函数,并传递两个参数,分别是 r1r2;有意思的是,CallRuntime 的参数必须保存在连续的寄存器当中,猜测是为了节省编码空间

至此,add 函数就声明完成了。接下来,就要实现 add(1, 2) 的调用:

  1. LdaGlobal [1], [0]: 把 Constant Pool 的第 1 项也就是 "add" 这个字符串写入 accumulator,最后的 [0] 和 FeedBackVector 有关,目前先忽略
  2. Star1: 把 accumulator 寄存器的值拷贝到 r1 寄存器,结合上一条字节码,就是设置 r1 = "add"
  3. LdaSmi [1]: 把小整数(Small integer, Smi)1 写入 accumulator
  4. Star2: 把 accumulator 寄存器的值拷贝到 r2 寄存器,结合上一条字节码,就是设置 r2 = 1
  5. LdaSmi [2]: 把小整数(Small integer, Smi)2 写入 accumulator
  6. Star3: 把 accumulator 寄存器的值拷贝到 r3 寄存器,结合上一条字节码,就是设置 r3 = 2
  7. CallUndefinedReceiver2 r1, r2, r3, [2]: 根据 r1 调用一个函数,并传递两个参数 r2, r3(函数名称最后的 2 表示有两个参数),最后的 [2] 也和 FeedBackVector 有关

这样就完成了函数调用。

接下来观察 add 函数的 AST:

[generating bytecode for function: add]--- AST ---FUNC at 12. KIND 0. LITERAL ID 1. SUSPEND COUNT 0. NAME "add". INFERRED NAME "". PARAMS. . VAR (0xc50512445280) (mode = VAR, assigned = false) "a". . VAR (0xc50512445300) (mode = VAR, assigned = false) "b". DECLS. . VARIABLE (0xc50512445280) (mode = VAR, assigned = false) "a". . VARIABLE (0xc50512445300) (mode = VAR, assigned = false) "b". RETURN at 25. . kAdd at 34. . . VAR PROXY parameter[0] (0xc50512445280) (mode = VAR, assigned = false) "a". . . VAR PROXY parameter[1] (0xc50512445300) (mode = VAR, assigned = false) "b"

add 函数的 AST 比较直接,a + b 直接对应了 kAdd 结点,直接作为返回值。

接下来观察 add 的 Bytecode:

[generated bytecode for function: add (0x10c700298955 <SharedFunctionInfo add>)]Bytecode length: 6Parameter count 3Register count 0Frame size 0 0x6ed0004008c @ 0 : 0b 04 Ldar a1 0x6ed0004008e @ 2 : 3b 03 00 Add a0, [0] 0x6ed00040091 @ 5 : af ReturnConstant pool (size = 0)Handler Table (size = 0)Source Position Table (size = 0)
  1. Ldar a1: 把第二个参数 a1 也就是 b 写入 accumulator 寄存器
  2. Add a0, [0]: 求第一个参数 a0 也就是 aaccumulator 寄存器的和,写入到 accumulator 寄存器当中,结合上一条 Bytecode,就是 accumulator = a0 + a1[0] 和 FeedBackVector 有关
  3. Return: 把 accumulator 中的值作为返回值,结束函数调用

简单小结一下 V8 的字节码:

  1. 有若干个局部的寄存器,在操作数中以 rn 的形式出现,n 是寄存器编号
  2. accumulator 局部寄存器,作为部分字节码的隐含输入或输出(Add
  3. 有若干个参数,在操作数中以 an 的形式出现,n 是参数编号
  4. 操作数还可以出现立即数参数 [imm],可能是整数字面量(LdaSmi),可能是下标(LdaConstant),也可能是 FeedBackVector 的 slot

有了字节码以后,接下来观察 Ignition 具体是怎么解释执行这些字节码的。

解释执行

为了实际执行这些字节码,Ignition 的做法是:

  1. 给每种可能的 Opcode 生成一段二进制代码,这段代码会实现这个 Opcode 的功能
  2. 在运行时维护一个 dispatch table,维护了 Opcode 到二进制代码地址的映射关系
  3. 在每段代码的结尾,找到下一个 Opcode 对应的代码的地址,然后跳转过去
  4. 调用函数时,先做一系列的准备,找到函数第一个字节码的 Opcode 对应的代码的地址,跳转过去

由于 Opcode 的种类是固定的,所以实际运行 V8 的时候,这些代码已经编译好了,只需要在运行时初始化对应的数据结构即可。这个代码的生成和编译过程,也不是由 C++ 编译器做的,而是有一个 mksnapshot 命令来完成初始化,你可以认为它把这些 Opcode 对应的汇编指令都预先生成好,运行时直接加载即可。

首先来看 Ignition 的怎么实现各种 Opcode 的,以 LdaSmi 为例,它的作用是小的把立即数(Smi=Small integer)写入到 accumulator 当中,这段在 v8/src/interpreter/interpreter-generator.cc 的代码实现了这个逻辑:

// LdaSmi <imm>//// Load an integer literal into the accumulator as a Smi.IGNITION_HANDLER(LdaSmi, InterpreterAssembler) { TNode<Smi> smi_int = BytecodeOperandImmSmi(0); SetAccumulator(smi_int); Dispatch();}

可以看到,逻辑并不复杂,就是取了第一个立即数操作数,设置到了 accumulator,最后调用 Dispatch,也就是读取下一个 Opcode 对应的汇编指令然后跳转。接下来看这几个步骤在汇编上是怎么实现的。

为了查看 Ignition 对各种 Opcode 具体生成了什么样的汇编指令,可以用 ./out/arm64.optdebug/mksnapshot --trace-ignition-codegen --code-comments 命令查看,下面列出了 LdaSmi 这个 Opcode 对应的汇编,由于这段汇编有点长,具体做的事情和对应的源码已经通过注释标注出来:

kind = BYTECODE_HANDLERname = LdaSmicompiler = turbofanaddress = 0x31a000906fdInstructions (size = 324)# 在代码的开头,检查寄存器是否正确,即 x2 是否保存了当前代码段的开始地址,对应的源码:# v8/src/compiler/backend/arm64/code-generator-arm64.cc CodeGenerator::AssembleCodeStartRegisterCheck():# UseScratchRegisterScope temps(masm());# Register scratch = temps.AcquireX();# __ ComputeCodeStartAddress(scratch); // becomes x16 in the following code# __ cmp(scratch, kJavaScriptCallCodeStartRegister);# __ Assert(eq, AbortReason::kWrongFunctionCodeStart);# 其中 kJavaScriptCallCodeStartRegister 定义在 v8/src/codegen/arm64/register-arm64.h:# constexpr Register kJavaScriptCallCodeStartRegister = x2; [ Frame: MANUAL -- Prologue: check code start register -- - AssembleCode@../../src/compiler/backend/code-generator.cc:2320xc6ccfe4a8b00 0 10000010 adr x16, #+0x0 (addr 0xc6ccfe4a8b00)0xc6ccfe4a8b04 4 eb02021f cmp x16, x20xc6ccfe4a8b08 8 54000080 b.eq #+0x10 (addr 0xc6ccfe4a8b18) [ - Abort@../../src/codegen/arm64/macro-assembler-arm64.cc:4008 Abort message: - Abort@../../src/codegen/arm64/macro-assembler-arm64.cc:4010 Wrong value in code start register passed - Abort@../../src/codegen/arm64/macro-assembler-arm64.cc:40110xc6ccfe4a8b0c c d2801081 movz x1, #0x84 [ Frame: NO_FRAME_TYPE [ - EntryFromBuiltinAsOperand@../../src/codegen/arm64/macro-assembler-arm64.cc:2377 ]0xc6ccfe4a8b10 10 f96a3750 ldr x16, [x26, #21608]0xc6ccfe4a8b14 14 d63f0200 blr x16 ] ]# 栈对齐检查,定义在:# v8/src/codegen/arm64/macro-assembler-arm64.cc MacroAssembler::AssertSpAligned():# if (!v8_flags.debug_code) return;# ASM_CODE_COMMENT(this);# HardAbortScope hard_abort(this); // Avoid calls to Abort.# // Arm64 requires the stack pointer to be 16-byte aligned prior to address# // calculation.# UseScratchRegisterScope scope(this);# Register temp = scope.AcquireX(); // becomes x16 in the following code# Mov(temp, sp);# Tst(temp, 15);# Check(eq, AbortReason::kUnexpectedStackPointer); -- B0 start (construct frame) -- [ - AssertSpAligned@../../src/codegen/arm64/macro-assembler-arm64.cc:15900xc6ccfe4a8b18 18 910003f0 mov x16, sp [ - LogicalMacro@../../src/codegen/arm64/macro-assembler-arm64.cc:1970xc6ccfe4a8b1c 1c f2400e1f tst x16, #0xf ]0xc6ccfe4a8b20 20 54000080 b.eq #+0x10 (addr 0xc6ccfe4a8b30) [ - Abort@../../src/codegen/arm64/macro-assembler-arm64.cc:4008 Abort message: - Abort@../../src/codegen/arm64/macro-assembler-arm64.cc:4010 The stack pointer is not the expected value - Abort@../../src/codegen/arm64/macro-assembler-arm64.cc:4011 [ Frame: NO_FRAME_TYPE0xc6ccfe4a8b24 24 52800780 movz w0, #0x3c0xc6ccfe4a8b28 28 f94e7750 ldr x16, [x26, #7400]0xc6ccfe4a8b2c 2c d63f0200 blr x16 ] ] ]# 构建栈帧# 构建完成后会得到:sp = prev sp - 64, fp = sp + 48## 栈帧的示意图, 每一个方框表示 8 字节的内存:# sp + 64 +------------+# | lr | <= lr 是 link register 的缩写,表示返回地址# sp + 56 +------------+# | prev fp | <= 保存了调用前的 fp (frame pointer)# sp + 48 +------------+ <= 新的 fp (frame pointer) 指向这里# | x16 (0x22) |# sp + 40 +------------+# | x20 | <= bytecode array register# sp + 32 +------------+# | x21 | <= dispatch table register# sp + 24 +------------+# | x19 | <= bytecode offset register,记录当前正在执行的 bytecode 在 bytecode array 中的偏移# sp + 16 +------------+# | x0 | <= accumulator register# sp + 8 +------------+# | |# sp +------------+ <= 新的 sp (stack pointer) 指向这里## 这些寄存器定义在 v8/src/codegen/arm64/register-arm64.h 当中:# constexpr Register kInterpreterAccumulatorRegister = x0;# constexpr Register kInterpreterBytecodeOffsetRegister = x19;# constexpr Register kInterpreterBytecodeArrayRegister = x20;# constexpr Register kInterpreterDispatchTableRegister = x21;0xc6ccfe4a8b30 30 d2800450 movz x16, #0x220xc6ccfe4a8b34 34 a9be43ff stp xzr, x16, [sp, #-32]!0xc6ccfe4a8b38 38 a9017bfd stp fp, lr, [sp, #16]0xc6ccfe4a8b3c 3c 910043fd add fp, sp, #0x10 (16)0xc6ccfe4a8b40 40 d10083ff sub sp, sp, #0x20 (32)0xc6ccfe4a8b44 44 f90013f4 str x20, [sp, #32]0xc6ccfe4a8b48 48 f9000ff5 str x21, [sp, #24]0xc6ccfe4a8b4c 4c f90007e0 str x0, [sp, #8]0xc6ccfe4a8b50 50 f9000bf3 str x19, [sp, #16]# 调用了未知的 C 函数0xc6ccfe4a8b54 54 d28042c1 movz x1, #0x216 [ - LoadFromConstantsTable@../../src/codegen/arm64/macro-assembler-arm64.cc:2166 [ - LoadRoot@../../src/codegen/arm64/macro-assembler-arm64.cc:19540xc6ccfe4a8b58 58 f94d5f42 ldr x2, [x26, #6840] ] [ - DecompressTagged@../../src/codegen/arm64/macro-assembler-arm64.cc:34480xc6ccfe4a8b5c 5c d28bcef0 movz x16, #0x5e770xc6ccfe4a8b60 60 b8706842 ldr w2, [x2, x16]0xc6ccfe4a8b64 64 8b020382 add x2, x28, x2 ] ]0xc6ccfe4a8b68 68 aa1403e0 mov x0, x200xc6ccfe4a8b6c 6c f94ecf50 ldr x16, [x26, #7576] [ - CallCFunction@../../src/codegen/arm64/macro-assembler-arm64.cc:21060xc6ccfe4a8b70 70 10000068 adr x8, #+0xc (addr 0xc6ccfe4a8b7c)0xc6ccfe4a8b74 74 a93f235d stp fp, x8, [x26, #-16]0xc6ccfe4a8b78 78 d63f0200 blr x160xc6ccfe4a8b7c 7c f81f035f stur xzr, [x26, #-16] ]0xc6ccfe4a8b80 80 d2800001 movz x1, #0x0 [ - LoadFromConstantsTable@../../src/codegen/arm64/macro-assembler-arm64.cc:2166 [ - LoadRoot@../../src/codegen/arm64/macro-assembler-arm64.cc:19540xc6ccfe4a8b84 84 f94d5f42 ldr x2, [x26, #6840] ] [ - DecompressTagged@../../src/codegen/arm64/macro-assembler-arm64.cc:34480xc6ccfe4a8b88 88 d28bcf70 movz x16, #0x5e7b0xc6ccfe4a8b8c 8c b8706842 ldr w2, [x2, x16]0xc6ccfe4a8b90 90 8b020382 add x2, x28, x2 ] ]0xc6ccfe4a8b94 94 f94007e0 ldr x0, [sp, #8]0xc6ccfe4a8b98 98 f94ecf50 ldr x16, [x26, #7576] [ - CallCFunction@../../src/codegen/arm64/macro-assembler-arm64.cc:21060xc6ccfe4a8b9c 9c 10000068 adr x8, #+0xc (addr 0xc6ccfe4a8ba8)0xc6ccfe4a8ba0 a0 a93f235d stp fp, x8, [x26, #-16]0xc6ccfe4a8ba4 a4 d63f0200 blr x160xc6ccfe4a8ba8 a8 f81f035f stur xzr, [x26, #-16] ]# 从这里开始实现 LdaSmi 的语义# 从前面的分析可以看到 LdaSmi 由两个字节组成:# 1. 第一个字节是 0x0d,表示这是一条 LdaSmi# 2. 第二个字节就是要加载到 `accumulator` 的小整数# 如:0d 01 对应 LdaSmi [1],0d 02 对应 LdaSmi [2]# 所以,为了实现 LdaSmi,需要从 bytecode array 中读取 LdaSmi 字节码的第二个字节,# 保存到 `accumulator` 寄存器当中# 下面一条一条地分析指令在做的事情:# 1. 从 sp + 16 地址读取 bytecode offset 寄存器的值到 x3,# 它记录了 LdaSmi 相对 bytecode array 的偏移0xc6ccfe4a8bac ac f9400be3 ldr x3, [sp, #16]# 2. 计算 x3 + 1 的值并写入 x4,得到 LdaSmi 的第二个字节相对 bytecode array 的偏移0xc6ccfe4a8bb0 b0 91000464 add x4, x3, #0x1 (1)# 3. 从 sp + 32 地址读取 bytecode array 寄存器的值到 x200xc6ccfe4a8bb4 b4 f94013f4 ldr x20, [sp, #32]# 4. 从 x20 + x4 地址读取 LdaSmi 的第二个字节到 x4,也就是要加载到 `accumulator` 的值,# 之后 x4 的值会写入到 x0,也就是 `accumulator` 对应的寄存器0xc6ccfe4a8bb8 b8 38e46a84 ldrsb w4, [x20, x4]# Dispatch: 找到下一个 Opcode 对应的代码的入口,然后跳转过去 ========= Dispatch - Dispatch@../../src/interpreter/interpreter-assembler.cc:1278 - AssembleArchInstruction@../../src/compiler/backend/arm64/code-generator-arm64.cc:978# x3 是 LdaSmi 当前所在的 bytecode offset,加 2 是因为 LdaSmi 占用了两个字节# x19 = x3 + 2,就是 bytecode offset 前进两个字节,指向下一个字节码0xc6ccfe4a8bbc bc 91000873 add x19, x3, #0x2 (2)# x20 是 bytecode array,从 bytecode array 读取下一个字节码的第一个字节到 x3 寄存器0xc6ccfe4a8bc0 c0 38736a83 ldrb w3, [x20, x19]# 接下来检查在 x3 寄存器当中的字节码的第一个字节(Opcode),如果它:# 1. 小于 187(kFirstShortStar),说明它不是特殊的 Short Star (Star0-Star15) 字节码# 2. 介于 187(kFirstShortStar) 和 202(kLastShortStar) 之间,说明它是特殊的 Short Star (Star0-Star15) 字节码# 3. 如果大于 202(kLastShortStar),说明它是非法的字节码# 如果 x3 寄存器大于或等于 187,说明这个字节码可能是 Short Star 字节码,就跳转到后面的 B20xc6ccfe4a8bc4 c4 7102ec7f cmp w3, #0xbb (187)0xc6ccfe4a8bc8 c8 54000102 b.hs #+0x20 (addr 0xc6ccfe4a8be8) -- B1 start --# 此时 x3 小于 187# 从栈上读取 x21 即 dispatch table register0xc6ccfe4a8bcc cc f9400ff5 ldr x21, [sp, #24]# 从 dispatch table,以 x3 为下标,读取下一个字节码对应的代码的地址0xc6ccfe4a8bd0 d0 f8637aa2 ldr x2, [x21, x3, lsl #3]# 把之前 LdaSmi 计算得到的 x4 寄存器写到 `accumulator` 即 x0 寄存器当中# 这里 x0 = 2 * x4,是因为 v8 用最低位表示这是一个 Smi(用 0 表示)还是一个指针(用 1 表示)0xc6ccfe4a8bd4 d4 0b040080 add w0, w4, w4# 恢复调用函数前的旧 fp 和 lr0xc6ccfe4a8bd8 d8 a9407bbd ldp fp, lr, [fp]# 恢复调用函数前的旧 sp0xc6ccfe4a8bdc dc 910103ff add sp, sp, #0x40 (64)# 下一个字节码对应的代码的地址已经保存在 x2 寄存器当中,跳转过去0xc6ccfe4a8be0 e0 aa0203f1 mov x17, x20xc6ccfe4a8be4 e4 d61f0220 br x17 -- B2 start -- [ Assert: UintPtrGreaterThanOrEqual(opcode, UintPtrConstant(static_cast<int>( Bytecode::kFirstShortStar))) - StoreRegisterForShortStar@../../src/interpreter/interpreter-assembler.cc:310 - AssembleArchInstruction@../../src/compiler/backend/arm64/code-generator-arm64.cc:978 ] Assert - AssembleArchInstruction@../../src/compiler/backend/arm64/code-generator-arm64.cc:978 [ Assert: UintPtrLessThanOrEqual( opcode, UintPtrConstant(static_cast<int>(Bytecode::kLastShortStar))) - StoreRegisterForShortStar@../../src/interpreter/interpreter-assembler.cc:314 - AssembleArchInstruction@../../src/compiler/backend/arm64/code-generator-arm64.cc:978# 此时 x3 大于或等于 187# 进一步判断 x3 是否大于 202,如果大于,则跳转到后面的 B30xc6ccfe4a8be8 e8 7103287f cmp w3, #0xca (202)0xc6ccfe4a8bec ec 540001c8 b.hi #+0x38 (addr 0xc6ccfe4a8c24) -- B4 start --# 此时 x3 介于 187 和 202 之间,是一个 Short Star# Short Star 做的事情就是把 `accumulator` 寄存器的值复制到 r0-r15 当中指定的寄存器# 所以直接在这里实现了 Short Star 的语义,而不是单独跑一段代码去执行它# 由于 r0-r15 寄存器保存在栈上,所以通过 x3 计算出 Short Star 要写到哪个寄存器# 进而直接计算要写到的栈的地址的偏移# 寻找一个通项公式,找到 Star0-Star15 要写入的地址:# Star0(202): r0 的位置在栈顶再往下的 8 字节,即 fp 减去 56# Star0(187): r15 的位置在栈顶再往下的 8*16 字节,即 fp 减去 176# 相对 fp 的偏移量就等于 x3 * 8 - 1672# 从而得到下面的代码:# 计算 x3 = x3 * 80xc6ccfe4a8bf0 f0 d37df063 lsl x3, x3, #3# 把之前 LdaSmi 计算得到的 x4 寄存器写到 `accumulator` 即 x0 寄存器当中# 这里 x0 = 2 * x4,是因为 v8 用最低位表示这是一个 Smi(用 0 表示)还是一个指针(用 1 表示)0xc6ccfe4a8bf4 f4 0b040080 add w0, w4, w4 ] Assert - AssembleArchInstruction@../../src/compiler/backend/arm64/code-generator-arm64.cc:978# 计算 x3 = x3 - 1672,就得到了相对 fp 的偏移量0xc6ccfe4a8bf8 f8 d11a2063 sub x3, x3, #0x688 (1672)# 从 fp 的地址读取函数调用前的 fp0xc6ccfe4a8bfc fc f94003a4 ldr x4, [fp]# 把 `accumulator` 写入到相对函数调用前的 fp 的对应位置0xc6ccfe4a8c00 100 f8236880 str x0, [x4, x3]# 下面就是 Dispatch 逻辑,只不过这次是执行完 Short Star 字节码后的 Dispatch# x19 = x3 + 1,就是 bytecode offset 前进一个字节,指向下一个字节码0xc6ccfe4a8c04 104 91000673 add x19, x19, #0x1 (1)# x20 是 bytecode array,从 bytecode array 读取下一个字节码的第一个字节到 x3 寄存器0xc6ccfe4a8c08 108 38736a83 ldrb w3, [x20, x19]# 从栈上读取 x21 即 dispatch table register0xc6ccfe4a8c0c 10c f9400ff5 ldr x21, [sp, #24]# 从 dispatch table,以 x3 为下标,读取下一个字节码对应的代码的地址0xc6ccfe4a8c10 110 f8637aa2 ldr x2, [x21, x3, lsl #3]# 恢复调用函数前的旧 fp 和 lr0xc6ccfe4a8c14 114 a9407bbd ldp fp, lr, [fp]# 恢复调用函数前的旧 sp0xc6ccfe4a8c18 118 910103ff add sp, sp, #0x40 (64)# 下一个字节码对应的代码的地址已经保存在 x2 寄存器当中,跳转过去0xc6ccfe4a8c1c 11c aa0203f1 mov x17, x20xc6ccfe4a8c20 120 d61f0220 br x17 -- B5 start (no frame) -- -- B3 start (deferred) --# 此时 x3 大于 202,为非法字节码,跳转到错误处理的逻辑 [ - LoadFromConstantsTable@../../src/codegen/arm64/macro-assembler-arm64.cc:2166 [ - LoadRoot@../../src/codegen/arm64/macro-assembler-arm64.cc:19540xc6ccfe4a8c24 124 f94d5f41 ldr x1, [x26, #6840] ] [ - DecompressTagged@../../src/codegen/arm64/macro-assembler-arm64.cc:34480xc6ccfe4a8c28 128 d28bd170 movz x16, #0x5e8b0xc6ccfe4a8c2c 12c b8706821 ldr w1, [x1, x16]0xc6ccfe4a8c30 130 8b010381 add x1, x28, x1 ] ] [ Frame: NO_FRAME_TYPE [ Inlined Trampoline for call to AbortCSADcheck - CallBuiltin@../../src/codegen/arm64/macro-assembler-arm64.cc:23910xc6ccfe4a8c34 134 96a4e10b bl #-0x56c7bd4 (addr 0xc6ccf8de1060) ;; code: Builtin::AbortCSADcheck ] ]0xc6ccfe4a8c38 138 d4200000 brk #0x00xc6ccfe4a8c3c 13c d4200000 brk #0x00xc6ccfe4a8c40 140 d503201f nop ;;; Safepoint table. - Emit@../../src/codegen/safepoint-table.cc:187 ]External Source positions: pc offset fileid line b4 380 72Safepoints (entries = 2, byte size = 12)0xc6ccfe4a8b7c 7c slots (sp->fp): 010010000xc6ccfe4a8ba8 a8 slots (sp->fp): 00001000RelocInfo (size = 3)0xc6ccfe4a8c34 code target (BUILTIN AbortCSADcheck) (0xc6ccf8de1060)

为了简化代码,关闭了 control flow integrity 相关的代码生成,具体方法是运行 gn args out/arm64.optdebug,追加一行 v8_control_flow_integrity = false,再重新 autoninja -C out/arm64.optdebug d8

以上是 debug 模式下生成的代码,多了很多检查;如果在 release 模式下,可以观察到更优的指令:

kind = BYTECODE_HANDLERname = LdaSmicompiler = turbofanaddress = 0x31a000462bdInstructions (size = 80)# 从这里开始实现 LdaSmi 的语义# 计算 x19 + 1 的值并写入 x1,得到 LdaSmi 的第二个字节相对 bytecode array 的偏移0xc903f8193400 0 91000661 add x1, x19, #0x1 (1)# 从 x20 + x1 地址读取 LdaSmi 的第二个字节到 x1,也就是要加载到 `accumulator` 的值,# 之后 x1 的值会写入到 x0,也就是 `accumulator` 对应的寄存器0xc903f8193404 4 38e16a81 ldrsb w1, [x20, x1]# Dispatch: 找到下一个 Opcode 对应的代码的入口,然后跳转过去# x19 = x19 + 2,就是 bytecode offset 前进两个字节,指向下一个字节码0xc903f8193408 8 91000a73 add x19, x19, #0x2 (2)# x20 是 bytecode array,从 bytecode array 读取下一个字节码的第一个字节到 x3 寄存器0xc903f819340c c 38736a83 ldrb w3, [x20, x19]# 计算 x4 = x3 * 8,也就是 dispatch table 中下一个字节码对应的代码地址的字节偏移0xc903f8193410 10 d37df064 lsl x4, x3, #3# 把之前 LdaSmi 计算得到的 x1 寄存器写到 `accumulator` 即 x0 寄存器当中# 这里 x0 = 2 * x1,是因为 v8 用最低位表示这是一个 Smi(用 0 表示)还是一个指针(用 1 表示)0xc903f8193414 14 0b010020 add w0, w1, w1# 如果 x3 寄存器大于或等于 187,说明这个字节码可能是 Short Star 字节码,就跳转到后面的 0xc903f819342c 地址0xc903f8193418 18 7102ec7f cmp w3, #0xbb (187)0xc903f819341c 1c 54000082 b.hs #+0x10 (addr 0xc903f819342c)# 如果没有跳转,此时 x3 寄存器小于 187# 从 dispatch table,以 x3 为下标(x4 = x3 * 8),读取下一个字节码对应的代码的地址0xc903f8193420 20 f8646aa2 ldr x2, [x21, x4]# 跳转到下一个字节码对应的代码的地址0xc903f8193424 24 aa0203f1 mov x17, x20xc903f8193428 28 d61f0220 br x17# 实现 Short Star 字节码# 计算出要写入的 r0-r15 寄存器相对 fp 的偏移量 x3 * 8 - 1672# 这个偏移量的计算公式在前面推导过,此时 x4 等于 x3 * 80xc903f819342c 2c d11a2081 sub x1, x4, #0x688 (1672)0xc903f8193430 30 aa1d03e3 mov x3, fp# 把 `accumulator` 写入到相对 fp 的对应位置0xc903f8193434 34 f8216860 str x0, [x3, x1]# 下面就是 Dispatch 逻辑,只不过这次是执行完 Short Star 字节码后的 Dispatch# x19 = x19 + 1,就是 bytecode offset 前进一个字节,指向下一个字节码0xc903f8193438 38 91000673 add x19, x19, #0x1 (1)# x20 是 bytecode array,从 bytecode array 读取下一个字节码的第一个字节到 x1 寄存器0xc903f819343c 3c 38736a81 ldrb w1, [x20, x19]# 从 dispatch table,以 x1 为下标,读取下一个字节码对应的代码的地址0xc903f8193440 40 f8617aa2 ldr x2, [x21, x1, lsl #3]# 跳转到下一个字节码对应的代码的地址0xc903f8193444 44 aa0203f1 mov x17, x20xc903f8193448 48 d61f0220 br x170xc903f819344c 4c d503201f nop

可见 release 模式下的代码还是简单了许多,保证了性能。

有的 Opcode 后面不会紧接着出现 Short Star,此时 Dispatch 会减少一次特判,代码更加简单,以 Ldar 为例:

kind = BYTECODE_HANDLERname = Ldarcompiler = turbofanaddress = 0x31a00046245Instructions (size = 44)# Ldar 的语义是,把指定参数寄存器的值写入到 `accumulator` 当中# 参数寄存器的位置记录在 Ldar 字节码的第二个字节中# 如:0b 04 对应 Ldar a1# 计算 x19 + 1 的值并写入 x1,得到 Ldar 的第二个字节相对 bytecode array 的偏移0xc903f8193320 0 91000661 add x1, x19, #0x1 (1)# 从 x20 + x1 地址读取 Ldar 的第二个字节到 x1,也就是参数寄存器相对 fp 的下标0xc903f8193324 4 38a16a81 ldrsb x1, [x20, x1]# 相对 fp 以 x1 为下标,读取参数寄存器的值到 x1 寄存器0xc903f8193328 8 aa1d03e3 mov x3, fp0xc903f819332c c f8617861 ldr x1, [x3, x1, lsl #3]# Dispatch: 找到下一个 Opcode 对应的代码的入口,然后跳转过去# x19 = x19 + 2,就是 bytecode offset 前进两个字节,指向下一个字节码0xc903f8193330 10 91000a73 add x19, x19, #0x2 (2)# x20 是 bytecode array,从 bytecode array 读取下一个字节码的第一个字节到 x3 寄存器0xc903f8193334 14 38736a83 ldrb w3, [x20, x19]# 从 dispatch table,以 x3 为下标,读取下一个字节码对应的代码的地址0xc903f8193338 18 f8637aa2 ldr x2, [x21, x3, lsl #3]# 把参数寄存器的值写入到 `accumulator` 也就是 x0 当中0xc903f819333c 1c aa0103e0 mov x0, x1# 跳转到下一个字节码对应的代码的地址0xc903f8193340 20 aa0203f1 mov x17, x20xc903f8193344 24 d61f0220 br x170xc903f8193348 28 d503201f nop

小结一下:

  1. Ignition 给每种可能的 Opcode 类型生成一段代码
  2. 这段代码会进行一些检查(仅 Debug 模式下),然后在汇编里实现这个字节码的功能
  3. 执行完字节码后,进入 Dispatch 逻辑,寻找下一个字节码对应的代码的地址
  4. 特别地,如果下一个字节码是 Short Star (Star0-Star15),因为它比较简单和常见,就直接执行它,执行完再重新寻找再下一个字节码对应的代码的地址
  5. 这些 Opcode 对应的代码会在 v8 编译过程中通过 mksnapshot 命令一次性生成好,运行时直接复用,不用重新生成
  6. V8 的值的最低位标识了它的类型:0 表示 Smi,1 表示指针,因此在存储 Smi 的时候,寄存器里保存的是实际值的两倍,这样最低位就是 0

参考

导出飞书日历为 iCalendar 格式

2025-02-04 08:00:00

导出飞书日历为 iCalendar 格式

背景

之前用了一段时间飞书日历,想要把日历里的事件导出来备份,但是发现飞书自己的导出功能太弱,因此参考 从飞书导出日历到 Fastmail - Xuanwo's Blog 进行了导出的尝试。

导出方法

上面提到的文章,是通过 CalDAV 的方式进行的日历同步。因此我第一步也是配置飞书的 CalDAV 服务:

  1. 打开飞书客户端
  2. 点击设置
  3. 点击日历
  4. 设置 CalDAV 同步

按照界面所示,配置 CalDAV 同步,就可以得到用于 CalDAV 的域名、用户名和密码了。如果只是要订阅,那么到这一步,就可以直接用 CalDAV 客户端来同步了。但我想进一步得到 iCalendar 格式的日历文件。

于是我参考了上述文章的评论区的做法:

@jason5ng32 jason5ng32Oct 28, 2024分享一下我的方法:1. 在服务器上安装 vdirsyncer ,这个工具可以同步 CalDAV 的内容,在同步设置里,不需要先找到 UUID,可以直接用飞书提供的 URL。2. 写一个 Python 脚本,将 vdirsyncer 同步的内容合并成单一的 ics 文件。3. 将 ics 文件放到一个地址稍微复杂一点的 http 目录里,可以外部访问。4. 写一个 run.sh 脚本,通过 crontab 每 10 分钟执行一次 vdirsyncer 同步和日历文件合成。

也就是说,用 vdirsyncer 把日历同步到本地,再转换为 iCalendar 格式的日历文件。参考 vdirsyncer 文档,这件事情并不复杂:

  1. 按照 vdirsyncer: pip3 install vdirsyncer
  2. 编辑 ~/.vdirsyncer/config,填入在飞书处得到的用户密码:
    [general]status_path = "~/.vdirsyncer/status/"[pair my_contacts]a = "my_contacts_local"b = "my_contacts_remote"collections = ["from a", "from b"][storage my_contacts_local]type = "filesystem"path = "~/.contacts/"fileext = ".ics"[storage my_contacts_remote]type = "caldav"url = "https://caldav.feishu.cn"username = "REDACTED"password = "REDACTED"
  3. 配置好以后,进行同步:vdirsyncer discover && vdirsyncer sync

此时在 ~/.contacts 目录下,已经能看到很多个 ics 文件了,每个 ics 文件对应了日历中的一个事件。实际上,这些文件就已经是 iCalendar 格式了,只不过每个文件只有一个事件。

为了让一个 .ics 文件包括日历的所有事件,写了一个脚本,实际上就是处理每个 ics 文件,去掉每个文件开头结尾的 BEGIN:VCALENDAREND:VCALENDAR,把中间的部分拼起来,最后再加上开头结尾:

import sysall_lines = []all_lines += ["BEGIN:VCALENDAR"]for f in sys.argv[1:]: content = open(f).read().strip() lines = content.splitlines() all_lines += lines[1:-1]all_lines += ["END:VCALENDAR"]print("\n".join(all_lines))

运行上述脚本:python3 dump.py ~/.contacts/*/*.ics > dump.ics,这样得到的 .ics 文件就可以直接导入到日历软件了。

UPDATE: 我在之前写的飞书文档备份工具 feishu-backup 的基础上,添加了飞书日历的导出功能,把原始的 json 保存下来,并转换得到 iCalendar 格式的 .ics 文件。

导出 iCloud 国区的日历和联系人

除了导出飞书的日历,也可以用类似的方法导出 iCloud 国区的日历:把 url 改成 "https://caldav.icloud.com.cn",在 Apple ID 上生成 App 密码,填入上面的 password,再把邮箱写到 username 即可。

更进一步,也可以导出 iCloud 国区的联系人:

  1. 把配置中 fileext = ".ics" 改成 fileext = ".vcf",因为联系人的格式是 vCard,其文件名后缀是 .vcf
  2. type = "caldav" 改成 type = "carddav",把 url = "https://caldav.icloud.com.cn 改成 url = "https://contacts.icloud.com.cn",表示同步联系人而不是日历

如果既要同步日历,又要同步联系人,或者需要同步来自不同来源的日历,建议把 status 和 storage local 放到不同的目录下,避免出现冲突。