本教程演示如何将 files 目录下的 txt 文件按空行分段,写入 Chroma 内存向量库,并通过 ReAct Agent 检索回答。
数据准备提示¶
请在当前目录下创建一个
files文件夹。在其中放入
.txt文件。注意:若是 Excel 数据,需额外写脚本将列表头转为以下格式(空行分割),问答效果更准:
问题:<问题内容>
答案:<答案内容>
(此处为空行)1. 导入依赖与加载环境¶
确保目录下存在 .env 文件,包含 DASHSCOPE_API_KEY 和 DASHSCOPE_BASE_URL。
import os
import re
from pathlib import Path
from typing import Iterable
from dotenv import load_dotenv
from openai import OpenAI
from langchain_openai import ChatOpenAI
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_chroma import Chroma
import chromadb
from chromadb.config import Settings
from langchain.agents import create_agent
from langchain.tools import tool
# 加载模型配置
_ = load_dotenv()2. 配置大模型与客户端¶
# 配置大模型 (LangChain)
llm = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-coder-plus",
temperature=0,
)
# 创建 OpenAI 客户端 (用于 Embeddings)
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
)3. 定义 DashScope Embeddings¶
自定义 Embeddings 类以适配 LangChain 接口。
class DashScopeEmbeddings(Embeddings):
"""DashScope 兼容的 Embeddings 封装。"""
def __init__(self, model: str = "text-embedding-v4", dimensions: int = 1024):
self.model = model
self.dimensions = dimensions
def embed_documents(self, texts: list[str]) -> list[list[float]]:
vectors: list[list[float]] = []
for i in range(0, len(texts), 10):
chunk = texts[i : i + 10]
response = client.embeddings.create(
model=self.model,
input=chunk,
dimensions=self.dimensions,
)
vectors.extend([item.embedding for item in response.data])
return vectors
def embed_query(self, text: str) -> list[float]:
response = client.embeddings.create(
model=self.model,
input=[text],
dimensions=self.dimensions,
)
return response.data[0].embedding4. 文档加载逻辑¶
读取目录下的 txt 文件,并使用空行进行切分。
def load_txt_documents(data_dir: Path) -> list[Document]:
"""读取目录下的 txt 文件并按空行分割为 Document。"""
def split_on_blank(text: str) -> Iterable[str]:
for block in re.split(r"\n\s*\n", text):
cleaned = block.strip()
if cleaned:
yield cleaned
documents: list[Document] = []
# 检查目录是否存在
if not data_dir.exists():
print(f"Warning: 目录 {data_dir} 不存在")
return []
for path in sorted(data_dir.glob("*.txt")):
content = path.read_text(encoding="utf-8")
for idx, part in enumerate(split_on_blank(content)):
documents.append(
Document(
page_content=part,
metadata={"source": path.name, "chunk_id": idx},
)
)
if not documents:
print(f"目录 {data_dir} 下未找到 txt 文档")
return documents5. 构建向量数据库¶
连接到 Chroma 服务并写入数据。
注意:此处配置连接到远程 Chroma 服务器 (
120.24.168.78:7020)。如果需要本地运行,请注释掉client_settings参数。
def build_vector_store(data_dir: Path | None = None) -> Chroma:
"""读取 txt 文件并构建内存向量库。"""
# 在 Notebook 中,默认指向当前目录下的 files
target_dir = data_dir or (Path.cwd() / "files")
documents = load_txt_documents(target_dir)
print(f"成功加载 {len(documents)} 个文档到向量库")
embeddings = DashScopeEmbeddings()
# 配置 Chroma 设置
chroma_settings = Settings(
chroma_server_host="120.24.168.78",
chroma_server_http_port=7020
)
vector_store = Chroma(
collection_name="test_collection",
embedding_function=embeddings,
persist_directory=None,
client_settings=chroma_settings
)
# 清空集合(可选,防止重复数据堆积)
try:
existing_ids = vector_store.get()["ids"]
if existing_ids:
vector_store.delete(ids=existing_ids)
except Exception as e:
print(f"连接或清理数据库时提示: {e}")
# 添加文档
if documents:
_ = vector_store.add_documents(documents)
return vector_store6. 创建 ReAct Agent¶
定义检索工具并初始化 Agent。
def create_react_agent_wrapper(vector_store: Chroma):
"""基于给定向量库创建带检索工具的 ReAct Agent。"""
@tool(response_format="content_and_artifact")
def retrieve_context(query: str):
"""基于向量库检索与问题最相关的文本片段。"""
retrieved = vector_store.similarity_search(query, k=3)
serialized = "\n\n".join(
f"[{doc.metadata['source']}#{doc.metadata['chunk_id']}] {doc.page_content}"
for doc in retrieved
)
return serialized, retrieved
return create_agent(
llm,
tools=[retrieve_context],
system_prompt=(
"你可以使用检索工具获得参考资料。回答时结合检索到的内容,"
"如有必要可以在答案中简单引用来源标识。"
),
)7. 运行演示¶
执行以下代码块开始提问。
# 1. 准备并构建向量库
vector_store = build_vector_store()
print('嵌入完成' + '\n')
# 2. 创建 Agent
agent = create_react_agent_wrapper(vector_store)
# 3. 提问
query = "公司的考勤方式是什么?"
print(f"Query: {query}\n" + "-" * 20)
input_payload = {"messages": [{"role": "user", "content": query}]}
for event in agent.stream(input_payload, stream_mode="values"):
event["messages"][-1].pretty_print()成功加载 8 个文档到向量库
嵌入完成
Query: 公司的考勤方式是什么?
--------------------
================================ Human Message =================================
公司的考勤方式是什么?
================================== Ai Message ==================================
为了找到公司的考勤方式,我将采取以下步骤进行搜索:
1. 首先确定公司是否有明确的考勤制度及相关规定。
2. 然后查找具体的考勤方式,例如打卡、签到等。
现在我开始第一步,通过检索来确定公司的考勤制度和相关规定。
Tool Calls:
retrieve_context (call_9f3fed5b0751468f8a2ffd1f)
Call ID: call_9f3fed5b0751468f8a2ffd1f
Args:
query: 公司的考勤制度和相关规定
================================= Tool Message =================================
Name: retrieve_context
[question.txt#0] 问题:公司的考勤方式是什么?
答案:公司实行弹性打卡。
权限: IT组、运维组
关键词:考勤方式
[question.txt#1] 问题:公司的考勤方式是什么?
答案:公司实行固定世界打卡。
权限: 运营组
关键词:考勤方式
[question.txt#5] 问题:休息不固定的人员考勤打卡有什么要求?
答案:休息不固定的人员在当月休息天数在正常规定休息天数范围内,不需提交调休或请假申请,上班照常打2次卡(上/下班各一次)。
权限:
关键词:弹性休息、考勤打卡要求
================================== Ai Message ==================================
公司的考勤方式是弹性打卡。