Model Inference and Deployment
Overview
Model inference and deployment is the final stage of the fine-tuning pipeline, covering model optimization, inference acceleration, and service deployment. An efficient deployment strategy significantly improves both user experience and system performance.
Inference Optimization Strategies
LoRA Weight Merging
def merge_lora_weights(model, save_path):
    """Merge LoRA weights into the base model (expects a PeftModel instance)"""
    # Merge the LoRA weights and drop the adapter layers
    merged_model = model.merge_and_unload()
    # Save the merged model
    merged_model.save_pretrained(save_path)
    print(f"LoRA weights merged and saved to: {save_path}")
    return merged_model

# Usage example
def deploy_merged_model():
    """Deploy the merged model"""
    import torch
    from transformers import AutoModelForCausalLM, AutoTokenizer

    # Load the merged model
    model = AutoModelForCausalLM.from_pretrained("./merged_model")
    tokenizer = AutoTokenizer.from_pretrained("./merged_model")

    # Inference is faster: no extra LoRA computation at runtime
    def inference(text):
        inputs = tokenizer(text, return_tensors="pt")
        with torch.no_grad():
            outputs = model.generate(**inputs, max_length=512)
        return tokenizer.decode(outputs[0], skip_special_tokens=True)

    return inference
Model Quantization
def quantize_model(model_path, quantization_type="int8"):
    """Quantize a model to reduce GPU memory usage"""
    import torch
    from transformers import AutoModelForCausalLM, BitsAndBytesConfig

    if quantization_type == "int8":
        # 8-bit quantization
        quantization_config = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_threshold=6.0,
            llm_int8_has_fp16_weight=False,
        )
    elif quantization_type == "int4":
        # 4-bit quantization
        quantization_config = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4"
        )
    else:
        raise ValueError(f"Unsupported quantization type: {quantization_type}")

    # Load the model with the quantization config applied
    quantized_model = AutoModelForCausalLM.from_pretrained(
        model_path,
        quantization_config=quantization_config,
        device_map="auto"
    )
    return quantized_model
# Dynamic quantization
def dynamic_quantization(model):
    """Dynamic quantization (weights stored as int8, activations quantized at inference time; CPU only)"""
    import torch

    # Replace all Linear layers with dynamically quantized versions
    quantized_model = torch.quantization.quantize_dynamic(
        model, {torch.nn.Linear}, dtype=torch.qint8
    )
    return quantized_model
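As a quick sanity check, the sketch below loads the merged model from the previous section in 4-bit mode and runs a single generation. This is a minimal sketch: the "./merged_model" path and the prompt are illustrative placeholders.

# Minimal usage sketch (assumes the merged model from the LoRA section was saved to ./merged_model)
import torch
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("./merged_model")
quantized = quantize_model("./merged_model", quantization_type="int4")

inputs = tokenizer("How do I track my order?", return_tensors="pt").to(quantized.device)
with torch.no_grad():
    outputs = quantized.generate(**inputs, max_new_tokens=64)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))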
Inference Acceleration Techniques
Batch Inference
import torch

class BatchInference:
    def __init__(self, model, tokenizer, batch_size=8):
        self.model = model
        self.tokenizer = tokenizer
        self.batch_size = batch_size

    def batch_generate(self, texts, max_length=512):
        """Generate responses for a list of prompts in batches"""
        results = []
        for i in range(0, len(texts), self.batch_size):
            batch_texts = texts[i:i + self.batch_size]
            # Encode the batch with padding and truncation
            inputs = self.tokenizer(
                batch_texts,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=max_length
            ).to(self.model.device)
            # Generate for the whole batch
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_length=max_length,
                    do_sample=True,
                    temperature=0.7,
                    pad_token_id=self.tokenizer.eos_token_id
                )
            # Decode the generated sequences
            batch_results = self.tokenizer.batch_decode(
                outputs,
                skip_special_tokens=True
            )
            results.extend(batch_results)
        return results
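A minimal usage sketch for the class above, assuming `model` and `tokenizer` have already been loaded (for example the merged model from the earlier section); the prompts are placeholders.

# Usage sketch: run several prompts through the batched pipeline
batch_runner = BatchInference(model, tokenizer, batch_size=4)
prompts = [
    "Summarize the return policy in one sentence.",
    "What are the support hours?",
]
for prompt, answer in zip(prompts, batch_runner.batch_generate(prompts, max_length=256)):
    print(prompt, "->", answer)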
KV Cache Optimization
import torch

def optimize_kv_cache(model, tokenizer):
    """Enable and exploit the KV cache to speed up autoregressive generation"""
    # Enable the KV cache
    model.config.use_cache = True

    # Greedy decoding loop that reuses the cached key/value states
    def generate_with_cache(input_ids, max_length=512):
        past_key_values = None
        generated_ids = input_ids.clone()
        for _ in range(max_length - input_ids.size(1)):
            with torch.no_grad():
                outputs = model(
                    input_ids=generated_ids[:, -1:] if past_key_values is not None else generated_ids,
                    past_key_values=past_key_values,
                    use_cache=True
                )
            # Pick the next token greedily
            next_token_logits = outputs.logits[:, -1, :]
            next_token = torch.argmax(next_token_logits, dim=-1, keepdim=True)
            # Extend the generated sequence and update the cache
            generated_ids = torch.cat([generated_ids, next_token], dim=1)
            past_key_values = outputs.past_key_values
            # Stop at the end-of-sequence token
            if next_token.item() == tokenizer.eos_token_id:
                break
        return generated_ids

    return generate_with_cache
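For completeness, a short usage sketch of the cached greedy decoder above, again assuming `model` and `tokenizer` are already loaded.

# Usage sketch: build the cached decoder and generate from a short prompt
generate_with_cache = optimize_kv_cache(model, tokenizer)
input_ids = tokenizer("The weather today is", return_tensors="pt").input_ids
output_ids = generate_with_cache(input_ids, max_length=64)
print(tokenizer.decode(output_ids[0], skip_special_tokens=True))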
Multi-Model Deployment Strategies
Multi-Adapter Hot Swapping
import torch
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer

class MultiAdapterManager:
    def __init__(self, base_model_path):
        self.base_model = AutoModelForCausalLM.from_pretrained(base_model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(base_model_path)
        self.adapters = {}
        self.current_adapter = None

    def load_adapter(self, adapter_name, adapter_path):
        """Load an adapter on top of the shared base model"""
        adapter_model = PeftModel.from_pretrained(
            self.base_model,
            adapter_path,
            adapter_name=adapter_name
        )
        self.adapters[adapter_name] = adapter_model
        print(f"Adapter '{adapter_name}' loaded successfully")

    def switch_adapter(self, adapter_name):
        """Switch the active adapter"""
        if adapter_name in self.adapters:
            self.current_adapter = adapter_name
            print(f"Switched to adapter: {adapter_name}")
        else:
            raise ValueError(f"Adapter '{adapter_name}' not found")

    def inference(self, text, adapter_name=None):
        """Run inference with the requested adapter (or the base model if none is active)"""
        if adapter_name:
            self.switch_adapter(adapter_name)
        if self.current_adapter is None:
            model = self.base_model
        else:
            model = self.adapters[self.current_adapter]
        inputs = self.tokenizer(text, return_tensors="pt")
        with torch.no_grad():
            outputs = model.generate(**inputs, max_length=512)
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
# Usage example
manager = MultiAdapterManager("base_model_path")
manager.load_adapter("customer_service", "adapters/customer_service")
manager.load_adapter("technical_support", "adapters/technical_support")

# Switch adapters depending on the task
response1 = manager.inference("A customer complaint about a delayed order", adapter_name="customer_service")
response2 = manager.inference("A technical support inquiry", adapter_name="technical_support")
Routing Strategy
import torch

class TaskRouter:
    def __init__(self, classifier_model, classifier_tokenizer, adapter_manager):
        self.classifier = classifier_model
        self.classifier_tokenizer = classifier_tokenizer
        self.adapter_manager = adapter_manager
        self.task_mapping = {
            0: "customer_service",
            1: "technical_support",
            2: "sales_inquiry",
            3: "general_chat"
        }

    def route_and_inference(self, text):
        """Route the request to the appropriate adapter based on its content"""
        # Predict the task type
        inputs = self.classifier_tokenizer(text, return_tensors="pt", truncation=True)
        with torch.no_grad():
            logits = self.classifier(**inputs).logits
        predicted_task = torch.argmax(logits, dim=-1).item()
        # Look up the matching adapter (fall back to general chat)
        adapter_name = self.task_mapping.get(predicted_task, "general_chat")
        # Run inference with that adapter
        response = self.adapter_manager.inference(text, adapter_name)
        return {
            "response": response,
            "task_type": adapter_name,
            "confidence": torch.softmax(logits, dim=-1).max().item()
        }
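A wiring sketch for the router, assuming a sequence-classification model has been fine-tuned for the four task labels above; the "./task_classifier" path is a hypothetical placeholder and `manager` is the MultiAdapterManager instance from the previous example.

# Usage sketch; "./task_classifier" is a hypothetical path to a 4-label classifier
from transformers import AutoModelForSequenceClassification, AutoTokenizer

classifier = AutoModelForSequenceClassification.from_pretrained("./task_classifier")
classifier_tokenizer = AutoTokenizer.from_pretrained("./task_classifier")

router = TaskRouter(classifier, classifier_tokenizer, manager)
result = router.route_and_inference("My order arrived damaged and I want a refund.")
print(result["task_type"], round(result["confidence"], 3))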
Service Deployment
FastAPI Service
from typing import Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn

app = FastAPI(title="Fine-tuned LLM Service")

# Request/response schemas
class GenerationRequest(BaseModel):
    text: str
    max_length: int = 512
    temperature: float = 0.7
    adapter_name: Optional[str] = None

class GenerationResponse(BaseModel):
    response: str
    task_type: Optional[str] = None
    confidence: Optional[float] = None

# Global model manager and optional task router
model_manager = None
router = None

@app.on_event("startup")
async def startup_event():
    """Load models when the service starts"""
    global model_manager
    model_manager = MultiAdapterManager("./base_model")
    model_manager.load_adapter("default", "./adapters/default")

@app.post("/generate", response_model=GenerationResponse)
async def generate_text(request: GenerationRequest):
    """Text generation endpoint"""
    try:
        if request.adapter_name:
            response = model_manager.inference(
                request.text,
                adapter_name=request.adapter_name
            )
            return GenerationResponse(
                response=response,
                task_type=request.adapter_name
            )
        elif router is not None:
            # Let the router choose an adapter automatically
            result = router.route_and_inference(request.text)
            return GenerationResponse(**result)
        else:
            # No router configured: fall back to the default adapter
            response = model_manager.inference(request.text, adapter_name="default")
            return GenerationResponse(response=response, task_type="default")
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check"""
    return {"status": "healthy", "model_loaded": model_manager is not None}

@app.get("/adapters")
async def list_adapters():
    """List the available adapters"""
    return {"adapters": list(model_manager.adapters.keys())}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
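From the client side, the service can be exercised with a plain HTTP call. The sketch below uses the `requests` library against the /generate endpoint defined above, assuming the server is running locally on port 8000.

# Client-side usage sketch
import requests

resp = requests.post(
    "http://localhost:8000/generate",
    json={"text": "How do I reset my password?", "adapter_name": "default"},
    timeout=60,
)
resp.raise_for_status()
print(resp.json()["response"])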
Docker Deployment
# Dockerfile
FROM nvidia/cuda:11.8.0-devel-ubuntu20.04
# Set the working directory
WORKDIR /app

# Install Python and system dependencies
RUN apt-get update && apt-get install -y \
    python3 \
    python3-pip \
    && rm -rf /var/lib/apt/lists/*

# Copy the requirements file
COPY requirements.txt .

# Install Python dependencies
RUN pip3 install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Expose the service port
EXPOSE 8000

# Start command
CMD ["python3", "app.py"]
# docker-compose.yml
version: '3.8'
services:
  llm-service:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
      - ./adapters:/app/adapters
    environment:
      - CUDA_VISIBLE_DEVICES=0
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
Performance Monitoring and Optimization
Inference Performance Monitoring
import time
import torch
import GPUtil
from functools import wraps

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "request_count": 0,
            "total_latency": 0,
            "avg_latency": 0,
            "throughput": 0,
            "gpu_utilization": [],
            "memory_usage": []
        }

    def monitor_inference(self, func):
        """Decorator that records latency and GPU usage for each inference call"""
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            # Sample GPU utilization and memory usage
            gpus = GPUtil.getGPUs()
            if gpus:
                gpu_util = gpus[0].load * 100
                gpu_memory = gpus[0].memoryUtil * 100
                self.metrics["gpu_utilization"].append(gpu_util)
                self.metrics["memory_usage"].append(gpu_memory)
            # Run the actual inference
            result = func(*args, **kwargs)
            # Record latency statistics
            latency = time.time() - start_time
            self.metrics["request_count"] += 1
            self.metrics["total_latency"] += latency
            self.metrics["avg_latency"] = self.metrics["total_latency"] / self.metrics["request_count"]
            return result
        return wrapper

    def get_metrics(self):
        """Return aggregated performance metrics"""
        return {
            **self.metrics,
            "avg_gpu_utilization": sum(self.metrics["gpu_utilization"]) / len(self.metrics["gpu_utilization"]) if self.metrics["gpu_utilization"] else 0,
            "avg_memory_usage": sum(self.metrics["memory_usage"]) / len(self.metrics["memory_usage"]) if self.metrics["memory_usage"] else 0
        }

# Usage example (assumes `model` and `tokenizer` are already loaded)
monitor = PerformanceMonitor()

@monitor.monitor_inference
def inference_with_monitoring(text):
    inputs = tokenizer(text, return_tensors="pt")
    with torch.no_grad():
        outputs = model.generate(**inputs, max_length=512)
    return tokenizer.decode(outputs[0], skip_special_tokens=True)
Auto-Scaling
class AutoScaler:
    def __init__(self, min_replicas=1, max_replicas=5, target_latency=1.0):
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.target_latency = target_latency
        self.current_replicas = min_replicas
        self.latency_history = []

    def should_scale_up(self, current_latency):
        """Decide whether to scale up"""
        self.latency_history.append(current_latency)
        # Keep only the 10 most recent latency samples
        if len(self.latency_history) > 10:
            self.latency_history.pop(0)
        # Scale up when the average latency exceeds 1.5x the target
        avg_latency = sum(self.latency_history) / len(self.latency_history)
        return (avg_latency > self.target_latency * 1.5 and
                self.current_replicas < self.max_replicas)

    def should_scale_down(self, current_latency):
        """Decide whether to scale down"""
        if len(self.latency_history) < 5:
            return False
        avg_latency = sum(self.latency_history) / len(self.latency_history)
        return (avg_latency < self.target_latency * 0.7 and
                self.current_replicas > self.min_replicas)

    def scale(self, current_latency):
        """Make a scaling decision for the latest latency observation"""
        if self.should_scale_up(current_latency):
            self.current_replicas += 1
            return f"Scale up to {self.current_replicas} replicas"
        elif self.should_scale_down(current_latency):
            self.current_replicas -= 1
            return f"Scale down to {self.current_replicas} replicas"
        return "No scaling needed"
Edge Deployment Optimization
Model Compression
def compress_model_for_edge(model, compression_ratio=0.5):
    """Compress a model for edge deployment"""
    # 1. Knowledge distillation
    def knowledge_distillation(teacher_model, student_model, train_data, optimizer, temperature=2.0):
        """Distill the teacher model into a smaller student model"""
        import torch
        import torch.nn.functional as F

        teacher_model.eval()
        student_model.train()
        for batch in train_data:
            # Teacher outputs (no gradients needed)
            with torch.no_grad():
                teacher_outputs = teacher_model(**batch)
                teacher_logits = teacher_outputs.logits
            # Student outputs
            student_outputs = student_model(**batch)
            student_logits = student_outputs.logits
            # Distillation loss: KL divergence between softened distributions
            distill_loss = F.kl_div(
                F.log_softmax(student_logits / temperature, dim=-1),
                F.softmax(teacher_logits / temperature, dim=-1),
                reduction='batchmean'
            ) * (temperature ** 2)
            # Backpropagate and update the student
            optimizer.zero_grad()
            distill_loss.backward()
            optimizer.step()

    # 2. Weight pruning
    def prune_weights(model, pruning_ratio=0.3):
        """L1 unstructured pruning of Linear layers"""
        import torch
        import torch.nn.utils.prune as prune

        for name, module in model.named_modules():
            if isinstance(module, torch.nn.Linear):
                prune.l1_unstructured(module, name='weight', amount=pruning_ratio)
                prune.remove(module, 'weight')

    # Apply pruning with the requested compression ratio
    prune_weights(model, compression_ratio)
    return model
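A minimal call sketch, assuming `model` is the fine-tuned model loaded earlier; note that the pruning step modifies the model in place, and the output path is a placeholder.

# Usage sketch: prune 30% of the Linear weights for an edge target
compressed = compress_model_for_edge(model, compression_ratio=0.3)
compressed.save_pretrained("./edge_model")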
ONNX Conversion
def convert_to_onnx(model, tokenizer, output_path):
    """Export the model to ONNX format for faster inference"""
    import torch.onnx

    # Prepare a sample input for tracing
    sample_text = "This is a sample input"
    inputs = tokenizer(sample_text, return_tensors="pt")

    # Put the model in evaluation mode
    model.eval()

    # Export the ONNX model
    torch.onnx.export(
        model,
        (inputs["input_ids"], inputs["attention_mask"]),
        output_path,
        input_names=['input_ids', 'attention_mask'],
        output_names=['logits'],
        dynamic_axes={
            'input_ids': {0: 'batch_size', 1: 'sequence'},
            'attention_mask': {0: 'batch_size', 1: 'sequence'},
            'logits': {0: 'batch_size', 1: 'sequence'}
        },
        opset_version=11  # some architectures may require a higher opset version
    )
    print(f"ONNX model saved to: {output_path}")

# ONNX inference
def onnx_inference(onnx_model_path, tokenizer, text):
    """Run inference with the exported ONNX model"""
    import onnxruntime as ort

    # Load the ONNX model
    session = ort.InferenceSession(onnx_model_path)
    # Prepare numpy inputs
    inputs = tokenizer(text, return_tensors="np")
    # Run the session
    outputs = session.run(None, {
        'input_ids': inputs['input_ids'],
        'attention_mask': inputs['attention_mask']
    })
    return outputs[0]
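To sanity-check the exported graph, the sketch below runs one forward pass through ONNX Runtime and greedily picks the next token from the returned logits; "model.onnx" is a placeholder for the path passed to convert_to_onnx above, and `tokenizer` is assumed to be loaded.

# Usage sketch: greedy next-token prediction from the ONNX logits
import numpy as np

logits = onnx_inference("model.onnx", tokenizer, "The capital of France is")
next_token_id = int(np.argmax(logits[0, -1, :]))
print(tokenizer.decode([next_token_id]))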