In [2]:
import math
import warnings
from typing import List, Optional, Tuple, Union, Dict
from collections import OrderedDict

import torch
import torch.nn.functional as F
from torch import nn
from torch.nn import CrossEntropyLoss
import re
from dataclasses import dataclass


import logging
from configuration_minicpm import MiniCPMConfig # 直接导入

logger = logging.getLogger(__name__)

 from .autonotebook import tqdm as notebook_tqdm


MiniCPM 采用标准的 Decoder 作为其架构,主要包括三个部分:Embedding, Attention 和 MLP 层。我们对每一部分进行拆解,以便更好地理解其工作原理。整体代码源自于 [MiniCPM 官方仓库](https://github.com/OpenBMB/MiniCPM),这里逐步搭建模型,以便更好地理解其工作原理。

In [None]:
config = MiniCPMConfig(**json.load(open("config.json")))

In [3]:
@dataclass
class BaseModelOutputWithPast(OrderedDict):
 last_hidden_state: torch.FloatTensor = None
 past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
 hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
 attentions: Optional[Tuple[torch.FloatTensor, ...]] = None
 
@dataclass
class CausalLMOutputWithPast(OrderedDict):
 loss: Optional[torch.FloatTensor] = None
 logits: torch.FloatTensor = None
 past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
 hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
 attentions: Optional[Tuple[torch.FloatTensor, ...]] = None


### RoPE

在计算 Embedding 时,采用了 RoPE(Rotary Positional Embedding)的相对位置编码方式,帮助模型更好地理解序列中的位置信息。RoPE 的核心思想是将位置编码的计算转换为旋转矩阵的计算,从而减少计算量。RoPE 的计算公式如下:

`MiniCPMRotaryEmbedding` 实现了旋转位置嵌入(Rotary Position Embedding)。它计算并缓存旋转位置编码的余弦和正弦值,以便在前向传播过程中快速获取。

In [4]:
class MiniCPMRotaryEmbedding(nn.Module):
 def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
 super().__init__()

 self.dim = dim
 self.max_position_embeddings = max_position_embeddings
 self.base = base
 # 计算了逆频率inv_freq并使用register_buffer方法将其注册为一个缓冲区
 inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2).float().to(device) / self.dim))
 self.register_buffer("inv_freq", inv_freq, persistent=False)

 # 构建缓存
 self._set_cos_sin_cache(
 seq_len=max_position_embeddings, device=self.inv_freq.device, dtype=torch.float32
 )

 def _set_cos_sin_cache(self, seq_len, device, dtype):
 # 计算并缓存余弦和正弦值
 self.max_seq_len_cached = seq_len
 t = torch.arange(self.max_seq_len_cached, device=device, dtype=self.inv_freq.dtype)
 freqs = torch.outer(t, self.inv_freq)

 # 将频率扩展到维度上
 emb = torch.cat((freqs, freqs), dim=-1)

 # 缓存余弦值和正弦值
 self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
 self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)

 def forward(self, x, seq_len=None):
 # 首先检查输入序列的长度是否超过了缓存的最大长度,如果超过了,则重新计算并缓存余弦和正弦值
 # x: [bs, num_attention_heads, seq_len, head_size]
 if seq_len > self.max_seq_len_cached:
 self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)

 # 返回对应序列长度的余弦和正弦值
 return (
 self.cos_cached[:seq_len].to(dtype=x.dtype),
 self.sin_cached[:seq_len].to(dtype=x.dtype),
 )

此段代码的功能是对输入数据的一半隐藏维度进行旋转操作。将原本的后半部分旋转到前面,将原本的前半部分旋转到后面。

In [5]:
def rotate_half(x):
 # 将输入张量 x 沿 emb 维度一分为二
 x1 = x[..., : x.shape[-1] // 2]
 x2 = x[..., x.shape[-1] // 2 :]
 # 将后半部分取负号,然后与前半部分拼接,对输入张量的隐藏维度进行旋转
 return torch.cat((-x2, x1), dim=-1)

此函数将旋转位置嵌入(Rotary Position Embedding)应用于查询和键张量。首先,函数获取键张量的数据类型,然后根据位置索引提取旋转嵌入的余弦和正弦部分,并在指定维度上进行扩展。为了提高计算的精度,在进行 embedding 计算时,从 bfloat16 数据类型转换为 float32 数据类型。

In [6]:
def apply_rotary_pos_emb(q, k, cos, sin, position_ids, unsqueeze_dim=1):
 # 保存原始数据类型
 orig_dtype = k.dtype # torch.bfloat16
 
 # 根据 position_ids 选择 cos 和 sin,并在指定维度上扩展
 cos = cos[position_ids].unsqueeze(unsqueeze_dim) # [bs, 1, seq_len, dim] 便于和[bs, num_heads, q_len, head_dim] 维度的 q,k 进行矩阵乘法
 sin = sin[position_ids].unsqueeze(unsqueeze_dim) # [bs, 1, seq_len, dim]
 
 # 将 q 和 k 转换为 float32 类型,以便进行精确的计算
 q_fp32 = q.to(dtype=torch.float32, device=q.device)
 k_fp32 = k.to(dtype=torch.float32, device=k.device)
 
 # 计算 q 和 k 的旋转位置嵌入
 q_embed = (q_fp32 * cos) + (rotate_half(q_fp32) * sin)
 k_embed = (k_fp32 * cos) + (rotate_half(k_fp32) * sin)
 
 # 将结果转换回原始数据类型并返回
 return q_embed.to(dtype=orig_dtype), k_embed.to(dtype=orig_dtype) # [bs, num_heads, q_len, head_dim]

### Attention

在语言模型中,未来的 token 在当前时间步骤中是不可见的。因此,我们构造一个上三角矩阵来屏蔽未来的信息。在此矩阵中,对角线以上的部分(即未来的元素)被设置为极小的浮点数值(通常为负无穷大),这样做的目的是在自注意力机制的计算过程中,使这些部分被忽略或仅被赋予极小的权重,从而确保模型仅能“感知”到之前的元素。若存在缓存,则需要将过去的缓存纳入考虑范围。

In [7]:
def create_causal_mask(input_shape, dtype, device, past_length=0):
 batch_size, query_length = input_shape
 # 创建一个上三角矩阵,填充最小浮点值,表示未来的token不能看到
 causal_mask = torch.triu(torch.full((query_length, query_length), torch.finfo(dtype).min, dtype=dtype, device=device), diagonal=1)
 # 如果有过去的key-value长度,则在mask前面添加零矩阵
 if past_length > 0:
 causal_mask = torch.cat([torch.zeros(query_length, past_length, dtype=dtype, device=device), causal_mask], dim=-1)
 # 扩展mask的维度以匹配批次大小,并返回
 return causal_mask[None, None, :, :].expand(batch_size, 1, query_length, query_length + past_length)

在 MiniCPM 模型中,原始的分词器(tokenizer)生成的掩码(mask)矩阵是一个二维矩阵,其中0表示填充(padding)位置,1表示真实令牌(token)位置。在注意力(attention)层中,我们需要将这个掩码矩阵扩展到四维,以便它能够与注意力矩阵进行逐元素相乘。这一步骤是为了确保模型在计算注意力权重时,只考虑真实令牌的位置,而忽略填充位置,从而提高模型处理不同长度输入序列的能力。

In [None]:
def expand_attention_mask(mask, dtype, target_length = None):
 batch_size, source_length = mask.shape
 target_length = target_length if target_length is not None else source_length

 # 扩展mask的维度以匹配目标长度和批次大小
 expanded_mask = mask[:, None, None, :].expand(batch_size, 1, target_length, source_length).to(dtype)
 # 反转mask,将1变为0,0变为1
 inverted_mask = 1.0 - expanded_mask
 # 将反转后的mask中为True的位置填充为最小浮点值
 return inverted_mask.masked_fill(inverted_mask.bool(), torch.finfo(dtype).min)

组合我们设计好的用于因果语言模型的 mask 和 padding mask,得到最终的 mask 矩阵。这个矩阵的作用是在自注意力机制中,屏蔽未来的信息,确保模型只能“感知”到之前的元素。

In [None]:

def prepare_4d_causal_attention_mask(
 attention_mask: Optional[torch.Tensor],
 query_length: int,
 past_length: int,
 dtype: torch.dtype,
 device: Union[torch.device, "str"] = "cpu",
):

 # 如果attention_mask存在且是2维的
 if attention_mask is not None and attention_mask.dim() == 2:
 # 获取批次大小和查询长度
 batch_size = attention_mask.shape[0]
 query_length = query_length
 # 更新input_shape和past_length
 input_shape = (batch_size, query_length)
 causal_mask = None
 if query_length > 1:
 # 创建4维的causal mask
 causal_mask = create_causal_mask(input_shape, dtype, device, past_length)
 # 扩展attention mask
 expanded_mask = expand_attention_mask(attention_mask, dtype, query_length)
 if causal_mask is not None:
 # 将causal mask中对应expanded mask为True的位置填充为最小浮点值
 expanded_attn_mask = causal_mask.masked_fill(expanded_mask.bool(), torch.finfo(dtype).min)
 expanded_attn_mask = expanded_mask
 return expanded_attn_mask


`MiniCPMAttention` 通过多头注意力机制高效处理长序列数据。它融合了动态头维度分配、旋转式位置编码(RoPE)、以及键值对缓存机制等多项技术,以提高模型的性能和灵活性。

- **动态头维度分配**:通过将隐藏层的维度均匀分配给多个注意力头,实现了并行处理的优化,从而提高了计算效率。
- **RoPE 位置编码**:引入了旋转式位置编码,以增强模型对序列位置信息的捕捉能力。这在处理长序列时尤其重要,因为它能够有效地保持位置信息的连续性和一致性。
- **键值对缓存机制**:在自回归解码过程中,支持缓存先前计算的键值对,这一机制显著加速了连续解码任务的处理速度。

相比如原始的 Attention,MiniCPMAttention 在计算 Embeddig 时采用 RoPE Embedding,这样可以更好地处理长序列。另外,MiniCPMAttention 支持键值对的缓存,这在自回归解码中非常有用,可以大大提高解码速度。

In [8]:
class MiniCPMAttention(nn.Module):
 def __init__(self, config: MiniCPMConfig, layer_idx: Optional[int] = None):
 super().__init__()
 self.config = config
 self.layer_idx = layer_idx
 if layer_idx is None:
 layer_idx.warn_once(
 f"Instantiating {self.__class__.__name__} without passing `layer_idx` is not recommended and will "
 "to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` "
 "when creating this class."
 )

 self.attention_dropout = config.attention_dropout # 0.0
 self.hidden_size = config.hidden_size # 2304
 self.num_heads = config.num_attention_heads # 36
 self.head_dim = self.hidden_size // self.num_heads # 64
 self.num_key_value_heads = config.num_key_value_heads # 36
 self.num_key_value_groups = self.num_heads // self.num_key_value_heads # 1
 self.max_position_embeddings = config.max_position_embeddings # 2048
 self.rope_theta = config.rope_theta # 10000.0
 self.is_causal = True

 if (self.head_dim * self.num_heads) != self.hidden_size:
 raise ValueError(
 f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
 f" and `num_heads`: {self.num_heads})."
 )

 self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=config.attention_bias) # (2304, 36*64=2304)
 self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
 self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias)
 self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=config.attention_bias)
 self._init_rope()

 def _init_rope(self):
 self.rotary_emb = MiniCPMRotaryEmbedding(
 self.head_dim,
 max_position_embeddings=self.max_position_embeddings,
 base=self.rope_theta,
 )

 def forward(
 self,
 hidden_states: torch.Tensor,
 attention_mask: Optional[torch.Tensor] = None,
 position_ids: Optional[torch.LongTensor] = None,
 past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
 output_attentions: bool = False,
 use_cache: bool = False,
 **kwargs,
 ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[Tuple[torch.Tensor]]]:

 bsz, q_len, _ = hidden_states.size()

 # q,k,v 矩阵
 query_states = self.q_proj(hidden_states)
 key_states = self.k_proj(hidden_states)
 value_states = self.v_proj(hidden_states)
 
 # 拆成 num_heads 个头 (bsz, num_heads, q_len, self.head_dim)
 query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2)
 key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)
 value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2)

 kv_seq_len = key_states.shape[-2]
 if past_key_value is not None and len(past_key_value) > 0 and len(past_key_value[0]) > self.layer_idx and len(past_key_value[0][self.layer_idx].shape) > 1:
 # 如果有 kv-cache 缓存,需要加上缓存的长度
 kv_seq_len += past_key_value[0][self.layer_idx].shape[0] 
 
 # 获取 RoPE Embedding 对应位置的 cos 和 sin 值 ( 这里传入的 value_states 不会参与计算,只是确保类型和设备)
 cos, sin = self.rotary_emb(value_states.to(torch.float32), seq_len=kv_seq_len)
 
 # 对 q 和 k 向量应用 RoPE 位置编码
 query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids)
 # 如果存在先前的 k-v 缓存
 if past_key_value is not None:
 # 若当前层缓存未初始化,则进行初始化
 if len(past_key_value[0]) <= self.layer_idx:
 # 为当前层新增 k-v 的缓存
 past_key_value[0].append(key_states)
 past_key_value[1].append(value_states)
 else:
 # 若当前层缓存已存在,通过在序列长度维度上进行拼接更新缓存
 past_key_value[0][self.layer_idx] = torch.cat([past_key_value[0][self.layer_idx], key_states], dim=-2)
 past_key_value[1][self.layer_idx] = torch.cat([past_key_value[1][self.layer_idx], value_states], dim=-2)

 key_states, value_states = past_key_value[0][self.layer_idx], past_key_value[1][self.layer_idx] 
 
 attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / math.sqrt(self.head_dim)
 
 if attn_weights.size() != (bsz, self.num_heads, q_len, kv_seq_len):
 raise ValueError(
 f"Attention weights should be of size {(bsz, self.num_heads, q_len, kv_seq_len)}, but is"
 f" {attn_weights.size()}"
 )

 if attention_mask is not None:
 if attention_mask.size() != (bsz, 1, q_len, kv_seq_len):
 raise ValueError(
 f"Attention mask should be of size {(bsz, 1, q_len, kv_seq_len)}, but is {attention_mask.size()}"
 )
 attn_weights = attn_weights + attention_mask

 # 使用32位浮点数精度以提高计算精度
 attn_weights = F.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype)
 attn_weights = F.dropout(attn_weights, p=self.attention_dropout, training=self.training)
 attn_output = torch.matmul(attn_weights, value_states)

 if attn_output.size() != (bsz, self.num_heads, q_len, self.head_dim):
 raise ValueError(
 f"`attn_output` should be of size {(bsz, self.num_heads, q_len, self.head_dim)}, but is"
 f" {attn_output.size()}"
 )
 
 attn_output = attn_output.transpose(1, 2).contiguous()

 attn_output = attn_output.reshape(bsz, q_len, self.hidden_size)

 attn_output = self.o_proj(attn_output)

 if not output_attentions:
 attn_weights = None
 
 return attn_output, attn_weights, past_key_value

在模型中,注意力层(attention layer)占据了大部分的参数量,这主要归因于多个注意力头(attention heads)的参数。其中,查询(Q)、键(K)、值(V)三个矩阵的参数量相同。给定隐藏层大小(hidden_size)为 2304,并使用 64 个注意力头,每个头的维度设置为 36,那么这三个矩阵的总参数量计算为 `3*2304*36*64=15,925,248`。

此外,还需要一个映射矩阵将这 64 个头的输出重新映射回输入的维度,该映射矩阵的参数量为 `2304*2304=5,308,416`。

因此,注意力层的总参数量为 `15,925,248 + 5,308,416 = 21,233,664` 约 21M。

### RMSNorm

`rms_layernorm` 是一种归一化层,它结合了 RMSProp 优化器和 Layer Normalization 的概念。可以对输入进行归一化处理,使得网络在训练过程中更加稳定。

$$ y = W \times \left(\frac{H}{\sqrt{mean(H^2) + \epsilon}}\right) $$

`rms_layernorm`层首先计算输入的平方的均值,然后用输入除以这个均值的平方根(加上一个很小的常数以防止除以零),从而确保输入的每个元素都在一个相对稳定的范围内。然后,这个层会乘以一个可学习的权重参数。

这种归一化策略有助于减少训练过程中的内部协变量偏移,降低模型对初始化的敏感度,同时也能加速训练过程。

In [9]:

class MiniCPMRMSNorm(nn.Module):
 def __init__(self, hidden_size, eps=1e-6):
 super().__init__()
 # 初始化权重参数为1,形状由hidden_size决定
 self.weight = nn.Parameter(torch.ones(hidden_size)) 
 # 设置方差的epsilon值,防止除以0
 self.variance_epsilon = eps

 def forward(self, hidden_states):
 # 保存输入的数据类型,以便后续恢复
 old_dtype = hidden_states.dtype
 # 计算方差,先转换数据类型以提高精度,然后计算平方的均值
 variance = hidden_states.to(torch.float32).pow(2).mean(dim=-1, keepdim=True)
 # 标准化隐藏状态,使用rsqrt(方差+epsilon的倒数根)进行缩放,并恢复原数据类型
 hidden_states = (hidden_states * torch.rsqrt(variance + self.variance_epsilon)).to(old_dtype)
 # 应用权重参数,进行缩放
 return hidden_states * self.weight


### SwiGLU 的 MLP

MiniCPM 的 MLP(多层感知器)结构采用 SwiGLU 激活层。该结构包含三个线性层:gate_proj、up_proj 和 down_proj,以及一个 SiLU 激活函数。将 gate_proj 层的结果通过 SiLU 激活函数转化,控制 up_proj 层的激活权重,对输入 x 进行特征提取和转换,然后通过 down_proj 层将转换后的特征映射回原始维度,从而实现一次前向传播。这种设计策略使得模型在保持输出维度不变的同时,能够有效地提取和转换输入特征。

SiLU(Sigmoid Linear Unit)激活函数是一种非线性函数,其公式为 $$ f(x) = x \cdot \sigma(x) $$当输入值为负时,该函数的输出接近于0;而当输入值为正时,输出则接近于输入值本身。这种特性使得 SiLU 函数具有无上界、有下界、平滑且非单调的特征。在深度学习模型的众多实践中,SiLU 函数已被证明在性能上超越了 ReLU 及其他激活函数。SiLU 函数不仅继承了 ReLU 激活函数的优点(例如,能够有效缓解梯度消失问题),同时也克服了 ReLU 函数的一些不足(例如,ReLU 函数在负数部分梯度为零,且非零中心)。此外,SiLU 函数是一种平滑函数,这意味着在其整个定义域内都存在导数,这对于优化过程是极其有利的。

In [10]:
 
class MiniCPMMLP(nn.Module):
 def __init__(self, config):
 super().__init__()
 self.config = config
 self.hidden_size = config.hidden_size # 2304
 self.intermediate_size = config.intermediate_size # 5760
 self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
 self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
 self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)
 self.act_fn = nn.SiLU()

 def forward(self, x): 
 down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
 return down_proj

MLP 层是模型参数量的另一个重要来源。在 MiniCPM 模型中,MLP 层的参数量主要来自于三个线性层(gate_proj、up_proj 和 down_proj)的参数。给定隐藏层大小(hidden_size)为 2304,up_proj 和 gate_proj 将均数据升维到 5760,down_proj 再降维到 2304,那么这三个线性层的参数量分别为 `2304*5760=13,276,160`,`2304*5760=13,276,160`,`5760*2304=13,276,160`。

MLP 层的总参数量为 `13,276,160 + 13,276,160 + 13,276,160 = 39,828,480`,约 39M。

### DecoderLayer

在构建 MiniCPM 模型的解码器层 `MiniCPMDecoderLayer` 时,我们将充分利用已经构建的关键组件:`MiniCPMAttention` 类负责执行注意力计算,`MiniCPMMLP` 类处理全连接层的运算,而 `MiniCPMRMSNorm` 类则负责执行层归一化操作,这包括对输入的隐藏状态进行归一化以及在注意力计算之后进行归一化处理。

解码器层的处理流程遵循了解码器层设计的通用模式。首先,对输入的隐藏状态进行层归一化处理,接着通过自注意力机制对其进行加工处理。处理后的隐藏状态会与原始的隐藏状态进行残差连接,然后进行比例缩放。之后,对这个经过残差连接和比例缩放处理的隐藏状态再次进行层归一化处理,并通过全连接层进行加工处理。处理后的隐藏状态再次与原始的隐藏状态进行残差连接,并进行比例缩放。


在深层神经网络中,随着层数的增加,残差连接的累积可能导致梯度爆炸或梯度消失的问题。通过引入缩放机制,可以确保每一层的输出保持在一个合理的范围内,从而提升训练过程的稳定性和模型的整体性能。通过缩放因子 `self.scale_depth / math.sqrt(self.num_hidden_layers)` 调整残差连接的贡献度,以确保每一层的输出既不会因层数增加而过大,也不会过小。

In [None]:

class MiniCPMDecoderLayer(nn.Module):
 def __init__(self, config: MiniCPMConfig, layer_idx: int):
 super().__init__()
 self.hidden_size = config.hidden_size
 self.self_attn = MiniCPMAttention(config=config, layer_idx=layer_idx)

 self.mlp = MiniCPMMLP(config)
 self.input_layernorm = MiniCPMRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
 self.post_attention_layernorm = MiniCPMRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

 self.scale_depth = config.scale_depth
 self.num_hidden_layers = config.num_hidden_layers

 def forward(
 self,
 hidden_states: torch.Tensor,
 attention_mask: Optional[torch.Tensor] = None,
 position_ids: Optional[torch.LongTensor] = None,
 past_key_value: Optional[Tuple[torch.Tensor]] = None,
 output_attentions: Optional[bool] = False,
 use_cache: Optional[bool] = False,
 **kwargs,
 ) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]:
 
 residual = hidden_states
 # 对输入归一化
 hidden_states = self.input_layernorm(hidden_states)
 # Self Attention 计算
 hidden_states, self_attn_weights, present_key_value = self.self_attn(
 hidden_states=hidden_states,
 attention_mask=attention_mask,
 position_ids=position_ids,
 past_key_value=past_key_value,
 output_attentions=output_attentions,
 use_cache=use_cache,
 **kwargs,
 )
 # 应用残差连接并缩放
 hidden_states = residual + hidden_states * (self.scale_depth / math.sqrt(self.num_hidden_layers))

 residual = hidden_states
 # 对 attention 结果归一化
 hidden_states = self.post_attention_layernorm(hidden_states)

 hidden_states = self.mlp(hidden_states)
 # 应用残差连接并缩放
 hidden_states = residual + hidden_states * (self.scale_depth / math.sqrt(self.num_hidden_layers))

 outputs = (hidden_states,)

 if output_attentions:
 outputs += (self_attn_weights,)

 if use_cache:
 outputs += (present_key_value,)

 return outputs


每个解码层都由 attention 层和 MLP 层组成,所以一个解码器的参数量为 `21,233,664 + 39,828,480 = 61,062,144` 约 61M。

### Model

用一个 Model 类进行所有 MiniCPM 的基本配置

In [11]:
 
class MiniCPMPreTrainedModel(nn.Module):
 def __init__(self, *args, **kwargs):
 self.config = args[0]

 super().__init__()

 def _init_weights(self, module):
 std = self.config.initializer_range
 if isinstance(module, nn.Linear):
 module.weight.data.normal_(mean=0.0, std=std)
 if module.bias is not None:
 module.bias.data.zero_()
 elif isinstance(module, nn.Embedding):
 module.weight.data.normal_(mean=0.0, std=std)
 if module.padding_idx is not None:
 module.weight.data[module.padding_idx].zero_()


MiniCPMModel 使整个模型的核心部分,它负责整个模型的前向计算过程。包括以下几个关键步骤:

1. **参数校验**:确保`input_ids`和`inputs_embeds`不会同时指定。
2. **位置ID处理**:若未提供`position_ids`,则自动创建一个序列。
3. **词嵌入生成**:基于`input_ids`生成词嵌入,或直接采用`inputs_embeds`。
4. **注意力掩码准备**:构造一个四维的因果注意力掩码。
5. **隐藏状态初始化**:以词嵌入向量初始化隐藏状态。

在完成隐藏状态的初始化后,模型通过若干解码器层对隐藏状态进行加工处理。在此过程中,根据需求,模型能够输出隐藏状态和注意力机制的详细信息。这包括对最终层隐藏状态的归一化处理,以及对所有隐藏状态和自注意力机制输出的汇总。此外,还涉及到批次大小和序列长度的计算、缓存机制的管理、位置索引的生成、词嵌入层的操作、解码器层的加工处理,以及最终输出层的归一化处理。

In [12]:

class MiniCPMModel(MiniCPMPreTrainedModel):

 def __init__(self, config: MiniCPMConfig):
 super().__init__(config)

 self.padding_idx = config.pad_token_id
 self.vocab_size = config.vocab_size

 self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
 self.layers = nn.ModuleList(
 [MiniCPMDecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
 )

 self.norm = MiniCPMRMSNorm(config.hidden_size, eps=config.rms_norm_eps)

 self.gradient_checkpointing = False
 # self._init_weights()
 
 def _init_weights(self, module):
 std = self.config.initializer_range
 if isinstance(module, nn.Linear):
 module.weight.data.normal_(mean=0.0, std=std)
 if module.bias is not None:
 module.bias.data.zero_()
 elif isinstance(module, nn.Embedding):
 module.weight.data.normal_(mean=0.0, std=std)
 if module.padding_idx is not None:
 module.weight.data[module.padding_idx].zero_()
 
 def get_input_embeddings(self):
 return self.embed_tokens

 def set_input_embeddings(self, value):
 self.embed_tokens = value

 def forward(
 self,
 input_ids: torch.LongTensor = None,
 attention_mask: Optional[torch.Tensor] = None,
 position_ids: Optional[torch.LongTensor] = None,
 past_key_values: Optional[List[torch.FloatTensor]] = None,
 inputs_embeds: Optional[torch.FloatTensor] = None,
 use_cache: Optional[bool] = None,
 output_attentions: Optional[bool] = None,
 output_hidden_states: Optional[bool] = None,
 return_dict: Optional[bool] = None,
 ) -> Union[Tuple, BaseModelOutputWithPast]:
 output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
 output_hidden_states = (
 output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
 )
 use_cache = use_cache if use_cache is not None else self.config.use_cache

 return_dict = return_dict if return_dict is not None else self.config.use_return_dict

 if input_ids is not None and inputs_embeds is not None:
 raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
 elif input_ids is not None:
 batch_size, seq_length = input_ids.shape[:2]
 elif inputs_embeds is not None:
 batch_size, seq_length = inputs_embeds.shape[:2]
 else:
 raise ValueError("You have to specify either input_ids or inputs_embeds")

 past_key_values_length = 0
 
 if use_cache:
 if past_key_values is not None and len(past_key_values) > 0 and len(past_key_values[0]) > 0 and len(past_key_values[0][0].shape) > 2:
 past_key_values_length = past_key_values[0][0].shape[-2]

 if position_ids is None:
 device = input_ids.device if input_ids is not None else inputs_embeds.device
 position_ids = torch.arange(
 past_key_values_length, seq_length + past_key_values_length, dtype=torch.long, device=device
 )
 position_ids = position_ids.unsqueeze(0)

 if inputs_embeds is None:
 inputs_embeds = self.embed_tokens(input_ids) * self.config.scale_emb

 attention_mask = prepare_4d_causal_attention_mask(attention_mask, seq_length, past_key_values_length, inputs_embeds.dtype, inputs_embeds.device)
 
 # embed positions
 hidden_states = inputs_embeds

 # decoder layers
 all_hidden_states = () if output_hidden_states else None
 all_self_attns = () if output_attentions else None
 next_decoder_cache = None

 for decoder_layer in self.layers:
 if output_hidden_states:
 all_hidden_states += (hidden_states,)

 layer_outputs = decoder_layer(
 hidden_states,
 attention_mask=attention_mask,
 position_ids=position_ids,
 past_key_value=past_key_values,
 output_attentions=output_attentions,
 use_cache=use_cache,
 )

 hidden_states = layer_outputs[0]

 if use_cache:
 next_decoder_cache = layer_outputs[2 if output_attentions else 1]

 if output_attentions:
 all_self_attns += (layer_outputs[1],)
 # 对最终的结果归一化
 hidden_states = self.norm(hidden_states)

 # 添加最后一个解码器层的隐藏状态
 if output_hidden_states:
 all_hidden_states += (hidden_states,)

 next_cache = None
 if use_cache:
 next_cache = next_decoder_cache
 if not return_dict:
 return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None)
 return BaseModelOutputWithPast(
 last_hidden_state=hidden_states,
 past_key_values=next_cache,
 hidden_states=all_hidden_states,
 attentions=all_self_attns,
 )

Embedding 占模型中非常大的一个参数量,这里的为 `122753 * 2304 = 282,822,912`,即约 282M 参数。

### CausalLM

首先定义一个 `CausalLMOutputWithPast`类,主要用于因果语言模型(或自回归模型)的输出。

In [None]:
class CausalLMOutputWithPast(OrderedDict):
 loss: Optional[torch.FloatTensor] = None
 logits: torch.FloatTensor = None
 past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
 hidden_states: Optional[Tuple[torch.FloatTensor, ...]] = None
 attentions: Optional[Tuple[torch.FloatTensor, ...]] = None

我们来看一下如何准备输入数据的步骤。

1. **调整输入数据**:利用`adjust_input_ids`函数,根据提供的注意力掩码或之前计算出的键值对长度,调整`input_ids`的长度,以确保其符合模型所期望的长度,从而能够正确地应用注意力机制。

2. **处理先前的键值对**:计算先前键值对的长度,并基于此调整`input_ids`和`attention_mask`。

3. **生成位置ID**:对于Transformer模型而言,位置ID极为关键,它为模型提供了序列中各个元素的位置信息。如果没有直接提供位置ID但提供了注意力掩码,该函数将依据注意力掩码来生成位置ID。

4. **更新模型输入**:根据是否提供了`inputs_embeds`以及是否利用了先前的键值对,该函数决定使用哪种类型的输入,并将位置ID、先前的键值对、是否使用缓存以及注意力掩码等信息综合到模型输入中。

In [None]:
 def prepare_inputs_for_generation(
 self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs
 ):
 # 调整输入以匹配注意力掩码或过去的键值长度
 def adjust_input_ids(input_ids, attention_mask, past_length):
 if attention_mask is not None and attention_mask.shape[1] > input_ids.shape[1]:
 return input_ids[:, -(attention_mask.shape[1] - past_length):]
 elif past_length < input_ids.shape[1]:
 return input_ids[:, past_length:]
 return input_ids

 # 根据 kv 缓存的长度调整输入
 if past_key_values is not None and len(past_key_values) > 0 and len(past_key_values[0]) > 0 and len(past_key_values[0][0].shape) > 2:
 cache_length = past_length = past_key_values[0][0].shape[2]
 max_cache_length = None

 input_ids = adjust_input_ids(input_ids, attention_mask, past_length)

 if max_cache_length is not None and attention_mask is not None and cache_length + input_ids.shape[1] > max_cache_length:
 attention_mask = attention_mask[:, -max_cache_length:]
 
 # 按照注意力掩码生成位置ID
 position_ids = kwargs.get("position_ids", None)
 if attention_mask is not None and position_ids is None:
 position_ids = attention_mask.long().cumsum(-1) - 1
 position_ids.masked_fill_(attention_mask == 0, 1)
 if past_key_values:
 position_ids = position_ids[:, -input_ids.shape[1]:]
 
 # 更新模型输入
 model_inputs = {"inputs_embeds": inputs_embeds} if inputs_embeds is not None and past_key_values is None else {"input_ids": input_ids}
 
 model_inputs.update(
 {
 "position_ids": position_ids,
 "past_key_values": past_key_values,
 "use_cache": kwargs.get("use_cache"),
 "attention_mask": attention_mask,
 }
 )
 return model_inputs 

MiniCPM 采用了 tie-Embedding 的方式,即词嵌入层和输出层共享参数。这种方式可以减少模型的参数量,提高模型的训练效率。所以需要有获取和设置输入输出词嵌入层的方法。

`MiniCPMForCausalLM`类通过继承`MiniCPMPreTrainedModel`继承了基础属性。在其构造函数`__init__`中,执行了以下几个关键步骤:

1. **初始化父类**:通过`super().__init__(config)`调用父类构造函数,确保 config 被正确初始化。
2. **构建模型核心**:实例化`MiniCPMModel`作为模型的核心组件。
3. **定义线性层**:根据配置中的`vocab_size`确定词汇表的大小,并定义一个线性层`lm_head`。该线性层负责将隐藏层状态映射到词汇表上的得分(即logits),并明确指出不使用偏置项(`bias=False`),直接复用输入 Emb 层(读取权重时手动实现)。

在`forward`方法中,执行了以下几个关键步骤:

1. **确定输出内容**:依据配置来决定是否输出注意力权重和隐藏层状态。
2. **执行前向传播**:调用`self.model`进行实际的前向传播计算,获取最后一层的隐藏层状态。
3. **转换为logits**:通过`lm_head`将隐藏层状态转换为logits。
4. **计算损失**:如果提供了标签,则根据这些标签计算交叉熵损失,这一步骤对模型的训练至关重要。
5. **返回结果**:根据`return_dict`的设置,决定是返回一个包含所有输出的元组,还是返回一个命名的输出对象`CausalLMOutputWithPast`。

In [13]:
 
class MiniCPMForCausalLM(MiniCPMPreTrainedModel):
 _tied_weights_keys = ["lm_head.weight"]

 def __init__(self, config):
 super().__init__(config)
 self.model = MiniCPMModel(config)
 self.vocab_size = config.vocab_size
 self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

 # Initialize weights and apply final processing
 # self.post_init()

 def get_input_embeddings(self):
 return self.model.embed_tokens

 def set_input_embeddings(self, value):
 self.model.embed_tokens = value

 def get_output_embeddings(self):
 return self.lm_head

 def set_output_embeddings(self, new_embeddings):
 self.lm_head = new_embeddings

 def set_decoder(self, decoder):
 self.model = decoder

 def get_decoder(self):
 return self.model

 def forward(
 self,
 input_ids: torch.LongTensor = None,
 attention_mask: Optional[torch.Tensor] = None,
 position_ids: Optional[torch.LongTensor] = None,
 past_key_values: Optional[List[torch.FloatTensor]] = None,
 inputs_embeds: Optional[torch.FloatTensor] = None,
 labels: Optional[torch.LongTensor] = None,
 use_cache: Optional[bool] = None,
 output_attentions: Optional[bool] = None,
 output_hidden_states: Optional[bool] = None,
 return_dict: Optional[bool] = None,
 ) -> Union[Tuple, CausalLMOutputWithPast]:

 output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
 output_hidden_states = (
 output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
 )
 return_dict = return_dict if return_dict is not None else self.config.use_return_dict

 # 调用模型
 outputs = self.model(
 input_ids=input_ids,
 attention_mask=attention_mask,
 position_ids=position_ids,
 past_key_values=past_key_values,
 inputs_embeds=inputs_embeds,
 use_cache=use_cache,
 output_attentions=output_attentions,
 output_hidden_states=output_hidden_states,
 return_dict=return_dict,
 )
 
 # 获取最后一层隐藏状态,并通过线性层(lm_head)转换为logits
 hidden_states = outputs.last_hidden_state
 logits = self.lm_head(hidden_states / (self.config.hidden_size / self.config.dim_model_base))
 logits = logits.float()
 
 loss = None
 # 如果存在标签,则进行损失计算
 if labels is not None:
 # 对logits和labels进行错位,以便预测下一个token
 shift_logits = logits[..., :-1, :].contiguous()
 shift_labels = labels[..., 1:].contiguous()
 # 为交叉熵损失计算准备,将tokens展平
 loss_fct = CrossEntropyLoss()
 shift_logits = shift_logits.view(-1, self.config.vocab_size)
 shift_labels = shift_labels.view(-1)
 shift_labels = shift_labels.to(shift_logits.device)
 # 计算交叉熵损失
 loss = loss_fct(shift_logits, shift_labels)

 if not return_dict:
 output = (logits,) + outputs[1:]
 return (loss,) + output if loss is not None else output

 return CausalLMOutputWithPast(
 loss=loss,
 logits=logits,
 past_key_values=outputs.past_key_values,
 hidden_states=outputs.hidden_states,
 attentions=outputs.attentions,
 )

定义一个名为 `generate` 的方法,用于生成文本序列。该方法设计灵活,既能进行确定性的最大概率生成,也能通过随机采样产生更加多样化的输出。通过调整方法参数,用户可以在生成的质量与多样性之间做出权衡。

1. **初始化缓存**:若启用缓存且未提供过去的键值对,则该方法会初始化一个空的键值对缓存。

2. **准备输入**:计算批次大小,并初始化两个标志变量:`finished` 用于标记每个序列是否完成生成,`unfinished_sequences` 用于标记每个序列是否尚未完成。

3. **获取 pad_token_id**:从 tokenizer 中获取 pad token 的 ID,该 ID 将用于后续填充生成的序列。

4. **生成循环**:最多循环 `max_new_tokens` 次,每次循环生成一个新的 token。循环内部操作如下:
 - 准备当前步骤的输入,并通过模型获取 logits。
 - 若指定了 `top_k`,则将 logits 中非 top_k 的值设置为负无穷大,以便在采样时忽略它们。
 - 根据 `do_sample` 参数决定是通过采样还是选择最大概率的 token 作为下一个 token。
 - 更新输入序列,将新生成的 token 添加到输入序列中。
 - 若提供了 `attention_mask`,则更新它以包括新的 token。
 - 更新 `finished` 和 `unfinished_sequences` 标志,以标记哪些序列已完成或仍未完成。
 - 检查是否所有序列都已完成,若是,则终止循环。

5. **返回生成的序列**:最终,该方法返回包含原始及新生成 token 的输入序列。

In [None]:
 @torch.no_grad()
 def generate(self, input_ids, max_new_tokens=1024, temperature=1.0, top_k=None, use_cache=False, past_key_values=None, tokenizer=None, do_sample=False, **model_kwargs):
 if use_cache and past_key_values is None:
 # 初始化 kv 缓存
 past_key_values = ([], [])
 model_kwargs["past_key_values"] = past_key_values
 batch_size = input_ids.size(0)
 # 初始化完成标志和未完成序列标志
 finished = torch.zeros(batch_size, dtype=torch.bool).to(input_ids.device)
 unfinished_sequences = torch.ones(batch_size, dtype=torch.bool).to(input_ids.device)
 # 获取 pad_token_id 用于填充
 pad_token_id = tokenizer.pad_token_id # 提前获取 pad_token_id

 for _ in range(max_new_tokens):
 # 准备生成的输入
 model_inputs = self.prepare_inputs_for_generation(input_ids, **model_kwargs)

 logits = self(**model_inputs).logits[:, -1, :] / temperature # Apply temperature
 
 if top_k is not None:
 indices_to_remove = logits < torch.topk(logits, top_k)[0][..., -1, None]
 logits[indices_to_remove] = -float('Inf')
 
 if do_sample:
 probs = F.softmax(logits, dim=-1)
 next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1)
 else:
 next_tokens = torch.argmax(logits, dim=-1)
 
 # 更新未完成序列的 next_tokens 
 next_tokens = next_tokens * unfinished_sequences + pad_token_id * (~unfinished_sequences) 
 input_ids = torch.cat([input_ids, next_tokens[:, None]], dim=-1)
 if "attention_mask" in model_kwargs:
 # 更新 attention_mask
 attention_mask = model_kwargs["attention_mask"]
 model_kwargs["attention_mask"] = torch.cat(
 [attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], dim=-1
 )
 # 更新完成和未完成的序列标志
 finished |= (next_tokens.squeeze(-1) == tokenizer.eos_token_id)
 unfinished_sequences &= ~finished
 
 # 如果所有序列都完成,则停止生成
 if finished.all():
 break

 return input_ids

所以整个 MiniCPM 的非 Embedding 参数量为:

整个模型有 40 个 decoder 层,每个 decoder 层由一个 21M 参数的 attention 层和一个 39M 参数的 MLP 层组成,共约 61M 参数。
所以总参数量为 `61,062,144 * 40 = 2,442,485,760`,约 2.4B。

Embedding 层后的参数为:`122753 * 2304 = 282,822,912`, 约 282M。

考虑 Embedding 层后的总参数为: `2,442,485,760 + 282,822,912 = 2,725,308,672`,约 2.7B 参数。