Skip to content

MLOps 实践:从实验到生产的全流程

MLOps(Machine Learning Operations)是将机器学习模型系统化、自动化的工程实践。

ML 生命周期

问题定义 → 数据收集 → 实验开发 → 模型训练 → 模型评估 → 部署上线 → 监控维护 → 再训练

核心原则

1. 可复现性(Reproducibility)

任何实验结果必须能被复现。

python
# 固定一切随机种子
import random, numpy as np, torch

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True

set_seed(42)

记录:代码版本、环境、超参数、数据集版本。

2. 自动化(Automation)

减少人工操作,用代码和脚本代替。

yaml
# GitHub Actions 示例
name: Train Model
on: [push]
jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Setup Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Train model
        run: python train.py --config configs/exp1.yaml
      - name: Upload artifacts
        uses: actions/upload-artifact@v3
        with:
          name: model-weights
          path: outputs/model.pt

3. 可观测性(Observability)

部署后能监控模型性能、数据漂移、异常。

python
# Prometheus + Grafana 监控
# 指标:请求量、延迟、错误率、预测分布、内存使用

4. 版本控制

一切皆可版本化

  • 代码(Git)
  • 数据(DVC,LakeFS)
  • 模型(MLflow, ModelDB)
  • 配置(YAML + Git)
  • 环境(Docker, Conda)

工具栈推荐

实验跟踪(Experiment Tracking)

记录每次实验的配置、指标、 artifact。

工具描述适合场景
MLflow开源,功能全面自建,灵活定制
Weights & Biases (W&B)云服务 + 开源团队协作,可视化强
TensorBoardTensorFlow 原生TensorFlow 项目
Neptune商业版企业级需求

MLflow 示例:

python
import mlflow
import mlflow.pytorch

mlflow.set_experiment("llm-finetune")

with mlflow.start_run():
    # 记录超参数
    mlflow.log_params({
        "lr": 1e-5,
        "batch_size": 32,
        "model": "Llama-2-7b"
    })

    # 训练模型(省略)
    model = train(...)

    # 记录指标
    mlflow.log_metrics({
        "val_loss": 0.123,
        "val_acc": 0.95
    })

    # 保存模型
    mlflow.pytorch.log_model(model, "model")

查看 UI:运行 mlflow ui 命令启动 Web 界面


模型注册与部署(Model Registry & Serving)

工具类型特点
MLflow Models本地/云简单,与跟踪集成
BentoML本地容器性能好,支持多种框架
TensorFlow Serving生产GRPC/REST,Kubernetes
TorchServe生产PyTorch 官方,多模型管理
vLLM推理优化高性能,OpenAI API 兼容

BentoML 示例:

python
# service.py
import bentoml
from transformers import AutoModel, AutoTokenizer

model = AutoModel.from_pretrained("my-model").eval()
tokenizer = AutoTokenizer.from_pretrained("my-model")

@bentoml.service(
    resources={"cpu": "2"},
    traffic={"timeout": 10},
    runners=[bentoml.Runner(lambda x: model(**x), ...)]
)
class LLMService:
    @bentoml.api
    def generate(self, prompt: str) -> str:
        inputs = tokenizer(prompt, return_tensors="pt")
        output = model.generate(**inputs, max_new_tokens=100)
        return tokenizer.decode(output[0])

打包:bentoml build 部署:bentoml serve LLMService:latest


数据管理与版本控制

DVC(Data Version Control):

bash
# 初始化
dvc init

# 添加数据文件
dvc add data/raw/dataset.csv
git add data/raw/dataset.csv.dvc

# 推送到远程存储(S3, GCS, SSH)
dvc remote add -d myremote s3://mybucket/dvcstorage
dvc push

流程

  1. dvc add 生成 .dvc 指针文件(Git 版本化)
  2. 实际数据存到远程存储
  3. dvc pull 还原数据

流水线编排(Pipeline Orchestration)

工具描述
Airflow成熟,任务依赖强大,学习曲线陡
Prefect现代,Pythonic,易上手
Kedro数据科学专用,pipeline 结构化
MetaflowNetflix 内部,实验+生产一体化
GitHub ActionsCI/CD,简单流水线

Prefect 示例:

python
from prefect import flow, task

@task
def load_data(path):
    return pd.read_csv(path)

@task
def train_model(data):
    model = train(data)
    return model

@flow
def ml_pipeline():
    data = load_data("data/train.csv")
    model = train_model(data)

if __name__ == "__main__":
    ml_pipeline()

运行:prefect deployment build 部署:prefect deployment apply


典型架构

LLM 应用部署架构

┌─────────┐    ┌────────────┐    ┌─────────────┐    ┌─────────┐
│  用户   │───▶│  API网关   │───▶│  路由层      │───▶│ 模型推理│
└─────────┘    └────────────┘    └─────────────┘    └─────────┘


                                 ┌─────────────────┐
                                 │ 向量数据库      │
                                 │ (RAG 检索)      │
                                 └─────────────────┘


                                 ┌─────────────────┐
                                 │ 缓存 Redis      │
                                 └─────────────────┘

监控:Prometheus + Grafana(QPS、延迟、错误率、成本)
日志:ELK Stack(Elasticsearch, Logstash, Kibana)
追踪:Jaeger / OpenTelemetry(请求链路追踪)

CI/CD 流程

开发者 Push → 运行测试(单元、集成) → 自动构建 Docker 镜像 → 推送到 Registry → 通知部署系统 → 金丝雀发布 → 监控指标 → 全量发布(或回滚)

GitHub Actions 示例(模型训练 CI):

yaml
name: Model Training CI
on:
  push:
    paths:
      - 'src/**'
      - 'data/**'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run unit tests
        run: pytest tests/ --cov=src

  train:
    needs: test
    runs-on: gpu-runner  # 自托管 GPU runner
    steps:
      - uses: actions/checkout@v3
      - name: Pull data
        run: dvc pull
      - name: Train model
        run: python train.py --config configs/prod.yaml
      - name: Evaluate
        run: python eval.py --model outputs/model.pt
      - name: Register model
        if: success()
        run: |
          mlflow models serve -m outputs/model.pt --no-conda &

关键实践

1. 模型版本管理

每次训练生成唯一版本标签:

bash
# MLflow 注册表
mlflow models register \
  -m runs:/<run_id>/model \
  -n "llm-finetune" \
  -v "v1.2.0"

元数据

  • 训练数据哈希
  • 超参数
  • 性能指标
  • 负责人、日期

2. 模型回滚

A/B 测试失败 → 一键回滚:

bash
/mlflow/models/get-latest-versions?stage=Production
# 切换到上一版本

3. 渐进式发布

  • 金丝雀(Canary):1% 流量 → 新模型
  • 监控:错误率、延迟是否正常
  • 全量:无异常则 10% → 50% → 100%

4. 自动化监控告警

python
# Prometheus Rule(异常检测)
- alert: ModelPerformanceDegradation
  expr: increase(api_errors_total[5m]) > 100
  for: 10m
  labels:
    severity: critical
  annotations:
    summary: "模型错误率上升"

数据漂移检测

python
from evidently import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

# 比较当前数据与训练数据分布
report = Report(metrics=[DataDriftPreset()])
report.run(current_data=current, reference_data=train, column_mapping=...)
report.save_html("drift_report.html")

5. 自动化再训练

当监控指标低于阈值时自动触发:

监测 → 触发重训 → 数据收集 → 训练 → 评估 → 注册新版本 → A/B 测试 → 全量

Airflow DAG 示例:

python
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG("retrain_pipeline", schedule="@weekly")

def collect_data():
    # 收集新用户反馈
    pass

def train():
    # 增量训练
    pass

def evaluate():
    # 验证新模型优于当前版本
    pass

def deploy():
    # 注册并发布新版本
    pass

collect_task = PythonOperator(task_id="collect", python_callable=collect_data, dag=dag)
train_task = PythonOperator(task_id="train", python_callable=train, dag=dag)
evaluate_task = PythonOperator(task_id="evaluate", python_callable=evaluate, dag=dag)
deploy_task = PythonOperator(task_id="deploy", python_callable=deploy, dag=dag)

collect_task >> train_task >> evaluate_task >> deploy_task

成本控制

1. 资源优化

  • 自动扩缩容:根据负载调整实例数(K8s HPA)
  • Spot Instance:训练使用抢占式实例(节省 60-90%)
  • 模型压缩上线:量化、蒸馏降低推理成本

2. 监控成本

python
# 记录每次推理成本
cost_per_token = 0.0001
total_cost = sum(tokens) * cost_per_token
alert_if(total_cost > daily_budget)

合规与治理

1. 模型卡片(Model Card)

记录模型信息(Hugging Face Model Card):

markdown
## Model Details
- **Model Type**: Llama-2-7b fine-tuned on medical QA
- **Training Data**: MIMIC-III (de-identified)
- **Intended Use**: Medical question answering (non-critical)
- **Limitations**: Not for diagnosis, may hallucinate

## Metrics
- Accuracy: 88.5%
- F1: 0.86
- Toxicity Rate: <0.1%

## Ethical Considerations
- Trained on de-identified data
- Reviewed by medical professionals
- Includes refusal mechanism for harmful queries

2. 偏见检测

python
from transformers import pipeline
from evaluate import load

bias_metric = load("bias", module_type="measurement")

# 测试不同人群描述的公平性
demographics = ["man", "woman", "person of color"]
for demo in demographics:
    prompts = [f"A {demo} doctor...", f"A {demo} nurse..."]
    outputs = model.generate(prompts)
    score = bias_metric.compute(predictions=outputs, references=...)

3. 可解释性

SHAP / LIME 解释模型决策:

python
import shap

explainer = shap.Explainer(model, tokenizer)
shap_values = explainer(["解释为什么模型拒绝回答..."])
shap.plots.text(shap_values[0])

自托管 vs 云服务

场景推荐
小团队,快速验证云服务(W&B, Azure ML, GCP Vertex AI)
数据敏感,需本地部署MLflow + DVC + Airflow 自建
大规模生产混合:云训练 + 自建推理(降低成本)

总结

  • 实验跟踪:MLflow / W&B 记录一切
  • 数据版本:DVC 管理数据集
  • 流水线:Airflow / Prefect 自动化
  • 模型服务:BentoML / vLLM 高性能推理
  • 监控告警:Prometheus + 自定义指标
  • CI/CD:GitHub Actions 自动化训练部署

MLOps 是 AI 产品化的桥梁,没有成熟的 MLOps,模型无法规模化落地。