Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

向量库知识智能体 (DB Agent RAG)


本教程演示如何将 files 目录下的 txt 文件按空行分段,写入 Chroma 内存向量库,并通过 ReAct Agent 检索回答。

数据准备提示

  1. 请在当前目录下创建一个 files 文件夹。

  2. 在其中放入 .txt 文件。

  3. 注意:若是 Excel 数据,需额外写脚本将列表头转为以下格式(空行分割),问答效果更准:

问题:<问题内容>
答案:<答案内容>

(此处为空行)

1. 导入依赖与加载环境

确保目录下存在 .env 文件,包含 DASHSCOPE_API_KEYDASHSCOPE_BASE_URL

import os
import re
from pathlib import Path
from typing import Iterable

from dotenv import load_dotenv
from openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_chroma import Chroma
import chromadb
from chromadb.config import Settings
from langchain.agents import create_agent
from langchain.tools import tool

# 加载模型配置
_ = load_dotenv()

2. 配置大模型与客户端

# 配置大模型 (LangChain)
llm = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    model="qwen3-coder-plus",
    temperature=0,
)

# 创建 OpenAI 客户端 (用于 Embeddings)
client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
)

3. 定义 DashScope Embeddings

自定义 Embeddings 类以适配 LangChain 接口。

class DashScopeEmbeddings(Embeddings):
    """DashScope 兼容的 Embeddings 封装。"""

    def __init__(self, model: str = "text-embedding-v4", dimensions: int = 1024):
        self.model = model
        self.dimensions = dimensions

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        vectors: list[list[float]] = []
        for i in range(0, len(texts), 10):
            chunk = texts[i : i + 10]
            response = client.embeddings.create(
                model=self.model,
                input=chunk,
                dimensions=self.dimensions,
            )
            vectors.extend([item.embedding for item in response.data])
        return vectors

    def embed_query(self, text: str) -> list[float]:
        response = client.embeddings.create(
            model=self.model,
            input=[text],
            dimensions=self.dimensions,
        )
        return response.data[0].embedding

4. 文档加载逻辑

读取目录下的 txt 文件,并使用空行进行切分。

def load_txt_documents(data_dir: Path) -> list[Document]:
    """读取目录下的 txt 文件并按空行分割为 Document。"""

    def split_on_blank(text: str) -> Iterable[str]:
        for block in re.split(r"\n\s*\n", text):
            cleaned = block.strip()
            if cleaned:
                yield cleaned

    documents: list[Document] = []
    # 检查目录是否存在
    if not data_dir.exists():
        print(f"Warning: 目录 {data_dir} 不存在")
        return []

    for path in sorted(data_dir.glob("*.txt")):
        content = path.read_text(encoding="utf-8")
        for idx, part in enumerate(split_on_blank(content)):
            documents.append(
                Document(
                    page_content=part,
                    metadata={"source": path.name, "chunk_id": idx},
                )
            )
    if not documents:
        print(f"目录 {data_dir} 下未找到 txt 文档")
    return documents

5. 构建向量数据库

连接到 Chroma 服务并写入数据。

注意:此处配置连接到远程 Chroma 服务器 (120.24.168.78:7020)。如果需要本地运行,请注释掉 client_settings 参数。

def build_vector_store(data_dir: Path | None = None) -> Chroma:
    """读取 txt 文件并构建内存向量库。"""
    # 在 Notebook 中,默认指向当前目录下的 files
    target_dir = data_dir or (Path.cwd() / "files")
    documents = load_txt_documents(target_dir)

    print(f"成功加载 {len(documents)} 个文档到向量库")

    embeddings = DashScopeEmbeddings()
    
    # 配置 Chroma 设置
    chroma_settings = Settings(
        chroma_server_host="120.24.168.78",
        chroma_server_http_port=7020
    )
    
    vector_store = Chroma(
        collection_name="test_collection",
        embedding_function=embeddings,
        persist_directory=None,
        client_settings=chroma_settings
    )
    
    # 清空集合(可选,防止重复数据堆积)
    try:
        existing_ids = vector_store.get()["ids"]
        if existing_ids:
            vector_store.delete(ids=existing_ids)
    except Exception as e:
        print(f"连接或清理数据库时提示: {e}")
    
    # 添加文档
    if documents:
        _ = vector_store.add_documents(documents)
    
    return vector_store

6. 创建 ReAct Agent

定义检索工具并初始化 Agent。

def create_react_agent_wrapper(vector_store: Chroma):
    """基于给定向量库创建带检索工具的 ReAct Agent。"""

    @tool(response_format="content_and_artifact")
    def retrieve_context(query: str):
        """基于向量库检索与问题最相关的文本片段。"""
        retrieved = vector_store.similarity_search(query, k=3)
        serialized = "\n\n".join(
            f"[{doc.metadata['source']}#{doc.metadata['chunk_id']}] {doc.page_content}"
            for doc in retrieved
        )
        return serialized, retrieved

    return create_agent(
        llm,
        tools=[retrieve_context],
        system_prompt=(
            "你可以使用检索工具获得参考资料。回答时结合检索到的内容,"
            "如有必要可以在答案中简单引用来源标识。"
        ),
    )

7. 运行演示

执行以下代码块开始提问。

# 1. 准备并构建向量库
vector_store = build_vector_store()
print('嵌入完成' + '\n')

# 2. 创建 Agent
agent = create_react_agent_wrapper(vector_store)

# 3. 提问
query = "公司的考勤方式是什么?"
print(f"Query: {query}\n" + "-" * 20)

input_payload = {"messages": [{"role": "user", "content": query}]}

for event in agent.stream(input_payload, stream_mode="values"):
    event["messages"][-1].pretty_print()
成功加载 8 个文档到向量库
嵌入完成

Query: 公司的考勤方式是什么?
--------------------
================================ Human Message =================================

公司的考勤方式是什么?
================================== Ai Message ==================================

为了找到公司的考勤方式,我将采取以下步骤进行搜索:

1. 首先确定公司是否有明确的考勤制度及相关规定。
2. 然后查找具体的考勤方式,例如打卡、签到等。

现在我开始第一步,通过检索来确定公司的考勤制度和相关规定。
Tool Calls:
  retrieve_context (call_9f3fed5b0751468f8a2ffd1f)
 Call ID: call_9f3fed5b0751468f8a2ffd1f
  Args:
    query: 公司的考勤制度和相关规定
================================= Tool Message =================================
Name: retrieve_context

[question.txt#0] 问题:公司的考勤方式是什么?
答案:公司实行弹性打卡。
权限: IT组、运维组
关键词:考勤方式

[question.txt#1] 问题:公司的考勤方式是什么?
答案:公司实行固定世界打卡。
权限: 运营组
关键词:考勤方式

[question.txt#5] 问题:休息不固定的人员考勤打卡有什么要求?
答案:休息不固定的人员在当月休息天数在正常规定休息天数范围内,不需提交调休或请假申请,上班照常打2次卡(上/下班各一次)。
权限:
关键词:弹性休息、考勤打卡要求
================================== Ai Message ==================================

公司的考勤方式是弹性打卡。