RLHF Fine-Tuning

What is RLHF?

RLHF (Reinforcement Learning from Human Feedback) uses reinforcement learning driven by human feedback to align a large language model with human preferences and values through a three-stage training pipeline.

Core Architecture

Three-Stage Training Pipeline

Stage 1: Supervised Fine-Tuning (SFT) → Stage 2: Reward Model Training (RM) → Stage 3: Reinforcement Learning Optimization (PPO)

Overall Architecture

Human-annotated data → SFT model → Preference data collection → Reward model → PPO optimization → Final model
        ↓                 ↓                    ↓                      ↓                ↓
Instruction-answer    Base capability    Preference comparisons   Reward signal   Policy optimization
      pairs
  1. Supervised Fine-Tuning, SFT (Stage 1)
    • Objective: train an initial policy model with basic instruction understanding and response ability.
    • Method: supervised learning on high-quality instruction-response pairs.
  2. Reward Model Training (Stage 2)
    • Objective: train a reward model (RM) that scores model outputs according to human preference.
    • Method: collect human preference rankings over alternative model outputs and train the RM to score them.
  3. Reinforcement Learning Optimization (Stage 3)
    • Objective: use the reward model as the training signal and optimize the Stage 1 SFT model with a reinforcement learning algorithm (usually PPO).
    • Method: the policy model generates responses, the reward model scores them, and PPO adjusts the policy parameters based on the reward (the corresponding objectives are written out below).

Stage 1: Supervised Fine-Tuning (SFT)

SFT Objective

Build the model's basic instruction-following ability so that it can understand and carry out a wide range of tasks.
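Formally, SFT minimizes the standard next-token cross-entropy over the concatenated instruction-response text:

\mathcal{L}_{\text{SFT}}(\theta) = -\sum_{t} \log p_\theta(y_t \mid y_{<t}, x)

where x is the instruction and y_1, ..., y_T are the tokens of the target response.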

Implementation

from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
 
def stage1_sft_training(model_name, sft_dataset):
    """Stage 1: supervised fine-tuning (SFT)"""
    
    # Load the model and tokenizer
    model = AutoModelForCausalLM.from_pretrained(model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token
    
    # Data preprocessing
    def preprocess_sft_data(examples):
        """Format each instruction-output pair into a single training text"""
        inputs = []
        for instruction, output in zip(examples["instruction"], examples["output"]):
            text = f"### Instruction:\n{instruction}\n\n### Response:\n{output}{tokenizer.eos_token}"
            inputs.append(text)
        
        # Tokenize
        model_inputs = tokenizer(
            inputs,
            truncation=True,
            padding=True,
            max_length=512,
            return_tensors="pt"
        )
        
        # Labels are the input ids, with padding positions masked out of the loss
        labels = model_inputs["input_ids"].clone()
        labels[model_inputs["attention_mask"] == 0] = -100
        model_inputs["labels"] = labels
        
        return model_inputs
    
    # Tokenize the dataset
    tokenized_dataset = sft_dataset.map(
        preprocess_sft_data,
        batched=True,
        remove_columns=sft_dataset.column_names
    )
    
    # Training configuration
    training_args = TrainingArguments(
        output_dir="./sft_output",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4,
        learning_rate=2e-5,
        warmup_ratio=0.1,
        logging_steps=100,
        save_strategy="epoch",
        # add eval_dataset, evaluation_strategy="epoch", and load_best_model_at_end=True
        # if a validation split is available; none is passed to the Trainer below
    )
    
    # Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset,
        tokenizer=tokenizer,
    )
    
    # Train and save
    trainer.train()
    trainer.save_model()
    
    return model

Stage 2: Reward Model Training (RM)

Reward Model Architecture

import torch
import torch.nn as nn
from transformers import AutoModel
 
class RewardModel(nn.Module):
    def __init__(self, base_model_name, hidden_size=None):
        super().__init__()
        
        # Base language model (backbone)
        self.base_model = AutoModel.from_pretrained(base_model_name)
        if hidden_size is None:
            hidden_size = self.base_model.config.hidden_size
        
        # Reward head: maps the pooled hidden state to a single scalar score
        self.reward_head = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size, 1)  # single scalar reward
        )
        
        # Freeze the input embeddings of the base model (optional)
        for param in self.base_model.get_input_embeddings().parameters():
            param.requires_grad = False
    
    def forward(self, input_ids, attention_mask=None):
        # Run the base model
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # Use the representation of the last token (typically the EOS token)
        last_hidden_state = outputs.last_hidden_state
        
        # Find the last non-padding token in each sequence
        if attention_mask is not None:
            sequence_lengths = attention_mask.sum(dim=1) - 1
            batch_size = last_hidden_state.size(0)
            last_token_hidden = last_hidden_state[
                torch.arange(batch_size), sequence_lengths
            ]
        else:
            last_token_hidden = last_hidden_state[:, -1]
        
        # Compute the scalar reward
        reward = self.reward_head(last_token_hidden)
        
        return reward.squeeze(-1)  # [batch_size]
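A minimal usage sketch (gpt2 and the example texts are placeholders, not part of the original recipe; any backbone exposing last_hidden_state works the same way):

from transformers import AutoTokenizer
import torch

rm = RewardModel("gpt2")  # hypothetical small backbone, for illustration only
rm_tokenizer = AutoTokenizer.from_pretrained("gpt2")
rm_tokenizer.pad_token = rm_tokenizer.eos_token

batch = rm_tokenizer(
    ["### Instruction:\nSummarize RLHF.\n\n### Response:\nRLHF aligns models with human feedback.",
     "### Instruction:\nSummarize RLHF.\n\n### Response:\nBananas are yellow."],
    padding=True, truncation=True, return_tensors="pt",
)
with torch.no_grad():
    scores = rm(batch["input_ids"], batch["attention_mask"])
print(scores.shape)  # torch.Size([2]) -- one scalar reward per sequence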

Reward Model Training

def stage2_reward_model_training(sft_model, preference_dataset):
    """Stage 2: train the reward model on preference comparisons"""
    
    # Create the reward model and a matching tokenizer
    reward_model = RewardModel(sft_model.config.name_or_path)
    tokenizer = AutoTokenizer.from_pretrained(sft_model.config.name_or_path)
    tokenizer.pad_token = tokenizer.eos_token
    
    # Preference data preprocessing
    def preprocess_preference_data(examples):
        """Build and tokenize chosen/rejected texts for preference learning"""
        chosen_texts = []
        rejected_texts = []
        
        for prompt, chosen, rejected in zip(
            examples["prompt"], examples["chosen"], examples["rejected"]
        ):
            chosen_text = f"{prompt}\n{chosen}"
            rejected_text = f"{prompt}\n{rejected}"
            
            chosen_texts.append(chosen_text)
            rejected_texts.append(rejected_text)
        
        # Tokenize (pad to a fixed length so batches collate cleanly)
        chosen_inputs = tokenizer(
            chosen_texts,
            truncation=True,
            padding="max_length",
            max_length=512,
        )
        
        rejected_inputs = tokenizer(
            rejected_texts,
            truncation=True,
            padding="max_length",
            max_length=512,
        )
        
        return {
            "chosen_input_ids": chosen_inputs["input_ids"],
            "chosen_attention_mask": chosen_inputs["attention_mask"],
            "rejected_input_ids": rejected_inputs["input_ids"],
            "rejected_attention_mask": rejected_inputs["attention_mask"],
        }
    
    # Reward model loss (pairwise preference learning)
    def reward_loss(chosen_rewards, rejected_rewards):
        """Push chosen rewards above rejected rewards (Bradley-Terry style loss)"""
        loss = -torch.log(torch.sigmoid(chosen_rewards - rejected_rewards)).mean()
        
        # Fraction of pairs ranked correctly
        accuracy = (chosen_rewards > rejected_rewards).float().mean()
        
        return loss, accuracy
    
    # Build a dataloader from the preference dataset
    tokenized_prefs = preference_dataset.map(
        preprocess_preference_data,
        batched=True,
        remove_columns=preference_dataset.column_names
    )
    tokenized_prefs.set_format("torch")
    preference_dataloader = torch.utils.data.DataLoader(tokenized_prefs, batch_size=4)
    
    # Training loop
    optimizer = torch.optim.AdamW(reward_model.parameters(), lr=1e-5)
    
    for epoch in range(3):
        total_loss = 0
        total_accuracy = 0
        
        for batch in preference_dataloader:
            # Forward pass
            chosen_rewards = reward_model(
                batch["chosen_input_ids"],
                batch["chosen_attention_mask"]
            )
            
            rejected_rewards = reward_model(
                batch["rejected_input_ids"], 
                batch["rejected_attention_mask"]
            )
            
            # Compute the loss
            loss, accuracy = reward_loss(chosen_rewards, rejected_rewards)
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            total_accuracy += accuracy.item()
        
        print(f"Epoch {epoch}: Loss={total_loss:.4f}, Accuracy={total_accuracy:.4f}")
    
    return reward_model

Stage 3: PPO Optimization

PPO Algorithm Implementation

import torch.nn.functional as F
 
class PPOTrainer:
    def __init__(self, policy_model, reward_model, ref_model, 
                 clip_ratio=0.2, kl_coef=0.1, value_coef=1.0):
        self.policy_model = policy_model
        self.reward_model = reward_model
        self.ref_model = ref_model
        
        self.clip_ratio = clip_ratio
        self.kl_coef = kl_coef
        self.value_coef = value_coef
        
        # Freeze the reference model and the reward model
        for param in self.ref_model.parameters():
            param.requires_grad = False
        for param in self.reward_model.parameters():
            param.requires_grad = False
        
        self.optimizer = torch.optim.AdamW(
            self.policy_model.parameters(), 
            lr=1e-6  # PPO typically uses a very small learning rate
        )
    
    def generate_responses(self, prompts, max_length=256):
        """生成回答"""
        self.policy_model.eval()
        
        responses = []
        log_probs = []
        
        for prompt in prompts:
            # Encode the prompt
            inputs = tokenizer(prompt, return_tensors="pt")
            
            # Sample a response
            with torch.no_grad():
                outputs = self.policy_model.generate(
                    **inputs,
                    max_length=max_length,
                    do_sample=True,
                    temperature=0.7,
                    pad_token_id=tokenizer.eos_token_id,
                    return_dict_in_generate=True,
                    output_scores=True
                )
            
            generated_ids = outputs.sequences[0][inputs["input_ids"].size(1):]
            response = tokenizer.decode(generated_ids, skip_special_tokens=True)
            
            # Log-probabilities of the generated tokens
            logits = torch.stack(outputs.scores, dim=1)
            log_prob = F.log_softmax(logits, dim=-1)
            token_log_probs = torch.gather(
                log_prob, -1, generated_ids.unsqueeze(0).unsqueeze(-1)
            ).squeeze(-1)
            
            responses.append(response)
            log_probs.append(token_log_probs.sum().item())
        
        return responses, log_probs
    
    def compute_rewards(self, prompts, responses):
        """Score each (prompt, response) pair with the reward model"""
        rewards = []
        
        for prompt, response in zip(prompts, responses):
            full_text = f"{prompt}\n{response}"
            inputs = tokenizer(full_text, return_tensors="pt")
            
            with torch.no_grad():
                reward = self.reward_model(
                    inputs["input_ids"], inputs.get("attention_mask")
                )
                rewards.append(reward.item())
        
        return rewards
    
    def compute_kl_penalty(self, prompts, responses):
        """KL penalty between the policy and the frozen reference (SFT) model"""
        kl_penalties = []
        
        for prompt, response in zip(prompts, responses):
            full_text = f"{prompt}\n{response}"
            inputs = tokenizer(full_text, return_tensors="pt")
            
            # Policy model logits (no grad: the KL term is only used as a scalar reward penalty)
            with torch.no_grad():
                policy_outputs = self.policy_model(**inputs)
                policy_logits = policy_outputs.logits
            
            # Reference model logits
            with torch.no_grad():
                ref_outputs = self.ref_model(**inputs)
                ref_logits = ref_outputs.logits
            
            # KL(policy || reference): F.kl_div(input, target) computes KL(target || exp(input)),
            # so pass the reference log-probs as input and the policy probs as target
            policy_probs = F.softmax(policy_logits, dim=-1)
            
            kl_div = F.kl_div(
                F.log_softmax(ref_logits, dim=-1),
                policy_probs,
                reduction='batchmean'
            )
            
            kl_penalties.append(kl_div.item())
        
        return kl_penalties
    
    def ppo_step(self, prompts, responses, old_log_probs, rewards):
        """One PPO optimization step"""
        self.policy_model.train()
        
        total_loss = 0
        
        for prompt, response, old_log_prob, reward in zip(
            prompts, responses, old_log_probs, rewards
        ):
            full_text = f"{prompt}\n{response}"
            inputs = tokenizer(full_text, return_tensors="pt")
            
            # Log-probability of the response tokens under the current policy
            outputs = self.policy_model(**inputs)
            logits = outputs.logits
            
            # Logits at position t predict token t+1, so score the response tokens with the
            # logits that precede them (prompt length is approximated by re-tokenizing the prompt)
            prompt_len = tokenizer(f"{prompt}\n", return_tensors="pt")["input_ids"].size(1)
            response_ids = inputs["input_ids"][:, prompt_len:]
            log_probs = F.log_softmax(logits[:, prompt_len - 1:-1], dim=-1)
            current_log_prob = torch.gather(
                log_probs, -1, response_ids.unsqueeze(-1)
            ).sum()
            
            # Probability ratio between the current and old policy
            ratio = torch.exp(current_log_prob - old_log_prob)
            
            # PPO loss (simplified: a full implementation subtracts a value/baseline estimate)
            advantage = reward
            
            # Clipped surrogate objective
            clipped_ratio = torch.clamp(
                ratio, 1 - self.clip_ratio, 1 + self.clip_ratio
            )
            
            policy_loss = -torch.min(
                ratio * advantage,
                clipped_ratio * advantage
            )
            
            total_loss += policy_loss
        
        # Backward pass and parameter update
        self.optimizer.zero_grad()
        total_loss.backward()
        torch.nn.utils.clip_grad_norm_(self.policy_model.parameters(), 1.0)
        self.optimizer.step()
        
        return total_loss.item()
    
    def train_step(self, prompts):
        """One complete PPO training iteration"""
        
        # 1. Generate responses with the current policy
        responses, old_log_probs = self.generate_responses(prompts)
        
        # 2. Score them with the reward model
        rewards = self.compute_rewards(prompts, responses)
        
        # 3. Compute KL penalties against the reference model
        kl_penalties = self.compute_kl_penalty(prompts, responses)
        
        # 4. KL-adjusted rewards
        adjusted_rewards = [
            r - self.kl_coef * kl for r, kl in zip(rewards, kl_penalties)
        ]
        
        # 5. PPO update
        loss = self.ppo_step(prompts, responses, old_log_probs, adjusted_rewards)
        
        return {
            "loss": loss,
            "mean_reward": sum(rewards) / len(rewards),
            "mean_kl": sum(kl_penalties) / len(kl_penalties)
        }

Complete RLHF Training Pipeline

def full_rlhf_training(base_model_name, sft_data, preference_data, prompts):
    """Run the complete three-stage RLHF pipeline"""
    
    print("Stage 1: supervised fine-tuning (SFT)")
    sft_model = stage1_sft_training(base_model_name, sft_data)
    
    print("Stage 2: reward model training (RM)")
    reward_model = stage2_reward_model_training(sft_model, preference_data)
    
    print("Stage 3: PPO optimization")
    # Create the frozen reference model (a copy of the SFT model)
    ref_model = AutoModelForCausalLM.from_pretrained(sft_model.config.name_or_path)
    ref_model.load_state_dict(sft_model.state_dict())
    
    # Create the PPO trainer
    ppo_trainer = PPOTrainer(
        policy_model=sft_model,
        reward_model=reward_model,
        ref_model=ref_model
    )
    
    # PPO training loop
    for epoch in range(10):
        metrics = ppo_trainer.train_step(prompts)
        print(f"PPO Epoch {epoch}: {metrics}")
    
    return sft_model  # the policy model has been updated in place by PPO
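An end-to-end usage sketch. The model name and the tiny inline datasets below are placeholders for illustration only; the column names match what the stage functions above expect:

from datasets import Dataset

# Placeholder data -- substitute real SFT and preference datasets in practice
sft_data = Dataset.from_dict({
    "instruction": ["Explain RLHF in one sentence."],
    "output": ["RLHF aligns a language model with human preferences via a learned reward."],
})
preference_data = Dataset.from_dict({
    "prompt": ["Explain RLHF in one sentence."],
    "chosen": ["A concise, accurate explanation of RLHF."],
    "rejected": ["An off-topic answer about something else."],
})
prompts = ["Explain RLHF in one sentence."]

final_model = full_rlhf_training("gpt2", sft_data, preference_data, prompts)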

Evaluation and Monitoring

RLHF Evaluation Metrics

def evaluate_rlhf_model(model, eval_prompts, human_preferences):
    """评估RLHF模型"""
    
    metrics = {
        "helpfulness": 0,
        "harmlessness": 0,
        "honesty": 0,
        "preference_alignment": 0,
        "diversity": 0
    }
    
    responses = []
    
    for prompt in eval_prompts:
        # Generate a response
        inputs = tokenizer(prompt, return_tensors="pt")
        outputs = model.generate(
            **inputs,
            max_length=256,
            do_sample=True,
            temperature=0.7
        )
        
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        responses.append(response)
    
    # Compute each metric
    metrics["helpfulness"] = evaluate_helpfulness(eval_prompts, responses)
    metrics["harmlessness"] = evaluate_harmlessness(responses)
    metrics["honesty"] = evaluate_honesty(responses)
    metrics["preference_alignment"] = evaluate_preference_alignment(
        responses, human_preferences
    )
    metrics["diversity"] = evaluate_diversity(responses)
    
    return metrics
 
def evaluate_helpfulness(prompts, responses):
    """评估有用性"""
    # 可以使用另一个模型或人工评估
    # 这里简化为长度和相关性的启发式评估
    scores = []
    
    for prompt, response in zip(prompts, responses):
        # Simple heuristic score
        length_score = min(len(response.split()) / 50, 1.0)  # rewards a moderate length
        relevance_score = len(set(prompt.lower().split()) &
                              set(response.lower().split())) / max(len(prompt.split()), 1)
        
        score = (length_score + relevance_score) / 2
        scores.append(score)
    
    return sum(scores) / len(scores)
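The other helpers referenced in evaluate_rlhf_model (evaluate_harmlessness, evaluate_honesty, evaluate_preference_alignment, evaluate_diversity) are not defined in this section. As one illustration, a minimal distinct-n style sketch of evaluate_diversity could look like this (the choice of metric is an assumption, not part of the original recipe):

def evaluate_diversity(responses, n=2):
    """Rough diversity score: ratio of unique n-grams to total n-grams (distinct-n)"""
    total_ngrams = 0
    unique_ngrams = set()
    
    for response in responses:
        tokens = response.split()
        ngrams = list(zip(*[tokens[i:] for i in range(n)]))
        total_ngrams += len(ngrams)
        unique_ngrams.update(ngrams)
    
    return len(unique_ngrams) / total_ngrams if total_ngrams > 0 else 0.0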

Training Monitoring

class RLHFMonitor:
    def __init__(self):
        # Keys match the metrics dict returned by PPOTrainer.train_step
        self.metrics_history = {
            "mean_reward": [],
            "mean_kl": [],
            "loss": []
        }
    
    def log_metrics(self, metrics):
        """Record training metrics"""
        for key, value in metrics.items():
            if key in self.metrics_history:
                self.metrics_history[key].append(value)
    
    def check_convergence(self, window=10):
        """Check whether training has converged"""
        if len(self.metrics_history["mean_reward"]) < window:
            return False
        
        recent_rewards = self.metrics_history["mean_reward"][-window:]
        reward_std = torch.std(torch.tensor(recent_rewards))
        
        # Treat a very small reward standard deviation as convergence
        return reward_std < 0.01
    
    def detect_reward_hacking(self, threshold=2.0):
        """Detect possible reward hacking"""
        if len(self.metrics_history["mean_reward"]) < 2:
            return False
        
        current_reward = self.metrics_history["mean_reward"][-1]
        previous_reward = self.metrics_history["mean_reward"][-2]
        
        # A sudden jump in reward may indicate the policy is gaming the reward model
        return (current_reward - previous_reward) > threshold
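Hooking the monitor into the PPO loop from full_rlhf_training is straightforward; a sketch under the same assumptions as the code above:

monitor = RLHFMonitor()

for epoch in range(10):
    metrics = ppo_trainer.train_step(prompts)
    monitor.log_metrics(metrics)
    
    if monitor.detect_reward_hacking():
        print(f"Warning: reward jumped sharply at epoch {epoch} -- possible reward hacking")
    if monitor.check_convergence():
        print(f"Reward has plateaued at epoch {epoch}; stopping early")
        break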

Advantages and Challenges

Advantages

  1. Human alignment: directly optimizes for human preferences
  2. Improved safety: reduces harmful outputs
  3. Better quality: improves answer quality and relevance
  4. Controllability: behavior can be steered through the reward function
  5. Generalization: learned preferences can transfer to new tasks

Challenges

  1. Training complexity: the three-stage pipeline is complex to run
  2. Data requirements: needs large amounts of high-quality human annotation
  3. Reward hacking: the model may learn to exploit the reward function
  4. Distribution shift: the policy may drift from the original model distribution
  5. Compute cost: training is expensive

Best Practices

Data Collection Strategy

def rlhf_data_collection_guide():
    """RLHF data collection guidelines"""
    
    return {
        "SFT data": {
            "volume": "10K-100K high-quality instruction-response pairs",
            "quality": "ensure responses are accurate, helpful, and safe",
            "diversity": "cover a wide range of task types and domains",
            "format": "a consistent instruction format"
        },
        
        "Preference data": {
            "volume": "10K-50K preference comparisons",
            "annotation quality": "multiple annotators with consistency checks",
            "balance": "avoid biasing toward a particular style of answer",
            "refresh cadence": "update regularly to reflect current preferences"
        },
        
        "Evaluation data": {
            "independence": "fully separate from the training data",
            "coverage": "spans all important evaluation dimensions",
            "realism": "reflects real usage scenarios"
        }
    }

Hyperparameter Tuning

def rlhf_hyperparameter_guide():
    """RLHF hyperparameter tuning guidelines"""
    
    return {
        "SFT stage": {
            "learning rate": "2e-5",
            "batch size": "4-8",
            "epochs": "3-5"
        },
        
        "RM stage": {
            "learning rate": "1e-5",
            "batch size": "2-4",
            "epochs": "1-3"
        },
        
        "PPO stage": {
            "learning rate": "1e-6",
            "KL coefficient": "0.1",
            "clip ratio": "0.2",
            "batch size": "1-2"
        }
    }

Related Concepts