Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

带权限内存知识智能体 (Auth Agent RAG)


实现基于文件元数据的 RBAC (基于角色的访问控制) 问答系统。

⚠️ 前置要求

读取同级目录下存在 files 文件夹,且文件夹内包含你的 .txt 知识库文件。

文件内容格式要求(示例):

问题:公司的考勤方式是什么?
答案:公司实行弹性打卡。
权限: IT组、运维组
关键词:考勤方式

问题:公司的考勤方式是什么?
答案:公司实行固定时间打卡。
权限: 运营组
关键词:考勤方式

1. 核心类定义:Embedding 与 文档加载

这里完全复用了源代码的逻辑:

  1. DashScopeEmbeddings: 适配阿里云 Embedding。

  2. load_txt_documents: 关键逻辑,解析文本块,提取 权限:关键词: 到 metadata,并清洗正文。

import re
from pathlib import Path
from typing import Iterable, List, Tuple, Dict
from openai import OpenAI
from langchain_core.embeddings import Embeddings
from langchain_core.documents import Document

# 初始化 OpenAI 客户端
client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
)

class DashScopeEmbeddings(Embeddings):
    """DashScope 兼容的 Embeddings 封装。"""
    def __init__(self, model: str = "text-embedding-v4", dimensions: int = 1024):
        self.model = model
        self.dimensions = dimensions

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        vectors: list[list[float]] = []
        # 批处理避免超时
        for i in range(0, len(texts), 5):
            chunk = texts[i : i + 5]
            try:
                response = client.embeddings.create(
                    model=self.model,
                    input=chunk,
                    dimensions=self.dimensions,
                )
                vectors.extend([item.embedding for item in response.data])
            except Exception as e:
                print(f"Embedding error: {e}")
                vectors.extend([[0.0]*self.dimensions] * len(chunk))
        return vectors

    def embed_query(self, text: str) -> list[float]:
        response = client.embeddings.create(
            model=self.model,
            input=[text],
            dimensions=self.dimensions,
        )
        return response.data[0].embedding

def load_txt_documents(data_dir: Path) -> list[Document]:
    """读取目录下的 txt 文件,提取元数据,清洗正文。"""

    def split_on_blank(text: str) -> Iterable[str]:
        # 使用正则按空行分割(兼容 \r\n)
        for block in re.split(r"\n\s*\n", text):
            cleaned = block.strip()
            if cleaned:
                yield cleaned

    def parse_block(text_block: str) -> Tuple[str, Dict]:
        """
        核心修改:
        1. 提取 权限、关键词 到 metadata。
        2. 返回的正文剔除这些元数据行,减少 Embedding 噪音。
        """
        lines = text_block.split('\n')
        content_lines = []
        metadata = {}
        
        for line in lines:
            line = line.strip()
            if not line: continue

            # 提取权限 (支持中文冒号和英文冒号)
            if line.startswith(("权限:", "权限:")):
                raw_perm = line.split(':', 1)[1].strip()
                # 使用正则分割:顿号、逗号、空格
                perm_list = re.split(r'[、,,\s]+', raw_perm)
                metadata["permissions"] = [p for p in perm_list if p]
            
            # 提取关键词
            elif line.startswith(("关键词:", "关键词:")):
                metadata["keywords"] = line.split(':', 1)[1].strip()
            
            # 保留正文 (问题和答案)
            else:
                content_lines.append(line)
        
        return "\n".join(content_lines), metadata

    documents: list[Document] = []

    if not data_dir.exists():
        print(f"警告:目录 {data_dir} 不存在,跳过加载。")
        return []

    for path in sorted(data_dir.glob("*.txt")):
        print(f"正在读取文件: {path.absolute()}") 
        content = path.read_text(encoding="utf-8")
        
        for idx, part in enumerate(split_on_blank(content)):
            clean_content, extracted_meta = parse_block(part)
            
            final_metadata = {
                "source": path.name, 
                "chunk_id": idx,
                **extracted_meta 
            }
            
            documents.append(
                Document(
                    page_content=clean_content,
                    metadata=final_metadata,
                )
            )
            
    return documents

2. 构建向量库

读取 files 目录下的现有文件。

from langchain_core.vectorstores import InMemoryVectorStore

def build_vector_store(target_dir: Path) -> InMemoryVectorStore:
    documents = load_txt_documents(target_dir)

    if not documents:
        print("❌ 未加载到任何文档,请检查 files 目录是否存在 .txt 文件。")
        # 返回空库防止报错
        return InMemoryVectorStore(embedding=DashScopeEmbeddings())

    print(f"✅ 成功加载 {len(documents)} 个文档片段")
    
    embeddings = DashScopeEmbeddings()
    vector_store = InMemoryVectorStore(embedding=embeddings)
    vector_store.add_documents(documents)
    
    return vector_store

# 指定目录为当前 Notebook 下的 files 目录
data_path = Path.cwd() / "files"
vector_store = build_vector_store(data_path)
正在读取文件: c:\Users\Lenovo\Desktop\product\dive-into-langgraph-plus\files\question.txt
✅ 成功加载 8 个文档片段

3. 创建 Agent 与 权限过滤逻辑

这里实现了核心的 filter_func,通过比较 user_permission 和文档元数据中的 permissions 列表来决定是否返回文档。

from langchain_openai import ChatOpenAI
from langchain.agents import create_agent
from langchain.tools import tool

# 配置大模型
llm = ChatOpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    model="qwen3-coder-plus",
    temperature=0,
)

def create_react_agent(vector_store: InMemoryVectorStore, user_permission: str):

    @tool(response_format="content_and_artifact")
    def retrieve_context(query: str):
        """检索知识库。"""
        
        # === 核心修改:修复过滤逻辑 ===
        def filter_func(doc: Document) -> bool:
            # 获取文档的权限列表
            doc_perms = doc.metadata.get("permissions", [])
            
            # 1. 如果文档没有设置权限,默认为公开,返回 True
            if not doc_perms: 
                return True 
            
            # 2. 检查用户权限是否在文档允许的列表中
            return user_permission in doc_perms

        print(f"\n[检索中] 用户权限: {user_permission}, 查询: {query}")
        
        # 执行检索
        retrieved = vector_store.similarity_search(
            query, 
            k=3, 
            filter=filter_func # 传入过滤函数
        )
        
        if not retrieved:
            return "没有找到相关且您有权限查看的文档。", []

        # 格式化上下文
        serialized = "\n\n".join(
            f"---片段 {i+1}---\n{doc.page_content}"
            for i, doc in enumerate(retrieved)
        )
        return serialized, retrieved

    return create_agent(
        llm,
        tools=[retrieve_context],
        system_prompt=(
            "你是一个企业知识问答助手。"
            "必须优先根据检索到的【参考资料】回答用户问题。"
            "只输出用户权限组内的文档内容,不输出其他权限组的文档。"
            "不清晰或有多个相似回答的,需要咨询用户进行确认。"
        ),
    )

4. 运行测试

修改下方的 user_permissionquery 来测试你的现有文档。

# === 测试场景 1 ===
# 假设你的 files/x.txt 中包含 "权限: IT组"
current_permission = "IT组"
query = "公司的考勤方式是什么?"

agent = create_react_agent(vector_store, user_permission=current_permission)

response = agent.invoke({"messages": [{"role": "user", "content": query}]})

print("\n=== 最终回答 ===")
print(response["messages"][-1].content)

[检索中] 用户权限: IT组, 查询: 公司的考勤方式是什么?

=== 最终回答 ===
公司的考勤方式是**弹性打卡**。员工可以根据自己的工作安排,在规定的时间内灵活选择上下班打卡时间。此外,对于忘记打卡的情况,员工可以在缺卡发生后的3个工作日内申请补卡,每人每月限补卡3次,超过次数将不再受理。若未及时补卡,则每次缺卡将按缺勤半天处理,并扣发半天工资。

需要注意的是,对于休息不固定的人员,只要其当月休息天数在正常规定的范围内,就不需要提交调休或请假申请,但仍需按照常规要求每天打卡两次(上、下班各一次)。
# === 测试场景 2 ===
# 切换为 运营组,再次提问
current_permission = "运营组"
query = "公司的考勤方式是什么?"

agent = create_react_agent(vector_store, user_permission=current_permission)

response = agent.invoke({"messages": [{"role": "user", "content": query}]})

print("\n=== 最终回答 ===")
print(response["messages"][-1].content)

[检索中] 用户权限: 运营组, 查询: 公司的考勤方式是什么?

=== 最终回答 ===
公司实行固定时间打卡的考勤方式。员工需在上下班时各打卡一次。如果有忘记打卡的情况,可以在缺卡发生后的3个工作日内申请补卡,每人每月限补卡3次,超过限制将不再受理。若未及时补卡,每次缺卡将按缺勤半天处理,并扣发半天工资。

对于休息不固定的人员,如果其当月休息天数在正常规定范围内,则无需提交调休或请假申请,但仍需按照正常工作日的要求打卡(上、下班各一次)。