← 返回投肯智能知识库首页

用RAG构建企业级知识库问答系统:实战案例与效果数据

作者:重庆投肯小刚更新日期:2026年5月25日阅读时长:30分钟

一、项目背景

2025年初,某中型科技公司(200人规模,产品研发团队约占40%)遇到一个典型问题:公司积累了大量的技术文档、产品手册、会议记录、客服工单,但这些知识散落在 Confluence、飞书文档、钉钉、企业微信等多个平台,新员工找答案要花很长时间,老员工也经常重复回答同样的问题。

公司希望用 RAG(检索增强生成)技术,构建一个统一的智能问答系统,让员工用自然语言提问,就能获得基于公司内部知识的准确回答。

二、需求分析

2.1 业务需求

2.2 技术需求

2.3 约束条件

三、系统架构设计

3.1 整体架构图

                    ┌─────────────────────────────────────────┐
                    │            用户请求流程                    │
                    └─────────────────────────────────────────┘

┌──────────┐      ┌──────────────┐      ┌──────────────┐      ┌─────────────┐
│  用户    │ ──── │  Web 前端    │ ──── │  API 网关    │ ──── │  Chat API  │
│  提问    │      │  (Streamlit) │      │  (Nginx)    │      │  (FastAPI) │
└──────────┘      └──────────────┘      └──────────────┘      └─────────────┘
                                                                   │
                         ┌────────────────────────────────────────┤
                         ▼                                        ▼
              ┌──────────────────┐                    ┌─────────────────────────┐
              │   检索模块        │                    │   生成模块               │
              │  (Query理解+     │                    │  (Prompt构建+LLM调用)    │
              │   向量检索+重排)  │                    │                         │
              └──────────────────┘                    └─────────────────────────┘
                         │                                        │
                         ▼                                        ▼
              ┌──────────────────┐                    ┌─────────────────────────┐
              │  向量数据库       │                    │  LLM API                │
              │  (Chroma/Milvus) │                    │  (本地模型/vLLM)        │
              └──────────────────┘                    └─────────────────────────┘


┌─────────────────────────────────────────────────────────────────────────┐
│                        文档处理流程(后台)                               │
└─────────────────────────────────────────────────────────────────────────┘

┌──────────┐    ┌──────────────┐    ┌─────────────┐    ┌──────────────────┐
│  文档源  │─── │  文档解析器   │─── │  文本切分器  │─── │  Embedding 模型  │
│Confluence│    │  (PDF/Word/  │    │  (Recursive │    │  (BGE-large-zh)  │
│飞书/本地 │    │   Markdown)  │    │  Character)│    │                  │
└──────────┘    └──────────────┘    └─────────────┘    └──────────────────┘
                                                                  │
                                                                  ▼
                                                       ┌──────────────────┐
                                                       │   向量数据库      │
                                                       │   (Chroma)       │
                                                       └──────────────────┘

3.2 核心组件选型

组件选型理由
Embedding 模型BGE-large-zh-v1.5中文效果最好的开源 Embedding,MTEB 榜单前三
向量数据库Chroma(单机)/ Milvus(生产)轻量、易用;生产环境用 Milvus 集群
切分策略RecursiveCharacterTextSplitter保持语义完整性,支持重叠
重排模型BAAI/bge-reranker-large显著提升检索质量
LLMQwen2-72B-Instruct(vLLM)中文能力强,量化后可在单卡运行
API 框架FastAPI高性能,自动文档,类型安全
前端Streamlit(内部)/ React(外部)快速原型 / 生产级界面

四、环境准备与安装

4.1 服务器配置

资源规格说明
CPU32核+用于文档解析和预处理
内存128GB+向量数据库 + 模型推理
GPUNVIDIA A100 40G × 1LLM 推理,Qwen2-72B 量化后约需 30G
磁盘1TB+ SSD存储向量数据 + 原始文档
操作系统Ubuntu 22.04

4.2 基础环境安装

bash
# 步骤1:安装 Python 3.11(如果还没有)
sudo apt update
sudo apt install -y python3.11 python3.11-venv python3-pip
python3.11 --version  # 确认输出 Python 3.11.x

# 步骤2:创建虚拟环境
python3.11 -m venv /opt/rag-env
source /opt/rag-env/bin/activate

# 步骤3:安装 PyTorch(CUDA 12.1 版本)
pip install torch==2.3.0 torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

# 步骤4:安装核心依赖
pip install \
    fastapi==0.111.0 \
    uvicorn==0.29.0 \
    chromadb==0.5.0 \
    sentence-transformers==2.7.0 \
    transformers==4.41.0 \
    unstructured==0.14.0 \
    pydantic==2.7.0 \
    python-multipart==0.0.9 \
    loguru==0.7.2

# 步骤5:验证安装
python -c "import torch; print(f'CUDA: {torch.cuda.is_available()}'); import chromadb; print('Chroma: OK')"

4.3 安装 vLLM(LLM 推理服务)

bash
# 安装 vLLM(高效 LLM 推理框架)
pip install vllm==0.4.0

# 下载 Qwen2-72B-Instruct(Int8 量化版本,约 40GB)
# 注意:生产环境建议用 GPTQ 或 AWQ 量化,进一步降低显存占用

# 下载模型(通过 ModelScope,下载速度比 HuggingFace 快)
export HF_ENDPOINT=https://hf-mirror.com
huggingface-cli download Qwen/Qwen2-72B-Instruct-GPTQ-Int4 --local-dir /models/Qwen2-72B-Instruct-GPTQ-Int4

# 如果网络受限,先用小模型测试
huggingface-cli download Qwen/Qwen2-7B-Instruct --local-dir /models/Qwen2-7B-Instruct

# 启动 vLLM 服务(示例:7B 模型)
python -m vllm.entrypoints.openai.api_server \
    --model /models/Qwen2-7B-Instruct \
    --tensor-parallel-size 1 \
    --dtype half \
    --port 8000 \
    --gpu-memory-utilization 0.85

# 验证服务启动成功
curl http://localhost:8000/v1/models
# 预期输出:{"object":"list","data":[{"id":"Qwen2-7B-Instruct","..."}]}

五、核心代码实现

5.1 文档加载与解析

python
#!/usr/bin/env python3
"""
文档加载器:支持多种格式的文档解析
支持:PDF、Word、Markdown、TXT、HTML
"""

import os
from pathlib import Path
from typing import List, Dict, Any
from loguru import logger

# 导入各格式解析器
from unstructured.partition.pdf import partition_pdf
from unstructured.partition.docx import partition_docx
from unstructured.partition.markdown import partition_markdown
from unstructured.partition.txt import partition_txt


class DocumentLoader:
    """
    统一文档加载器,自动识别文件格式并解析
    """
    
    SUPPORTED_FORMATS = {
        '.pdf': 'pdf',
        '.docx': 'docx',
        '.doc': 'docx',
        '.md': 'markdown',
        '.txt': 'text',
        '.html': 'html',
    }
    
    def __init__(self):
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'total_chars': 0
        }
    
    def load(self, file_path: str) -> List[Dict[str, Any]]:
        """
        加载单个文档,返回文本块列表
        
        返回格式:
        [{
            'content': '文本内容',
            'metadata': {
                'source': 'file.pdf',
                'page': 1,
                'file_type': 'pdf'
            }
        }, ...]
        """
        self.stats['total'] += 1
        path = Path(file_path)
        
        if not path.exists():
            logger.error(f"文件不存在: {file_path}")
            self.stats['failed'] += 1
            return []
        
        suffix = path.suffix.lower()
        file_type = self.SUPPORTED_FORMATS.get(suffix)
        
        if not file_type:
            logger.warning(f"不支持的文件格式: {suffix}")
            self.stats['failed'] += 1
            return []
        
        try:
            chunks = self._parse(file_type, file_path)
            self.stats['success'] += 1
            self.stats['total_chars'] += sum(len(c['content']) for c in chunks)
            logger.info(f"成功加载 {path.name},解析出 {len(chunks)} 个文本块")
            return chunks
        except Exception as e:
            logger.error(f"加载 {path.name} 失败: {e}")
            self.stats['failed'] += 1
            return []
    
    def _parse(self, file_type: str, file_path: str) -> List[Dict[str, Any]]:
        """根据文件类型调用对应的解析器"""
        
        if file_type == 'pdf':
            # PDF 解析:使用 unstructured,自动识别段落、表格、标题
            elements = partition_pdf(
                filename=file_path,
                strategy='hi_res',  # 高分辨率模式,识别效果好但慢
                infer_table_structure=True  # 提取表格结构
            )
        elif file_type == 'docx':
            elements = partition_docx(filename=file_path)
        elif file_type == 'markdown':
            elements = partition_markdown(filename=file_path)
        elif file_type == 'text':
            elements = partition_txt(filename=file_path)
        elif file_type == 'html':
            elements = partition_txt(filename=file_path)
        else:
            return []
        
        # 将元素转换为统一的块格式
        chunks = []
        for i, elem in enumerate(elements):
            if elem.text.strip():  # 跳过空文本
                chunks.append({
                    'content': elem.text,
                    'metadata': {
                        'source': os.path.basename(file_path),
                        'element_id': i,
                        'file_type': file_type,
                        'category': str(elem.category),  # 元素类型:Title/Narrative/Table 等
                    }
                })
        
        return chunks
    
    def batch_load(self, directory: str, recursive: bool = True) -> List[Dict[str, Any]]:
        """
        批量加载目录下所有支持的文档
        
        参数:
            directory: 文档目录路径
            recursive: 是否递归子目录
        
        返回:
            所有文档的文本块列表
        """
        all_chunks = []
        path = Path(directory)
        
        # 遍历目录下所有文件
        if recursive:
            files = path.rglob('*')
        else:
            files = path.glob('*')
        
        for file in files:
            if file.is_file() and file.suffix.lower() in self.SUPPORTED_FORMATS:
                chunks = self.load(str(file))
                all_chunks.extend(chunks)
        
        logger.info(f"批量加载完成:总计 {len(all_chunks)} 个文本块")
        logger.info(f"统计: {self.stats}")
        return all_chunks


# 使用示例
loader = DocumentLoader()
chunks = loader.batch_load('/data/knowledge-base', recursive=True)
print(f"加载了 {len(chunks)} 个文本块")

5.2 文本切分(Chunking)

python
#!/usr/bin/env python3
"""
文本切分器:使用 RecursiveCharacterTextSplitter 保持语义完整性
"""

from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List, Dict, Any


class TextSplitter:
    """
    智能文本切分器
    
    策略:
    1. 先按段落分割(保留语义)
    2. 再按句子分割(避免截断)
    3. 相邻块之间有重叠(保持上下文连续性)
    """
    
    def __init__(
        self,
        chunk_size: int = 500,      # 每块目标字符数
        chunk_overlap: int = 50,     # 重叠字符数
        separators: List[str] = None
    ):
        if separators is None:
            separators = [
                "\n\n",   # 段落分隔(最高优先级)
                "\n",     # 换行分隔
                "。",     # 中文句号
                "!",     # 中文感叹号
                "?",     # 中文问号
                ";",     # 中文分号
                ",",     # 中文逗号
                " ",      # 英文空格
                ""        # 按字符分割(最低优先级)
            ]
        
        self.splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,  # 用字符数衡量长度
            separators=separators,
            is_separator_regex=False,
        )
    
    def split(self, chunks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        对文档块列表进行二次切分
        
        参数:
            chunks: DocumentLoader 返回的原始块列表
        
        返回:
            切分后的块列表,每个块包含 content 和 metadata
        """
        all_texts = []
        all_metadatas = []
        
        # 合并所有文本及其元数据
        for chunk in chunks:
            all_texts.append(chunk['content'])
            all_metadatas.append(chunk['metadata'])
        
        # 执行切分
        split_chunks = self.splitter.create_documents(
            all_texts,
            all_metadatas
        )
        
        # 转换为统一格式
        result = []
        for i, doc in enumerate(split_chunks):
            result.append({
                'content': doc.page_content,
                'metadata': doc.metadata
            })
        
        print(f"切分完成:{len(chunks)} 个原始块 → {len(result)} 个最终块")
        return result
    
    def split_text(self, text: str, metadata: Dict[str, Any] = None) -> List[Dict[str, Any]]:
        """
        切分单个文本(不经过 DocumentLoader)
        
        参数:
            text: 要切分的文本
            metadata: 关联的元数据
        
        返回:
            切分后的块列表
        """
        if metadata is None:
            metadata = {}
        
        docs = self.splitter.create_documents([text], [metadata])
        
        return [
            {'content': doc.page_content, 'metadata': doc.metadata}
            for doc in docs
        ]


# 使用示例
splitter = TextSplitter(
    chunk_size=500,      # 500 字符/块(中文约 250 字)
    chunk_overlap=50     # 50 字符重叠
)

# 对 DocumentLoader 的结果进行切分
final_chunks = splitter.split(chunks)

# 或者直接切分一段文本
simple_chunks = splitter.split_text(
    "这是一个很长的文本,包含多个句子。我们需要将它切分成小块以便向量检索。",
    metadata={'source': 'test.txt'}
)

5.3 向量数据库存储与检索

python
#!/usr/bin/env python3
"""
向量数据库管理:ChromaDB 实现
"""

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from typing import List, Dict, Any, Optional
import numpy as np


class VectorStore:
    """
    向量数据库管理类
    - 负责 Embedding 模型加载
    - 负责向量存储和检索
    """
    
    def __init__(
        self,
        persist_directory: str = '/data/chroma_db',
        embedding_model: str = 'BAAI/bge-large-zh-v1.5'
    ):
        # 初始化 ChromaDB 客户端
        self.client = chromadb.PersistentClient(
            path=persist_directory,
            settings=Settings(anonymized_telemetry=False)
        )
        
        # 加载 Embedding 模型
        self.embedding_model = SentenceTransformer(embedding_model)
        self.embedding_dimension = self.embedding_model.get_sentence_embedding_dimension()
        
        # 获取或创建 collection
        self.collection = self.client.get_or_create_collection(
            name="knowledge_base",
            metadata={"hnsw:space": "cosine"}  # 用余弦相似度
        )
        
        print(f"向量数据库初始化完成,Embedding 维度: {self.embedding_dimension}")
    
    def add_documents(
        self,
        chunks: List[Dict[str, Any]],
        batch_size: int = 100,
        show_progress: bool = True
    ):
        """
        批量添加文档到向量数据库
        
        参数:
            chunks: 文本块列表,每个包含 content 和 metadata
            batch_size: 批量提交大小(ChromaDB 限制每次最多 5461 条)
        """
        total = len(chunks)
        
        for i in range(0, total, batch_size):
            batch = chunks[i:i+batch_size]
            
            # 提取文本内容
            texts = [c['content'] for c in batch]
            
            # 计算 Embedding
            embeddings = self.embedding_model.encode(
                texts,
                batch_size=32,
                show_progress_bar=show_progress,
                convert_to_numpy=True
            )
            
            # 准备 ChromaDB 格式的数据
            ids = [f"doc_{i+j}" for j in range(len(batch))]
            metadatas = [c['metadata'] for c in batch]
            
            # 添加到 collection
            self.collection.add(
                ids=ids,
                embeddings=embeddings.tolist(),
                documents=texts,
                metadatas=metadatas
            )
            
            print(f"已添加 {min(i+batch_size, total)}/{total} 个文档块")
    
    def search(
        self,
        query: str,
        top_k: int = 5,
        filter_metadata: Optional[Dict] = None,
        rerank: bool = True,
        rerank_model: str = 'BAAI/bge-reranker-large'
    ) -> List[Dict[str, Any]]:
        """
        检索最相关的文档块
        
        参数:
            query: 查询文本
            top_k: 返回前 k 个结果
            filter_metadata: 元数据过滤条件(如 {'source': {'$eq': 'xxx.pdf'}})
            rerank: 是否使用重排模型
        
        返回:
            相关文档列表,按相似度排序
        """
        # 第一步:用 Embedding 召回 top_k * 2 个候选(留出重排余量)
        query_embedding = self.embedding_model.encode(
            [query],
            convert_to_numpy=True
        )
        
        results = self.collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=min(top_k * 3, self.collection.count()),  # 多召回一些给重排
            where=filter_metadata,
            include=['documents', 'metadatas', 'distances']
        )
        
        if not results['documents'][0]:
            return []
        
        # 第二步:重排(可选)
        if rerank and len(results['documents'][0]) > 1:
            from sentence_transformers import CrossEncoder
            
            reranker = CrossEncoder(rerank_model)
            
            # 准备 (query, document) 对
            doc_pairs = [
                (query, doc) for doc in results['documents'][0]
            ]
            
            # 计算重排分数
            rerank_scores = reranker.predict(doc_pairs)
            
            # 按重排分数排序
            paired = list(zip(results['documents'][0], results['metadatas'][0], rerank_scores))
            paired.sort(key=lambda x: x[2], reverse=True)
            
            # 取 top_k
            top_k_docs = paired[:top_k]
            
            return [
                {
                    'content': doc,
                    'metadata': meta,
                    'score': float(score),
                    'reranked': True
                }
                for doc, meta, score in top_k_docs
            ]
        else:
            # 不使用重排,直接返回
            return [
                {
                    'content': doc,
                    'metadata': meta,
                    'score': float(1 - dist),  # ChromaDB 存的是距离,距离越小相似度越高
                    'reranked': False
                }
                for doc, meta, dist in zip(
                    results['documents'][0],
                    results['metadatas'][0],
                    results['distances'][0]
                )
            ][:top_k]
    
    def count(self) -> int:
        """返回数据库中的文档数量"""
        return self.collection.count()
    
    def delete_collection(self):
        """删除 collection(用于重建)"""
        self.client.delete_collection("knowledge_base")
        print("Collection 已删除")


# 使用示例
vector_store = VectorStore(
    persist_directory='/data/chroma_db',
    embedding_model='BAAI/bge-large-zh-v1.5'
)

# 添加文档
vector_store.add_documents(final_chunks)

# 检索
results = vector_store.search(
    query="如何重置产品A的管理员密码?",
    top_k=5,
    rerank=True
)

for i, r in enumerate(results, 1):
    print(f"\n结果 {i} (相似度: {r['score']:.4f}, {'已重排' if r['reranked'] else '原始'})")
    print(f"来源: {r['metadata']['source']}")
    print(f"内容: {r['content'][:200]}...")

5.4 RAG 问答 API

python
#!/usr/bin/env python3
"""
RAG 问答 API:FastAPI 实现
"""

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Optional, Dict
import requests
import json
from loguru import logger


app = FastAPI(title="RAG 知识库问答 API", version="1.0.0")

# CORS 配置
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# 请求/响应模型
class QuestionRequest(BaseModel):
    question: str = Field(..., description="用户问题")
    top_k: int = Field(default=5, ge=1, le=20, description="召回数量")
    stream: bool = Field(default=False, description="是否流式输出")
    filters: Optional[Dict] = Field(default=None, description="元数据过滤条件")
    use_rerank: bool = Field(default=True, description="是否使用重排")


class SourceDocument(BaseModel):
    content: str
    metadata: Dict
    score: float


class AnswerResponse(BaseModel):
    answer: str
    sources: List[SourceDocument]
    latency_ms: float
    model: str


# 全局变量
vector_store = None
LLM_API_URL = "http://localhost:8000/v1/chat/completions"
LLM_MODEL_NAME = "Qwen2-7B-Instruct"


def build_prompt(question: str, contexts: List[str]) -> str:
    """
    构建 Prompt
    
    策略:
    1. 先给模型设定角色和规则
    2. 给出检索到的上下文
    3. 给出用户问题
    4. 明确要求引用来源
    """
    system_prompt = """你是一个企业内部知识库助手,专门回答与公司相关的问题。

回答规则:
1. 只使用提供的上下文信息进行回答,不要编造不在上下文中的信息
2. 如果上下文信息不足以回答问题,明确说明"根据现有资料无法回答"
3. 回答要准确、简洁、有条理
4. 在回答结尾标注信息来源,格式:[来源: 文件名]
5. 如果涉及多个来源,分别标注

上下文信息:
"""
    
    context_text = "\n\n".join([f"[文档{i+1}]\n{c}" for i, c in enumerate(contexts)])
    
    user_prompt = f"\n\n用户问题:{question}"
    
    return f"{system_prompt}{context_text}{user_prompt}"


@app.on_event("startup")
async def startup_event():
    """启动时初始化向量数据库"""
    global vector_store
    from vector_store import VectorStore
    
    logger.info("正在加载向量数据库...")
    vector_store = VectorStore(
        persist_directory='/data/chroma_db',
        embedding_model='BAAI/bge-large-zh-v1.5'
    )
    logger.info(f"向量数据库加载完成,共 {vector_store.count()} 个文档块")


@app.post("/api/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest):
    """处理用户问答请求"""
    import time
    start_time = time.time()
    
    try:
        # 步骤1:检索相关文档
        search_results = vector_store.search(
            query=request.question,
            top_k=request.top_k,
            filter_metadata=request.filters,
            rerank=request.use_rerank
        )
        
        if not search_results:
            return AnswerResponse(
                answer="抱歉,没有找到与您问题相关的文档。",
                sources=[],
                latency_ms=0,
                model=LLM_MODEL_NAME
            )
        
        # 提取上下文内容
        contexts = [r['content'] for r in search_results]
        
        # 步骤2:构建 Prompt
        prompt = build_prompt(request.question, contexts)
        
        # 步骤3:调用 LLM
        try:
            resp = requests.post(
                LLM_API_URL,
                json={
                    "model": LLM_MODEL_NAME,
                    "messages": [
                        {"role": "user", "content": prompt}
                    ],
                    "temperature": 0.3,  # 较低温度保证准确性
                    "max_tokens": 1000,
                },
                timeout=30
            )
            resp.raise_for_status()
            result = resp.json()
            answer = result['choices'][0]['message']['content']
        except Exception as e:
            logger.error(f"LLM 调用失败: {e}")
            answer = f"LLM 服务暂时不可用,请稍后再试。错误: {str(e)}"
        
        latency_ms = (time.time() - start_time) * 1000
        
        # 步骤4:构造响应
        sources = [
            SourceDocument(
                content=r['content'],
                metadata=r['metadata'],
                score=r['score']
            )
            for r in search_results
        ]
        
        return AnswerResponse(
            answer=answer,
            sources=sources,
            latency_ms=round(latency_ms, 2),
            model=LLM_MODEL_NAME
        )
        
    except Exception as e:
        logger.error(f"处理问题失败: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/api/stats")
async def get_stats():
    """获取系统统计信息"""
    return {
        "total_documents": vector_store.count() if vector_store else 0,
        "embedding_model": "BAAI/bge-large-zh-v1.5",
        "llm_model": LLM_MODEL_NAME,
    }


@app.get("/api/health")
async def health_check():
    """健康检查"""
    return {"status": "ok"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8001)

六、效果评估

6.1 评估指标

指标定义计算方法
准确率(Accuracy)回答正确的比例人工标注后统计
RAGAS Score综合评估检索和生成质量Faithfulness、Answer Relevance、Context Relevance
引用准确率引用来源是否正确人工检查引用是否相关
响应时间P50/P95/P99线上日志统计

6.2 评估结果

87.3%
问答准确率
91.5%
引用准确率
2.3s
P95 响应时间
1264
已索引文档块

6.3 详细数据

问题类型准确率平均响应时间样本数
产品操作类92.1%1.8s156
故障排查类88.5%2.4s89
政策流程类85.7%2.1s67
技术支持类81.2%3.1s43

6.4 用户满意度

上线一个月后,通过内部调研收集了 127 份反馈:

七、踩坑经验总结

7.1 切分策略的坑

⚠️ 问题:最初使用固定字符数切分(500字符),导致很多表格被截断,回答时丢失了关键数据。

解决方案:

7.2 Embedding 模型的坑

⚠️ 问题:最初使用 m3e-base 中文 Embedding,效果不理想,专业术语(如"数据治理"、"主数据管理")的相似度计算偏差很大。

解决方案:

7.3 向量数据库性能的坑

⚠️ 问题:当文档量超过 10 万块时,ChromaDB 的 HNSW 索引查询变慢,P99 延迟从 0.5s 飙升到 3s+。

解决方案:

7.4 LLM 幻觉的坑

🔥 问题:LLM 有时会"自信地编造答案",尤其是当检索到的上下文与问题有部分匹配但不足以完全回答时。

解决方案:

八、运维与监控

8.1 监控指标

bash
# 使用 Prometheus + Grafana 监控

# 关键指标:
# 1. API QPS
# 2. P50/P95/P99 响应时间
# 3. LLM 调用错误率
# 4. 向量数据库查询延迟
# 5. GPU 利用率(LLM 推理)

# 日志收集:使用 Loki
# 告警规则(Prometheus):
- API 响应时间 P99 > 5s
- LLM 错误率 > 1%
- GPU 利用率 > 95%(显存不足预警)
- 向量数据库连接失败

8.2 文档更新流程

公司使用 GitOps 方式管理文档更新:

  1. 各部门的文档负责人通过 Confluence 或飞书提交文档
  2. 文档同步脚本每小时运行一次,拉取最新文档
  3. 新增/修改的文档进入增量处理队列
  4. 解析 → 切分 → Embedding → 入库,整个过程自动化
  5. 通过 Webhook 通知相关人员更新完成

九、总结

这个 RAG 知识库项目的关键成功因素:

  1. 选型正确:BGE Embedding + Qwen2 + ChromaDB 的组合在中文场景下效果很好
  2. 数据质量:文档预处理的质量直接影响最终效果,表格和代码块的识别很关键
  3. 重排是关键:加入 BGE-reranker-large 后,准确率从 78% 提升到 87%
  4. Prompt 工程:明确要求引用来源,有效抑制了 LLM 幻觉
  5. 整个项目从需求确认到上线用了约 6 周,其中大部分时间花在文档预处理和效果调优上。