为什么你的 RAG 不好用?

很多同学在做知识库时,都踩过这样的坑:

  • 用户问"怎么退款",检索出来的却是"支付流程",语义相似但答非所问
  • 文档被切成一堆碎片,检索到的片段没头没尾,LLM 看了一脸懵
  • 用户说"那个报销咋搞",这种口语化的问题,向量搜索根本匹配不上
  • 产品名、编号这类精确关键词,纯向量搜索反而不如直接 Ctrl+F

这就是 RAG 1.0 的典型痛点:朴素的"切块 → 向量化 → Top-K 检索 → 喂给大模型"流程,在企业真实场景下往往不够用。

今天这篇文章,我将带你从零搭建一个 RAG 2.0 知识库,用《红楼梦》全本作为知识库,实战演示四大核心技术,让 RAG 从"能用"变成"好用"。

RAG 1.0 vs 2.0:到底差在哪?

先来直观对比一下:

对比项 RAG 1.0 RAG 2.0
切块策略 固定字数切块,一刀切 父子窗口:小块检索,大块生成
检索方式 纯向量搜索 向量 + BM25 混合搜索
排序方式 直接取 Top-K 多路召回 + Reranker 重排序
问题处理 用户原话直接检索 Query Rewrite 问题改写
答案质量 经常答非所问、遗漏关键信息 准确率高,有据可查

一句话总结:RAG 1.0 是"大海捞针",RAG 2.0 是"精准制导"。

RAG 2.0 四大核心技术

父子窗口切块(Parent-Child Chunking)

传统切块的矛盾:块切太大,检索不精准;块切太小,上下文丢失。

父子窗口的解法

┌──────────────── 父块(1000字)────────────────┐
│  完整的上下文,用于送给 LLM 生成答案            │
│  ┌── 子块1(300字) ──┐ ┌── 子块2(300字) ──┐   │
│  │ 用于检索,粒度细  │ │ 用于检索,粒度细  │   │
│  └──────────────────┘ └──────────────────┘   │
└──────────────────────────────────────────────┘
  • 子块(Child):300 字,粒度细,检索更精准
  • 父块(Parent):1000 字,保留完整上下文,送给 LLM 生成答案
  • 检索时命中子块,但把对应的父块送给大模型

Query Rewrite(问题改写)

用户的问题往往是口语化的、模糊的。直接拿去检索,效果不好。

解法:用 LLM 先把问题改写一下:

用户原话:"那个报销咋搞?"
        ↓ LLM 改写
检索查询:"企业差旅报销流程是什么?"

改写后的查询关键词更明确,向量匹配效果大幅提升。

Hybrid Search(混合搜索)

两种搜索方式各有所长:

  • 向量搜索:理解语义,“退款” ≈ “退货” ≈ “退钱”
  • BM25 关键词搜索:精确匹配,产品名、人名、编号一个不漏

混合搜索 = 向量搜索 + BM25,两路召回,取长补短。

Reranker(重排序)

多路召回的结果需要统一排序。Reranker 做的事情:

  1. 把向量搜索和 BM25 的分数归一化到 0-1
  2. 加权融合:最终分数 = 0.6 × 向量分 + 0.4 × BM25分
  3. 按融合分数重新排序,取 Top-K

这样既考虑了语义相关性,又兼顾了关键词精确度。

环境准备

技术栈

组件 选型 说明
向量数据库 ChromaDB 纯 Python,零依赖部署
Embedding OpenRouter API 免费嵌入模型
关键词搜索 BM25(rank_bm25) 经典的关键词检索算法
LLM mimo-v2.5-pro 生成答案
分词 jieba 中文分词

安装依赖

pip install chromadb requests rank-bm25 jieba openai numpy

配置文件

# config.py

# ChromaDB 向量数据库
CHROMA_COLLECTION = "hongloumeng"

# Embedding 模型 (OpenRouter)
EMBEDDING_API_URL = "https://openrouter.ai/api/v1/embeddings"
EMBEDDING_API_KEY = "你的key"
EMBEDDING_MODEL = "nvidia/llama-nemotron-embed-vl-1b-v2:free"
EMBEDDING_DIM = 2048

# LLM 模型
LLM_BASE_URL = "https://你的LLM地址/v1"
LLM_API_KEY = "你的key"
LLM_MODEL = "mimo-v2.5-pro"

# 文档切块参数
PARENT_CHUNK_SIZE = 1000       # 父块大小
PARENT_CHUNK_OVERLAP = 100     # 父块重叠
CHILD_CHUNK_SIZE = 300         # 子块大小
CHILD_CHUNK_OVERLAP = 50       # 子块重叠

# 检索参数
INITIAL_TOP_K = 20             # 初次检索取 Top-K
FINAL_TOP_K = 5                # Reranker 后取 Top-K

代码实现

下面开始核心代码实现,每个模块独立解耦,方便理解。

文档加载与父子切块

# document_loader.py
import re
import hashlib


def load_and_clean(file_path: str) -> str:
    """加载文本文件并清洗"""
    with open(file_path, "r", encoding="gbk", errors="ignore") as f:
        text = f.read()
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()


def chunk_text(text: str, chunk_size: int, overlap: int) -> list[dict]:
    """按固定大小切块,返回带元信息的块列表"""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk_content = text[start:end]
        chunk_id = hashlib.md5(chunk_content.encode()).hexdigest()[:12]
        chunks.append({
            "id": chunk_id,
            "content": chunk_content,
            "start_pos": start,
            "end_pos": end,
        })
        start += chunk_size - overlap
    return chunks


def build_parent_child_chunks(file_path: str) -> tuple[list[dict], list[dict]]:
    """构建父子块结构"""
    text = load_and_clean(file_path)

    # 先切父块
    parents = chunk_text(text, 1000, 100)

    # 对每个父块内部切子块
    children = []
    for parent in parents:
        parent_children = chunk_text(parent["content"], 300, 50)
        for i, child in enumerate(parent_children):
            child["parent_id"] = parent["id"]
            child["chunk_index"] = i
            children.append(child)
        parent["child_ids"] = [c["id"] for c in parent_children]

    return parents, children

运行效果:

父块: 947 个
子块: 3788 个

Embedding 向量化

# embedding.py
import requests


def get_embedding(text: str) -> list[float]:
    """获取单条文本的向量"""
    response = requests.post(
        "https://openrouter.ai/api/v1/embeddings",
        headers={
            "Authorization": "Bearer 你的key",
            "Content-Type": "application/json",
        },
        json={"model": "nvidia/llama-nemotron-embed-vl-1b-v2:free", "input": text},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()["data"][0]["embedding"]


def get_embeddings_batch(texts: list[str], batch_size: int = 32) -> list[list[float]]:
    """批量获取文本向量"""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = requests.post(
            "https://openrouter.ai/api/v1/embeddings",
            headers={"Authorization": "Bearer 你的key", "Content-Type": "application/json"},
            json={"model": "nvidia/llama-nemotron-embed-vl-1b-v2:free", "input": batch},
            timeout=60,
        )
        response.raise_for_status()
        sorted_items = sorted(response.json()["data"], key=lambda x: x["index"])
        for item in sorted_items:
            all_embeddings.append(item["embedding"])
    return all_embeddings

ChromaDB 向量存储

# vector_store.py
import chromadb

client = chromadb.PersistentClient(path="./chroma_db")


def get_or_create_collection(drop_existing: bool = False):
    if drop_existing:
        try:
            client.delete_collection("hongloumeng")
        except Exception:
            pass
    return client.get_or_create_collection(
        name="hongloumeng",
        metadata={"hnsw:space": "cosine"},  # 余弦相似度
    )


def insert_chunks(chunks: list[dict], embeddings: list[list[float]]) -> int:
    """批量插入子块"""
    collection = get_or_create_collection()
    ids = [c["id"] for c in chunks]
    documents = [c["content"][:4000] for c in chunks]
    metadatas = [{"parent_id": c.get("parent_id", ""), "chunk_index": c.get("chunk_index", 0)} for c in chunks]

    for i in range(0, len(ids), 5000):
        end = min(i + 5000, len(ids))
        collection.add(ids=ids[i:end], embeddings=embeddings[i:end],
                       documents=documents[i:end], metadatas=metadatas[i:end])
    return len(ids)


def search_vectors(query_embedding: list[float], top_k: int = 20) -> list[dict]:
    """向量相似度搜索"""
    collection = get_or_create_collection()
    results = collection.query(query_embeddings=[query_embedding], n_results=top_k,
                               include=["documents", "metadatas", "distances"])
    hits = []
    for i in range(len(results["ids"][0])):
        hits.append({
            "id": results["ids"][0][i],
            "content": results["documents"][0][i],
            "parent_id": results["metadatas"][0][i].get("parent_id", ""),
            "distance": 1 - results["distances"][0][i],  # 转为相似度
        })
    return hits

BM25 关键词搜索

# bm25_search.py
import jieba
from rank_bm25 import BM25Okapi


class BM25Index:
    def __init__(self):
        self.corpus = []
        self.bm25 = None

    def build_index(self, chunks: list[dict]):
        self.corpus = chunks
        tokenized = [list(jieba.cut(c["content"])) for c in chunks]
        self.bm25 = BM25Okapi(tokenized)

    def search(self, query: str, top_k: int = 20) -> list[dict]:
        scores = self.bm25.get_scores(list(jieba.cut(query)))
        top_indices = scores.argsort()[-top_k:][::-1]
        results = []
        for idx in top_indices:
            if scores[idx] > 0:
                chunk = self.corpus[idx].copy()
                chunk["bm25_score"] = float(scores[idx])
                results.append(chunk)
        return results

Reranker 重排序

# reranker.py
def normalize_scores(items: list[dict], score_key: str) -> list[dict]:
    scores = [item.get(score_key, 0) for item in items]
    min_s, max_s = min(scores), max(scores)
    spread = max_s - min_s if max_s != min_s else 1.0
    for item in items:
        item[f"norm_{score_key}"] = (item.get(score_key, 0) - min_s) / spread
    return items


def score_fusion_rerank(vector_hits, bm25_hits, vector_weight=0.6, bm25_weight=0.4, top_k=5):
    """分数融合重排:归一化 → 加权合并 → 重新排序"""
    vector_hits = normalize_scores(vector_hits, "distance")
    bm25_hits = normalize_scores(bm25_hits, "bm25_score")

    merged = {}
    for hit in vector_hits:
        merged[hit["id"]] = {"id": hit["id"], "content": hit["content"],
                             "parent_id": hit.get("parent_id", ""),
                             "vector_score": hit.get("norm_distance", 0), "bm25_score": 0.0}
    for hit in bm25_hits:
        if hit["id"] in merged:
            merged[hit["id"]]["bm25_score"] = hit.get("norm_bm25_score", 0)
        else:
            merged[hit["id"]] = {"id": hit["id"], "content": hit["content"],
                                 "parent_id": hit.get("parent_id", ""),
                                 "vector_score": 0.0, "bm25_score": hit.get("norm_bm25_score", 0)}

    for item in merged.values():
        item["fusion_score"] = vector_weight * item["vector_score"] + bm25_weight * item["bm25_score"]

    return sorted(merged.values(), key=lambda x: x["fusion_score"], reverse=True)[:top_k]

Query Rewrite 问题改写

# query_rewrite.py
import requests


def rewrite_query(question: str) -> str:
    """用 LLM 把口语化问题改写为更适合检索的形式"""
    prompt = f"""你是一个检索优化专家。请将用户的口语化问题改写为更适合文档检索的形式,
保持核心语义不变,使关键词更明确。

用户问题:{question}

只返回改写后的查询文本,不要返回其他内容。"""

    response = requests.post(
        "你的LLM_API/chat/completions",
        headers={"Authorization": "Bearer 你的key", "Content-Type": "application/json"},
        json={"model": "mimo-v2.5-pro", "messages": [{"role": "user", "content": prompt}], "temperature": 0.3},
        timeout=30,
    )
    return response.json()["choices"][0]["message"]["content"].strip()

完整 RAG 管道

把以上所有模块串联起来:

# rag_pipeline.py
class RAGPipeline:
    def query(self, question: str) -> dict:
        # Step 1: Query Rewrite
        search_query = rewrite_query(question)

        # Step 2: Hybrid Search
        query_vec = get_embedding(search_query)
        vector_hits = search_vectors(query_vec, top_k=20)
        bm25_hits = self.bm25_index.search(search_query, top_k=20)

        # Step 3: Reranker
        reranked = score_fusion_rerank(vector_hits, bm25_hits, top_k=5)

        # Step 4: 获取父块上下文 & LLM 生成答案
        context_parts = []
        seen_parents = set()
        for item in reranked:
            parent_id = item.get("parent_id", "")
            if parent_id and parent_id in self.parent_map and parent_id not in seen_parents:
                seen_parents.add(parent_id)
                context_parts.append(self.parent_map[parent_id]["content"])

        context = "\n\n---\n\n".join(context_parts)
        answer = self._generate_answer(question, context)
        return {"question": question, "rewritten": search_query, "answer": answer}

实战效果演示

以《红楼梦》全本为知识库,来看看 RAG 2.0 的实际效果。

测试问题:贾宝玉最后结局是什么?

Query Rewrite 效果

原始问题:贾宝玉最后结局是什么?
改写结果:《红楼梦》中贾宝玉的最终结局是什么?

Hybrid Search 效果

向量命中: 20 条, BM25命中: 20 条

最终答案

根据文档内容,贾宝玉的最终结局是:在尘缘了结后,跟随一僧一道离开了凡间,返回其本源所在。

具体过程:

  1. 宝玉生病,魂魄恍惚间见到送玉的和尚,跟随其前往一处荒野
  2. 在毘陵驿,贾政于雪夜见到了打扮成僧人样的宝玉,宝玉拜别父亲
  3. 一僧一道夹住宝玉说道:"俗缘已毕,还不快走!"三人飘然登岸而去

原文引用:“我所居兮,青埂之峰。我所游兮,鸿蒙太空。谁与我游兮,吾谁与从?渺渺茫茫兮,归彼大荒。”

可以看到

  • 答案准确,没有幻觉
  • 引用了原文,有据可查
  • 理解了"最后结局"的语义,检索到了相关段落

使用方式

# 构建索引(首次运行)
python main.py build

# 单次提问
python main.py ask "林黛玉怎么死的?"

# 交互式问答
python main.py chat

# 对比 RAG 1.0 vs 2.0 效果
python main.py compare

总结

RAG 2.0 不是什么黑科技,而是一套工程优化组合拳

技术 解决的问题 效果
父子窗口切块 切太大不精准,切太小丢上下文 检索精度 + 上下文完整性
Query Rewrite 口语化问题匹配差 召回率提升
Hybrid Search 纯向量漏掉精确匹配 语义 + 关键词双保险
Reranker 多路结果无法统一排序 排序质量提升

建议:不要一上来就追求花哨的方案,先把这四个基础技术做好,RAG 的效果就能上一个大台阶。

Logo

作为“人工智能6S店”的官方数字引擎,为AI开发者与企业提供一个覆盖软硬件全栈、一站式门户。

更多推荐