Model Inference and Deployment

Overview

Model inference and deployment is the final stage of the fine-tuning workflow. It covers model optimization, inference acceleration, and service deployment, and an efficient deployment strategy significantly improves both user experience and system performance.

Inference Optimization Strategies

Merging LoRA Weights

def merge_lora_weights(model, save_path):
    """Merge LoRA weights into the base model."""
    # `model` is expected to be a peft.PeftModel; merge_and_unload()
    # folds the LoRA deltas into the base weights and removes the adapter layers
    merged_model = model.merge_and_unload()

    # Save the merged model
    merged_model.save_pretrained(save_path)

    print(f"LoRA weights merged and saved to: {save_path}")
    return merged_model
 
# Usage example
def deploy_merged_model():
    """Deploy the merged model."""
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    # Load the merged model
    model = AutoModelForCausalLM.from_pretrained("./merged_model")
    tokenizer = AutoTokenizer.from_pretrained("./merged_model")

    # Inference is faster: no extra LoRA computation is needed
    def inference(text):
        inputs = tokenizer(text, return_tensors="pt")
        with torch.no_grad():
            outputs = model.generate(**inputs, max_length=512)
        return tokenizer.decode(outputs[0], skip_special_tokens=True)

    return inference
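
For completeness, here is a minimal sketch of producing the PeftModel that merge_lora_weights expects, assuming the adapter was trained with PEFT (the paths are hypothetical):

import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM

# Load the base model and attach the trained LoRA adapter (hypothetical paths)
base_model = AutoModelForCausalLM.from_pretrained("./base_model", torch_dtype=torch.float16)
peft_model = PeftModel.from_pretrained(base_model, "./adapters/default")

# Merge and save; the result can then be loaded with AutoModelForCausalLM alone
merge_lora_weights(peft_model, "./merged_model")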

Model Quantization

def quantize_model(model_path, quantization_type="int8"):
    """Load the model quantized to reduce GPU memory usage."""
    import torch
    from transformers import AutoModelForCausalLM, BitsAndBytesConfig

    if quantization_type == "int8":
        # 8-bit quantization
        quantization_config = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_threshold=6.0,
            llm_int8_has_fp16_weight=False,
        )

    elif quantization_type == "int4":
        # 4-bit quantization
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4"
        )

    else:
        raise ValueError(f"Unsupported quantization type: {quantization_type}")

    # Load the quantized model
    quantized_model = AutoModelForCausalLM.from_pretrained(
        model_path,
        quantization_config=quantization_config,
        device_map="auto"
    )

    return quantized_model
 
# Dynamic quantization
def dynamic_quantization(model):
    """Dynamic quantization (weights quantized ahead of time, activations at inference)."""
    import torch

    # Quantize the Linear layers to int8; intended for CPU inference
    quantized_model = torch.quantization.quantize_dynamic(
        model,
        {torch.nn.Linear},
        dtype=torch.qint8
    )

    return quantized_model
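
A hedged usage sketch of the two approaches above (paths are hypothetical; the bitsandbytes path requires a CUDA GPU, while dynamic quantization targets CPU inference):

from transformers import AutoModelForCausalLM

# 4-bit load of the merged model via bitsandbytes (GPU)
gpu_model = quantize_model("./merged_model", quantization_type="int4")

# CPU-only alternative: dynamically quantize the Linear layers to int8
cpu_model = AutoModelForCausalLM.from_pretrained("./merged_model")
cpu_model = dynamic_quantization(cpu_model)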

Inference Acceleration Techniques

Batched Inference

import torch

class BatchInference:
    def __init__(self, model, tokenizer, batch_size=8):
        self.model = model
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        # Causal LMs often lack a pad token; reuse EOS so batched padding works
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def batch_generate(self, texts, max_length=512):
        """Generate outputs for a list of prompts in batches."""
        results = []

        for i in range(0, len(texts), self.batch_size):
            batch_texts = texts[i:i + self.batch_size]

            # Encode the batch with padding and truncation
            inputs = self.tokenizer(
                batch_texts,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=max_length
            )

            # Generate for the whole batch
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_length=max_length,
                    do_sample=True,
                    temperature=0.7,
                    pad_token_id=self.tokenizer.eos_token_id
                )

            # Decode the results
            batch_results = self.tokenizer.batch_decode(
                outputs,
                skip_special_tokens=True
            )

            results.extend(batch_results)

        return results
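
A minimal usage sketch, assuming a merged model and its tokenizer are already on disk (the path is hypothetical):

from transformers import AutoModelForCausalLM, AutoTokenizer

model = AutoModelForCausalLM.from_pretrained("./merged_model", device_map="auto")
tokenizer = AutoTokenizer.from_pretrained("./merged_model")

batch_infer = BatchInference(model, tokenizer, batch_size=8)
answers = batch_infer.batch_generate([
    "How do I reset my password?",
    "What are your opening hours?",
])
for answer in answers:
    print(answer)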

KV Cache Optimization

def optimize_kv_cache(model, tokenizer):
    """Enable and reuse the KV cache to speed up autoregressive generation."""
    import torch

    # Enable the KV cache
    model.config.use_cache = True

    # Greedy decoding loop that reuses cached key/value states
    def generate_with_cache(input_ids, max_length=512):
        past_key_values = None
        generated_ids = input_ids.clone()

        for _ in range(max_length - input_ids.size(1)):
            with torch.no_grad():
                outputs = model(
                    input_ids=generated_ids[:, -1:] if past_key_values is not None else generated_ids,
                    past_key_values=past_key_values,
                    use_cache=True
                )

            # Pick the next token (greedy)
            next_token_logits = outputs.logits[:, -1, :]
            next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)

            # Append the token and keep the updated cache
            generated_ids = torch.cat([generated_ids, next_token], dim=1)
            past_key_values = outputs.past_key_values

            # Stop at EOS (assumes batch size 1)
            if next_token.item() == tokenizer.eos_token_id:
                break

        return generated_ids

    return generate_with_cache
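
A short usage sketch of the cached greedy decoder, assuming model and tokenizer are already loaded:

# Build the cached decoding function and generate from a prompt
generate_with_cache = optimize_kv_cache(model, tokenizer)
input_ids = tokenizer("Hello, how can I help you today?", return_tensors="pt").input_ids
output_ids = generate_with_cache(input_ids, max_length=128)
print(tokenizer.decode(output_ids[0], skip_special_tokens=True))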

Multi-Model Deployment Strategies

Hot-Swapping Multiple Adapters

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

class MultiAdapterManager:
    def __init__(self, base_model_path):
        self.base_model = AutoModelForCausalLM.from_pretrained(base_model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_path)
        self.adapters = {}
        self.current_adapter = None

    def load_adapter(self, adapter_name, adapter_path):
        """Load an adapter."""
        from peft import PeftModel

        adapter_model = PeftModel.from_pretrained(
            self.base_model,
            adapter_path,
            adapter_name=adapter_name
        )

        self.adapters[adapter_name] = adapter_model
        print(f"Adapter '{adapter_name}' loaded successfully")

    def switch_adapter(self, adapter_name):
        """Switch to another adapter."""
        if adapter_name in self.adapters:
            self.current_adapter = adapter_name
            print(f"Switched to adapter: {adapter_name}")
        else:
            raise ValueError(f"Adapter '{adapter_name}' not found")

    def inference(self, text, adapter_name=None):
        """Run inference with the selected adapter."""
        if adapter_name:
            self.switch_adapter(adapter_name)

        if self.current_adapter is None:
            model = self.base_model
        else:
            model = self.adapters[self.current_adapter]

        inputs = self.tokenizer(text, return_tensors="pt")
        with torch.no_grad():
            outputs = model.generate(**inputs, max_length=512)

        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
 
# Usage example
manager = MultiAdapterManager("base_model_path")
manager.load_adapter("customer_service", "adapters/customer_service")
manager.load_adapter("technical_support", "adapters/technical_support")

# Switch adapters depending on the task
response1 = manager.inference("A customer complaint", adapter_name="customer_service")
response2 = manager.inference("A technical support question", adapter_name="technical_support")

Routing Strategy

class TaskRouter:
    def __init__(self, classifier_model, tokenizer, adapter_manager):
        self.classifier = classifier_model
        self.tokenizer = tokenizer
        self.adapter_manager = adapter_manager
        self.task_mapping = {
            0: "customer_service",
            1: "technical_support",
            2: "sales_inquiry",
            3: "general_chat"
        }

    def route_and_inference(self, text):
        """Route the request to a suitable adapter based on the text."""

        # Classify the incoming text
        inputs = self.tokenizer(text, return_tensors="pt", truncation=True)
        with torch.no_grad():
            logits = self.classifier(**inputs).logits
            predicted_task = torch.argmax(logits, dim=-1).item()

        # Map the predicted class to an adapter
        adapter_name = self.task_mapping.get(predicted_task, "general_chat")

        # Run inference with that adapter
        response = self.adapter_manager.inference(text, adapter_name)

        return {
            "response": response,
            "task_type": adapter_name,
            "confidence": torch.softmax(logits, dim=-1).max().item()
        }
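
A minimal wiring sketch for the router, assuming a small sequence-classification model has been fine-tuned for the four task labels and the `manager` from the example above (the classifier path is hypothetical):

from transformers import AutoModelForSequenceClassification, AutoTokenizer

# Hypothetical classifier checkpoint whose 4 labels match task_mapping
classifier = AutoModelForSequenceClassification.from_pretrained("./task_classifier", num_labels=4)
classifier_tokenizer = AutoTokenizer.from_pretrained("./task_classifier")

router = TaskRouter(classifier, classifier_tokenizer, manager)
result = router.route_and_inference("My order arrived damaged and I want a refund")
print(result["task_type"], round(result["confidence"], 3))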

Serving the Model

FastAPI Service

from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

app = FastAPI(title="Fine-tuned LLM Service")

# Request and response schemas
class GenerationRequest(BaseModel):
    text: str
    max_length: int = 512
    temperature: float = 0.7
    adapter_name: Optional[str] = None

class GenerationResponse(BaseModel):
    response: str
    task_type: Optional[str] = None
    confidence: Optional[float] = None

# Global model manager and (optional) task router
model_manager = None
router = None

@app.on_event("startup")
async def startup_event():
    """Load the model when the service starts."""
    global model_manager
    model_manager = MultiAdapterManager("./base_model")
    model_manager.load_adapter("default", "./adapters/default")

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest):
    """Text generation endpoint."""
    try:
        if request.adapter_name:
            response = model_manager.inference(
                request.text,
                adapter_name=request.adapter_name
            )
            return GenerationResponse(
                response=response,
                task_type=request.adapter_name
            )
        elif router is not None:
            # Let the router pick an adapter automatically
            result = router.route_and_inference(request.text)
            return GenerationResponse(**result)
        else:
            # No router configured: fall back to the default adapter
            response = model_manager.inference(request.text, adapter_name="default")
            return GenerationResponse(response=response, task_type="default")

    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check."""
    return {"status": "healthy", "model_loaded": model_manager is not None}

@app.get("/adapters")
async def list_adapters():
    """List the available adapters."""
    return {"adapters": list(model_manager.adapters.keys())}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
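
A hedged client-side example for the /generate endpoint, assuming the service is running locally on port 8000:

import requests

resp = requests.post(
    "http://localhost:8000/generate",
    json={
        "text": "How do I reset my password?",
        "max_length": 256,
        "adapter_name": "default",
    },
    timeout=60,
)
resp.raise_for_status()
print(resp.json()["response"])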

Docker Deployment

# Dockerfile
FROM nvidia/cuda:11.8.0-devel-ubuntu20.04

# Set the working directory
WORKDIR /app

# Install Python and system dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Copy the requirements file
COPY requirements.txt .

# Install Python dependencies
RUN pip3 install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the service port
EXPOSE 8000

# Start command
CMD ["python3", "app.py"]

# docker-compose.yml
version: '3.8'
 
services:
  llm-service:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
      - ./adapters:/app/adapters
    environment:
      - CUDA_VISIBLE_DEVICES=0
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

Performance Monitoring and Optimization

Inference Performance Monitoring

import time
import GPUtil
from functools import wraps

class PerformanceMonitor:
    def __init__(self):
        self.start_time = time.time()
        self.metrics = {
            "request_count": 0,
            "total_latency": 0,
            "avg_latency": 0,
            "throughput": 0,
            "gpu_utilization": [],
            "memory_usage": []
        }

    def monitor_inference(self, func):
        """Decorator that records latency and GPU usage per inference call."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()

            # Sample GPU utilization and memory usage
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu_util = gpus[0].load * 100
                gpu_memory = gpus[0].memoryUtil * 100
                self.metrics["gpu_utilization"].append(gpu_util)
                self.metrics["memory_usage"].append(gpu_memory)

            # Run the inference call
            result = func(*args, **kwargs)

            # Record latency
            latency = time.time() - start_time
            self.metrics["request_count"] += 1
            self.metrics["total_latency"] += latency
            self.metrics["avg_latency"] = self.metrics["total_latency"] / self.metrics["request_count"]

            return result

        return wrapper

    def get_metrics(self):
        """Return the aggregated performance metrics."""
        elapsed = time.time() - self.start_time
        self.metrics["throughput"] = self.metrics["request_count"] / elapsed if elapsed > 0 else 0
        return {
            **self.metrics,
            "avg_gpu_utilization": sum(self.metrics["gpu_utilization"]) / len(self.metrics["gpu_utilization"]) if self.metrics["gpu_utilization"] else 0,
            "avg_memory_usage": sum(self.metrics["memory_usage"]) / len(self.metrics["memory_usage"]) if self.metrics["memory_usage"] else 0
        }

# Usage example (assumes `model` and `tokenizer` are already loaded)
monitor = PerformanceMonitor()

@monitor.monitor_inference
def inference_with_monitoring(text):
    inputs = tokenizer(text, return_tensors="pt")
    outputs = model.generate(**inputs, max_length=512)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)

Auto-Scaling

class AutoScaler:
    def __init__(self, min_replicas=1, max_replicas=5, target_latency=1.0):
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.target_latency = target_latency
        self.current_replicas = min_replicas
        self.latency_history = []

    def should_scale_up(self, current_latency):
        """Decide whether to scale up."""
        self.latency_history.append(current_latency)

        # Keep only the latest 10 latency samples
        if len(self.latency_history) > 10:
            self.latency_history.pop(0)

        # Scale up if the average latency exceeds 1.5x the target
        avg_latency = sum(self.latency_history) / len(self.latency_history)

        return (avg_latency > self.target_latency * 1.5 and
                self.current_replicas < self.max_replicas)

    def should_scale_down(self, current_latency):
        """Decide whether to scale down."""
        if len(self.latency_history) < 5:
            return False

        avg_latency = sum(self.latency_history) / len(self.latency_history)

        return (avg_latency < self.target_latency * 0.7 and
                self.current_replicas > self.min_replicas)

    def scale(self, current_latency):
        """Make a scaling decision for the latest latency sample."""
        if self.should_scale_up(current_latency):
            self.current_replicas += 1
            return f"Scale up to {self.current_replicas} replicas"
        elif self.should_scale_down(current_latency):
            self.current_replicas -= 1
            return f"Scale down to {self.current_replicas} replicas"

        return "No scaling needed"

Edge Deployment Optimization

Model Compression

def compress_model_for_edge(model, compression_ratio=0.5):
    """Compress the model for edge deployment."""
    import torch

    # 1. Knowledge distillation (shown for reference; not applied below)
    def knowledge_distillation(teacher_model, student_model, train_data, temperature=2.0):
        """Compress via knowledge distillation."""
        import torch.nn.functional as F

        teacher_model.eval()
        student_model.train()

        for batch in train_data:
            # Teacher outputs
            with torch.no_grad():
                teacher_outputs = teacher_model(**batch)
                teacher_logits = teacher_outputs.logits

            # Student outputs
            student_outputs = student_model(**batch)
            student_logits = student_outputs.logits

            # Distillation loss on softened logits
            distill_loss = F.kl_div(
                F.log_softmax(student_logits / temperature, dim=-1),
                F.softmax(teacher_logits / temperature, dim=-1),
                reduction='batchmean'
            ) * (temperature ** 2)

            # Backpropagate (an optimizer step would follow in a full training loop)
            distill_loss.backward()

    # 2. Weight pruning
    def prune_weights(model, pruning_ratio=0.3):
        """L1 unstructured pruning of Linear layers."""
        import torch.nn.utils.prune as prune

        for name, module in model.named_modules():
            if isinstance(module, torch.nn.Linear):
                prune.l1_unstructured(module, name='weight', amount=pruning_ratio)
                prune.remove(module, 'weight')

    # Apply pruning with the requested ratio
    prune_weights(model, compression_ratio)

    return model
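
A minimal usage sketch of the pruning path (the paths are hypothetical):

from transformers import AutoModelForCausalLM

# Prune 30% of the Linear weights in a copy of the merged model
edge_model = AutoModelForCausalLM.from_pretrained("./merged_model")
edge_model = compress_model_for_edge(edge_model, compression_ratio=0.3)
edge_model.save_pretrained("./edge_model")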

ONNX Conversion

def convert_to_onnx(model, tokenizer, output_path):
    """Export the model to ONNX for faster inference."""
    import torch
    import torch.onnx

    # Prepare a sample input for tracing
    sample_text = "This is a sample input"
    inputs = tokenizer(sample_text, return_tensors="pt")

    # Put the model in evaluation mode
    model.eval()

    # Export the ONNX model
    torch.onnx.export(
        model,
        tuple(inputs.values()),
        output_path,
        input_names=['input_ids', 'attention_mask'],
        output_names=['logits'],
        dynamic_axes={
            'input_ids': {0: 'batch_size', 1: 'sequence'},
            'attention_mask': {0: 'batch_size', 1: 'sequence'},
            'logits': {0: 'batch_size', 1: 'sequence'}
        },
        opset_version=11
    )

    print(f"ONNX model saved to: {output_path}")

# ONNX inference
def onnx_inference(onnx_model_path, tokenizer, text):
    """Run inference with the exported ONNX model."""
    import onnxruntime as ort

    # Load the ONNX model
    session = ort.InferenceSession(onnx_model_path)

    # Prepare numpy inputs
    inputs = tokenizer(text, return_tensors="np")

    # Run the session
    outputs = session.run(None, {
        'input_ids': inputs['input_ids'],
        'attention_mask': inputs['attention_mask']
    })

    return outputs[0]
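
A short end-to-end sketch combining the two helpers above, assuming model and tokenizer are already loaded and onnxruntime is installed (the output path is hypothetical):

# Export the model, then run a forward pass through ONNX Runtime
convert_to_onnx(model, tokenizer, "model.onnx")
logits = onnx_inference("model.onnx", tokenizer, "This is a test input")
print(logits.shape)  # (batch_size, sequence_length, vocab_size)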

Related Concepts