实现基于文件元数据的 RBAC (基于角色的访问控制) 问答系统。
⚠️ 前置要求¶
读取同级目录下存在 files 文件夹,且文件夹内包含你的 .txt 知识库文件。
文件内容格式要求(示例):
问题:公司的考勤方式是什么?
答案:公司实行弹性打卡。
权限: IT组、运维组
关键词:考勤方式
问题:公司的考勤方式是什么?
答案:公司实行固定时间打卡。
权限: 运营组
关键词:考勤方式1. 核心类定义:Embedding 与 文档加载¶
这里完全复用了源代码的逻辑:
DashScopeEmbeddings: 适配阿里云 Embedding。load_txt_documents: 关键逻辑,解析文本块,提取权限:和关键词:到 metadata,并清洗正文。
import re
from pathlib import Path
from typing import Iterable, List, Tuple, Dict
from openai import OpenAI
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document
# 初始化 OpenAI 客户端
client = OpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
)
class DashScopeEmbeddings(Embeddings):
"""DashScope 兼容的 Embeddings 封装。"""
def __init__(self, model: str = "text-embedding-v4", dimensions: int = 1024):
self.model = model
self.dimensions = dimensions
def embed_documents(self, texts: list[str]) -> list[list[float]]:
vectors: list[list[float]] = []
# 批处理避免超时
for i in range(0, len(texts), 5):
chunk = texts[i : i + 5]
try:
response = client.embeddings.create(
model=self.model,
input=chunk,
dimensions=self.dimensions,
)
vectors.extend([item.embedding for item in response.data])
except Exception as e:
print(f"Embedding error: {e}")
vectors.extend([[0.0]*self.dimensions] * len(chunk))
return vectors
def embed_query(self, text: str) -> list[float]:
response = client.embeddings.create(
model=self.model,
input=[text],
dimensions=self.dimensions,
)
return response.data[0].embedding
def load_txt_documents(data_dir: Path) -> list[Document]:
"""读取目录下的 txt 文件,提取元数据,清洗正文。"""
def split_on_blank(text: str) -> Iterable[str]:
# 使用正则按空行分割(兼容 \r\n)
for block in re.split(r"\n\s*\n", text):
cleaned = block.strip()
if cleaned:
yield cleaned
def parse_block(text_block: str) -> Tuple[str, Dict]:
"""
核心修改:
1. 提取 权限、关键词 到 metadata。
2. 返回的正文剔除这些元数据行,减少 Embedding 噪音。
"""
lines = text_block.split('\n')
content_lines = []
metadata = {}
for line in lines:
line = line.strip()
if not line: continue
# 提取权限 (支持中文冒号和英文冒号)
if line.startswith(("权限:", "权限:")):
raw_perm = line.split(':', 1)[1].strip()
# 使用正则分割:顿号、逗号、空格
perm_list = re.split(r'[、,,\s]+', raw_perm)
metadata["permissions"] = [p for p in perm_list if p]
# 提取关键词
elif line.startswith(("关键词:", "关键词:")):
metadata["keywords"] = line.split(':', 1)[1].strip()
# 保留正文 (问题和答案)
else:
content_lines.append(line)
return "\n".join(content_lines), metadata
documents: list[Document] = []
if not data_dir.exists():
print(f"警告:目录 {data_dir} 不存在,跳过加载。")
return []
for path in sorted(data_dir.glob("*.txt")):
print(f"正在读取文件: {path.absolute()}")
content = path.read_text(encoding="utf-8")
for idx, part in enumerate(split_on_blank(content)):
clean_content, extracted_meta = parse_block(part)
final_metadata = {
"source": path.name,
"chunk_id": idx,
**extracted_meta
}
documents.append(
Document(
page_content=clean_content,
metadata=final_metadata,
)
)
return documents2. 构建向量库¶
读取 files 目录下的现有文件。
from langchain_core.vectorstores import InMemoryVectorStore
def build_vector_store(target_dir: Path) -> InMemoryVectorStore:
documents = load_txt_documents(target_dir)
if not documents:
print("❌ 未加载到任何文档,请检查 files 目录是否存在 .txt 文件。")
# 返回空库防止报错
return InMemoryVectorStore(embedding=DashScopeEmbeddings())
print(f"✅ 成功加载 {len(documents)} 个文档片段")
embeddings = DashScopeEmbeddings()
vector_store = InMemoryVectorStore(embedding=embeddings)
vector_store.add_documents(documents)
return vector_store
# 指定目录为当前 Notebook 下的 files 目录
data_path = Path.cwd() / "files"
vector_store = build_vector_store(data_path)正在读取文件: c:\Users\Lenovo\Desktop\product\dive-into-langgraph-plus\files\question.txt
✅ 成功加载 8 个文档片段
3. 创建 Agent 与 权限过滤逻辑¶
这里实现了核心的 filter_func,通过比较 user_permission 和文档元数据中的 permissions 列表来决定是否返回文档。
from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.tools import tool
# 配置大模型
llm = ChatOpenAI(
api_key=os.getenv("DASHSCOPE_API_KEY"),
base_url=os.getenv("DASHSCOPE_BASE_URL"),
model="qwen3-coder-plus",
temperature=0,
)
def create_react_agent(vector_store: InMemoryVectorStore, user_permission: str):
@tool(response_format="content_and_artifact")
def retrieve_context(query: str):
"""检索知识库。"""
# === 核心修改:修复过滤逻辑 ===
def filter_func(doc: Document) -> bool:
# 获取文档的权限列表
doc_perms = doc.metadata.get("permissions", [])
# 1. 如果文档没有设置权限,默认为公开,返回 True
if not doc_perms:
return True
# 2. 检查用户权限是否在文档允许的列表中
return user_permission in doc_perms
print(f"\n[检索中] 用户权限: {user_permission}, 查询: {query}")
# 执行检索
retrieved = vector_store.similarity_search(
query,
k=3,
filter=filter_func # 传入过滤函数
)
if not retrieved:
return "没有找到相关且您有权限查看的文档。", []
# 格式化上下文
serialized = "\n\n".join(
f"---片段 {i+1}---\n{doc.page_content}"
for i, doc in enumerate(retrieved)
)
return serialized, retrieved
return create_agent(
llm,
tools=[retrieve_context],
system_prompt=(
"你是一个企业知识问答助手。"
"必须优先根据检索到的【参考资料】回答用户问题。"
"只输出用户权限组内的文档内容,不输出其他权限组的文档。"
"不清晰或有多个相似回答的,需要咨询用户进行确认。"
),
)4. 运行测试¶
修改下方的 user_permission 和 query 来测试你的现有文档。
# === 测试场景 1 ===
# 假设你的 files/x.txt 中包含 "权限: IT组"
current_permission = "IT组"
query = "公司的考勤方式是什么?"
agent = create_react_agent(vector_store, user_permission=current_permission)
response = agent.invoke({"messages": [{"role": "user", "content": query}]})
print("\n=== 最终回答 ===")
print(response["messages"][-1].content)
[检索中] 用户权限: IT组, 查询: 公司的考勤方式是什么?
=== 最终回答 ===
公司的考勤方式是**弹性打卡**。员工可以根据自己的工作安排,在规定的时间内灵活选择上下班打卡时间。此外,对于忘记打卡的情况,员工可以在缺卡发生后的3个工作日内申请补卡,每人每月限补卡3次,超过次数将不再受理。若未及时补卡,则每次缺卡将按缺勤半天处理,并扣发半天工资。
需要注意的是,对于休息不固定的人员,只要其当月休息天数在正常规定的范围内,就不需要提交调休或请假申请,但仍需按照常规要求每天打卡两次(上、下班各一次)。
# === 测试场景 2 ===
# 切换为 运营组,再次提问
current_permission = "运营组"
query = "公司的考勤方式是什么?"
agent = create_react_agent(vector_store, user_permission=current_permission)
response = agent.invoke({"messages": [{"role": "user", "content": query}]})
print("\n=== 最终回答 ===")
print(response["messages"][-1].content)
[检索中] 用户权限: 运营组, 查询: 公司的考勤方式是什么?
=== 最终回答 ===
公司实行固定时间打卡的考勤方式。员工需在上下班时各打卡一次。如果有忘记打卡的情况,可以在缺卡发生后的3个工作日内申请补卡,每人每月限补卡3次,超过限制将不再受理。若未及时补卡,每次缺卡将按缺勤半天处理,并扣发半天工资。
对于休息不固定的人员,如果其当月休息天数在正常规定范围内,则无需提交调休或请假申请,但仍需按照正常工作日的要求打卡(上、下班各一次)。