AI技术源码分析

Spring AI Alibaba Graph

旨在利用 Spring AI 作为桥梁,连接阿里巴巴的大语言模型(如通义千wen)和图数据库(如阿里云 GDB),以实现通过自然语言与复杂关联数据进行交互的目的。

底层核心原理是 Text-to-Cypher/Gremlin,即将用户的自然语言问题,通过大语言模型(LLM)动态翻译成图数据库的专业查询语言

设计思想

图数据库(Graph Database)非常擅长处理实体之间的复杂关系,例如社交网络中的“朋友的朋友”、金融风控中的“资金链路”、电商中的“共同购买”等。但要查询这些数据,需要使用专门的图查询语言,如 Gremlin (阿里云 GDB 支持) 或 Cypher。这些语言学习门槛高,普通业务人员无法直接使用。

“Spring AI Alibaba Graph”架构的核心思想就是解决这个问题:让 LLM 充当一个智能的“翻译官”。

流程:

赋予 LLM 上下文知识 (Context Awareness):一个通用的 LLM 不知道你的图数据库里有什么。因此,必须在查询时,动态地告诉 LLM 图的“Schema”(模式),包括:

  • 节点标签 (Node Labels):例如 Person(人)、Company(公司)、Product(产品)。
  • 边标签 (Edge Labels):例如 WORKS_FOR(为…工作)、INVESTED_IN(投资了)、BOUGHT(购买了)。
  • 属性 (Properties):每个节点和边上可能有的属性,例如 Person 节点有 name, age 属性。

利用 LLM 的代码生成能力 (Code Generation):大语言模型本质上是一个强大的序列生成器,它不仅能生成文章,也能生成代码。通过精巧的提示工程 (Prompt Engineering),我们可以引导 LLM 将用户的自然语言问题,结合我们提供的图 Schema,生成一段完全正确的 Gremlin 或 Cypher 查询代码。

自动化执行与反馈:生成查询代码后,程序自动执行它,并将结果返回。为了提升用户体验,甚至可以将图数据库返回的结构化数据再次交给 LLM,让它用自然语言进行总结和概括。

架构

实际流程:

用户提问: 用户通过前端界面或 API,输入自然语言问题:“查询投资了‘阿里巴巴’的所有人的姓名”。

Spring 应用接收: 一个基于 Spring Boot 和 Spring AI 构建的后端服务接收到这个请求。

构建精确的 Prompt: 这是最关键的一步。程序会动态构建一个发给 LLM 的 Prompt,它通常包含以下部分:

  • 入口 : 开发者代码调用 graphClient.query("查詢投資了阿里巴巴的所有人")
    委派 : DefaultGraphClient 将请求委派给 graphChatService.chat("...")
    检索(Retrieval) : graphChatService 首先调用其持有的 graphStore.getSchema() 方法。 Neo4jGraphStore (假设)会连接 Neo4j 数据库,执行 CALL db.schema.visualization() 或类似命令,获取 Schema 并格式化为文本。
    构建(Prompt Construction) : graphChatService 使用 GraphChatPromptFactory ,将上一步获取的 Schema 文本和用户问题填充到一个预设的模板中,生成最终的 Prompt 对象。
    生成(Generation) : graphChatService 调用 chatClient.call(prompt)spring-ai-alibaba-tongyi-starter 捕获此调用,向通义千问 API 发送请求。
    解析(Parsing) : 通义千问返回一个包含 Cypher 查询的 ChatResponsegraphChatService 使用 GraphCypherResponseParser 从中提取出纯净的 Cypher 查询字符串。
    执行(Execution) : graphChatService 拿到查询字符串后,调用 graphStore.execute(cypherQuery)返回 : Neo4jGraphStore 使用 Neo4j Java Driver 执行查询,并将结果返回。这个结果最终沿着调用链返回给开发者。

三大核心

GraphStore,图存储的统一抽象

设计目的 : 将上层的“Text-to-Query”逻辑与底层具体的图数据库(Neo4j, NebulaGraph, 阿里云 GDB 等)完全解耦。无论底层是什么图数据库,对上层来说,它们都只是一个 GraphStore

核心方法:

getSchema() : 这是实现 RAG 的信息来源 。此方法负责连接到底层数据库,并提取其 Schema 信息(节点标签、边标签、属性等),将其格式化为一个可以喂给 LLM 的字符串。

execute(String query) : 这是执行查询的统一入口。它接收 LLM 生成的查询字符串(如 Cypher 或 Gremlin),并通过具体的数据库驱动来执行它,然后返回结果。

设计模式 : 策略模式(Strategy Pattern) 。用户可以根据自己的数据库类型,注入不同的 GraphStore 实现(例如 Neo4jGraphStore , TinkerPopGremlinGraphStore ),而上层 GraphClient 的代码无需任何改动。一个抽象类的实现

问题and挑战:

  1. GraphStore.getSchema() 的“信息密度”挑战

“提示词爆炸” (Prompt Explosion) 与成本问题:当图的 Schema 变得非常庞大时(成百上千种节点和边),getSchema() 返回的字符串会非常长。这不仅会急剧增加调用 LLM 的 Token 成本,更严重的是,过长的、充斥着大量无关信息的上下文,反而可能降低 LLM 的理解和推理能力,导致生成错误的查询。

Schema 的“知识”密度不足:Schema 只定义了“结构”,但没有定义“语义”。例如,一个 TRANSACTION 的边,其 amount 属性是正是负代表什么业务含义?LLM 仅从 Schema 中无法得知。

解决:

一个更先进的 GraphStore 实现,不应该仅仅是“暴力”地返回整个 Schema。它应该进化为一个“Schema 智能检索器”

  1. Schema 知识图谱化/向量化:将图的 Schema 本身(节点标签、边标签、属性及其注释)也构建成一个小型知识图谱或将其向量化存储。
  2. 两阶段 RAG:当用户提问时,第一阶段,先对用户的问题进行向量相似度搜索,从海量的 Schema 中检索出与问题最相关的子图 Schema(例如,问题关于“交易”,就只提取 Account, Transaction 等相关的节点和边)。第二阶段,再将这个高度相关的、精简的子图 Schema 喂给 LLM 去生成查询。
  3. 动态数据样本getSchema() 还可以被增强为 getContext(),除了返回 Schema,还能动态地从图中抓取几条与问题相关的实际数据样本(例如,几条交易记录),一并作为上下文提供给 LLM。这能极大地帮助 LLM 理解数据格式和语义,提升查询准确率。

GraphClient :面向用户的核心入口

设计目的 : 隐藏所有内部复杂的交互流程(获取 Schema -> 构建 Prompt -> 调用 LLM -> 解析响应-> 执行查询),为最终用户提供一个极其简单的接口。

核心方法剖析:

  • query(String question, ...) : 这是最核心的方法。开发者只需要传入自然语言问题,就可以得到图数据库的查询结果。它的默认实现 DefaultGraphClient编排下面要讲的 GraphChatServiceGraphStore 来完成整个流程。

GraphChatService :LLM 交互与智能的核心

设计目的 : 专门负责处理与大语言模型(LLM)之间的所有交互,实现核心的“Text-to-Query”转换逻辑。

核心:

依赖注入 : 它会被注入一个 GraphStore 实例和一个 Spring AI 的 ChatClient 实例。
Prompt 工厂 : 内部会使用一个 GraphChatPromptFactory 来创建 Prompt。这个工厂是提示工程(Prompt Engineering) 的具体体现,它会接收 GraphStore.getSchema() 获取到的 Schema 和用户问题,然后把它们组装成一个结构化、高质量的 Prompt 模板。
调用 LLM : 它使用注入的 ChatClient 将生成的 Prompt 发送给 LLM(例如通义千问)。响应解析 : 它还包含一个 ResponseParser 。因为 LLM 的返回内容可能不只是纯粹的查询语句(例如,可能包含在 Markdown 的 cypher ... 代码块中,或者有一些解释性的文字), ResponseParser 负责从 LLM 的原始响应中精确地提取出可执行的查询代码 。这一步是保证系统稳定性的关键。

Google GraphRAG

Google GraphRAG 的核心目标是从非结构化文本中构建图,以优化 RAG 的检索效果

传统的向量检索(Vector Search)只能找到与问题“表面相似”的文本片段,却忽略了文档内部和文档之间隐藏的深层语义关联。GraphRAG 的核心思想就是通过从文本中提取实体和关系来构建知识图谱,并利用图的社区结构来发现这些深层关联,从而为 LLM 提供更全面、更具上下文的背景信息

图的角色:在这里,图不是最终的知识库,而是一个为了提升检索质量而生成的“中间索引结构”。最终答案的来源(Source of Truth)依然是原始的文本。


GraphRAG 的流程分为“离线索引”和“在线查询”两个阶段。

阶段一:离线索引(数据预处理)

  1. 实体与关系提取 (Information Extraction):系统读取所有源文档(如 PDF, Word, TXT),利用 LLM 遍历这些文本,提取出关键的实体(人、事、物、概念)以及它们之间的关系。
  2. 图谱构建 (Graph Construction):将提取出的实体作为“节点”,关系作为“边”,构建一个全局的知识图谱。
  3. 社区检测 (Community Detection):在构建好的图上运行图算法(如 Leiden 算法),将整个图划分成若干个高内聚、低耦合的“语义社区”。每个社区代表了一组在语义上紧密相关的主题或事件。
  4. 社区摘要 (Community Summarization):利用 LLM 为每一个检测出的社区生成一个高度概括的摘要描述。

阶段二:在线查询(应答用户提问)

  1. 用户提问:用户提出一个问题。
  2. 多路检索 (Multi-path Retrieval):系统并行地在两个层面上进行检索:
    • 全局检索:在所有“社区摘要”上进行向量检索,快速定位到与问题最相关的几个社区。
    • 局部检索:同时,也在所有原始文本块上进行传统的向量检索。
  3. 上下文构建 (Context Building):这是 GraphRAG 的核心创新。一旦通过全局检索定位到了一个或多个相关社区,系统不再是只返回几个零散的文本块,而是将整个社区内的所有实体、关系、以及关联的原始文本块,作为一个完整的、结构化的上下文提供出来。
  4. 生成答案 (Answer Generation):将这个极其丰富且上下文完整的“社区信息包”喂给 LLM,让它基于这些信息生成最终的、高质量的答案。

知识图谱Graph-bulider

基本流程

  1. 定义图谱模式

节点类型 (Entity Types): 定义你要抽取的实体类别,例如:Person (人物), Organization (组织), Product (产品), Location (地点)。

关系类型 (Relation Types): 定义实体之间可能存在的关系,例如:WORKS_AT (任职于), FOUNDED (创立了), LOCATED_IN (位于)。

属性 (Properties): 定义节点或关系可能包含的属性,例如 Person 节点可以有 name, birth_date 等属性。

类似于E-R图,实体对象,以及他们的关系和属性

  1. 数据准备与预处理

收集用于信息抽取的原始非结构化文本数据,如新闻文章、公司报告、技术文档等。根据需要进行清洗,去除无关信息(如HTML标签、广告等)。

  1. 提示词promt工程,注入属性查询

    • 在Prompt中清晰地列出预定义的节点类型和关系类型。
    • 要求LLM以标准、易于解析的格式(如JSON、JSON-LD)返回结果。这对于后续的自动化处理至关重要。指定输入输出格式
    • 提供示例 (Few-shot Learning): 在Prompt中给出1-2个“输入文本 -> 输出JSON”的例子,能极大提升LLM的抽取准确率和格式一致性。提供示例,便于LLM进行输入输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
你是一个信息抽取专家。请从下面的文本中,根据我提供的Schema,抽取出所有实体和关系。

# Schema:
- 节点类型: Person, Company
- 关系类型: WORKS_AT (从 Person 指向 Company)

# 文本:
"张三是阿里巴巴的一名算法工程师,而李四则在腾讯工作。"

# 输出格式 (JSON):
请严格按照以下JSON格式输出,不要添加任何额外解释:
{
"entities": [
{"id": "张三", "type": "Person", "properties": {}},
{"id": "李四", "type": "Person", "properties": {}},
{"id": "阿里巴巴", "type": "Company", "properties": {}},
{"id": "腾讯", "type": "Company", "properties": {}}
],
"relations": [
{"source": "张三", "target": "阿里巴巴", "type": "WORKS_AT"},
{"source": "李四", "target": "腾讯", "type": "WORKS_AT"}
]
}
  1. 调用LLM进行抽取

将准备好的文本和精心设计的Prompt批量发送给LLM API,获取结构化的输出结果。

  1. 后处理与校验

格式校验: 验证返回的是否是合法的JSON。

实体对齐/归一化: “阿里”、”阿里巴巴”、”Alibaba” 可能指向同一实体,需要进行归一化处理。

关系校验: 检查关系是否符合Schema定义(例如,WORKS_AT 关系必须连接 PersonCompany)。

6.数据持久化

将校验过的实体和关系数据转换为图数据库(如Neo4j)的查询语句(通常是Cypher),然后批量导入到数据库中,完成知识图谱的构建。

查询语言 Cypher

create node:

1
CREATE (p:Person {name: 'Alice', born: 1990});

create friends:

1
2
MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'})
CREATE (a)-[r:FRIENDS {since: 2010}]->(b);

match node:

1
MATCH (p:Person) RETURN p;

match node by where:

1
2
3
4
5
6
7
// 查找名字是'Alice'的人
MATCH (p:Person {name: 'Alice'}) RETURN p;
// 使用WHERE子句进行更复杂的过滤
MATCH (p:Person) WHERE p.born > 1980 RETURN p.name, p.born;
// 查找所有和'Alice'是朋友的人
MATCH (a:Person {name: 'Alice'})-[:FRIENDS]-(friend)
RETURN friend.name;

set/update/remove:

1
2
3
4
5
6
// 给'Alice'添加一个email属性
MATCH (p:Person {name: 'Alice'}) SET p.email = 'alice@example.com';
// 移除'Alice'的born属性
MATCH (p:Person {name: 'Alice'}) REMOVE p.born;
// 给'Alice'再添加一个Student标签
MATCH (p:Person {name: 'Alice'}) SET p:Student;

friend:

1
2
3
// 删除'Alice'和她所有朋友的关系
MATCH (a:Person {name: 'Alice'})-[r:FRIENDS]-()
DELETE r;
1
2
3
4
5
// 删除没有关系的节点
MATCH (p:Person {name: '孤立的节点'}) DELETE p;

// 删除节点及其所有关联的关系 (非常常用)
MATCH (p:Person {name: '需要删除的人'}) DETACH DELETE p;

index:

1
2
CREATE INDEX person_name_index FOR (p:Person) ON (p.name);
CREATE CONSTRAINT person_name_unique ON (p:Person) ASSERT p.name IS UNIQUE;

优化检索

  1. 智能分块:

按语义分块: 利用NLP库(如NLTK, spaCy)或LangChain中的RecursiveCharacterTextSplitter,尝试按句子、段落、Markdown标题等语义边界进行切分,并设置重叠区域(overlap)来保留上下文联系。

面向实体/命题的分块: 对于更高级的应用,可以先用LLM提取出文本中的核心命题(Propositions)或实体关系,然后将这些结构化的信息作为检索单元。这能实现更精准的原子化信息检索。

文档结构化解析: 针对PDF、HTML等文件,不仅仅是提取纯文本。解析其标题、表格、列表等结构化信息,并将这些元数据(Metadata)与文本块一同索引,对后续的元数据过滤至关重要。

  1. 优化嵌入模型:

选择合适的模型: 不同的模型有不同的优势。例如,BGE (BAAI General Embedding)系列模型在中英文任务上表现优异,而OpenAI的text-embedding-3-large则在多语言和综合性能上领先。你需要根据你的主要语种和应用场景进行选型和评测。

领域微调 (Fine-tuning): 如果你的业务数据具有非常强的领域特性(如法律、医疗),通用Embedding模型可能无法很好地理解专业术语的语义。在这种情况下,使用领域数据对Embedding模型进行微调,可以显著提升检索效果。

数据清洗:去除文档中的噪声,如HTML标签、页眉页脚、广告语、不相关的图片描述等。

添加元数据: 为每个文本块添加丰富的元数据,如来源文档、章节标题、作者、发布日期等。

生成假设性问题 (Hypothetical Questions): 使用LLM为每个文本块生成几个它可能回答的问题,并将这些问题与文本块一起进行向量化。这样当用户的查询与某个假设性问题相似时,就能更容易地找到对应的原文。这个方法也称为“HyDE”(Hypothetical Document Embeddings),效果显著。


检索:

  1. 使用混合检索 (Hybrid Search)

语义检索 (Dense Retrieval): 即向量检索,擅长理解概念、意图和模糊查询。

关键词检索 (Sparse Retrieval): 如传统的BM25算法,擅长匹配精确的关键词、术语、ID等。

优势: 当用户查询“Neo4j 5.18.0的新特性”时,关键词检索能精准定位到5.18.0这个版本号,而语义检索能理解“新特性”这个概念。两者结合,能大幅提升召回率和准确率。许多现代向量数据库(如Weaviate, Pinecone)都原生支持混合检索。

  1. 优化参数

向量数据库通常使用近似最近邻(ANN)算法来加速检索,最主流的是HNSWIVF

  • HNSW (Hierarchical Navigable Small World):
    • ef_construction: 构建索引时的邻居节点数,越高索引质量越好,但构建越慢。
    • ef_search: 查询时搜索的邻居节点数,越高越精确,但延迟也越高。
  • IVF (Inverted File):
    • nlist: 聚类中心的数量。
    • nprobe: 查询时要搜索的聚类中心数量。
  • 优化策略: 根据你的业务场景对延迟和精度的要求,通过实验来调整这些参数,找到最佳的平衡点。例如,对于离线分析任务,可以调高精度参数;对于在线实时问答,则需要优先保证低延迟。

检索后处理优化

  1. 重排序 (Re-ranking):

在通过ANN算法快速召回(例如Top 20个)候选文档后,再使用一个更强大、更精确但计算量也更大的模型对这批候选结果进行重新排序。

  • 工作原理: 初步检索使用的是双编码器(Bi-Encoder)模型,它独立地为query和document生成向量。而重排序通常使用交叉编码器(Cross-Encoder)模型,它将query和document同时输入模型进行计算,能更精准地判断两者间的相关性。
  • 效果: Re-ranker能有效地将最相关的文档从Top 20提升到Top 3,极大地改善了送给LLM的上下文质量。BGE-Reranker 是一个常用的开源模型。

查询转换:

  1. 查询重写: 让LLM将口语化的、模糊的查询改写为更清晰、更结构化的查询。

  2. 子问题分解: 当用户提出一个复杂问题时(例如“对比A和B的优缺点并说明应用场景”),可以先让LLM将其分解成多个独立的子问题(“A的优点是什么?”、“B的优点是什么?”、“A的应用场景?”等),然后对每个子问题分别进行检索,最后汇总结果。

Step-Back Prompting: 让LLM从一个具体问题中提炼出一个更泛化、更高层次的问题,对这个高层次问题进行检索,获取更宏观的背景知识,有助于回答原始的具体问题。

  1. 多路路由:

建立多个子索引: 根据文档的主题或来源(如“技术文档索引”、“市场分析索引”、“法律条款索引”)建立不同的向量索引。

查询路由器: 在检索前,先用一个LLM分类器(Router)来判断用户的查询属于哪个主题,然后仅在对应的子索引中进行检索。

优势: 大大缩小了检索范围,提升了速度和精度,避免了不同主题间的知识干扰。

  1. 动态构建知识图谱

一个 RAG 系统最怕的就是知识库过时和答案不可信(幻觉)。因此,让知识图谱能够动态更新,并让所有答案都可溯源,是最高阶的要求。

目标: 将知识图谱的“构建”与“查询”打通,并为每一次查询结果提供“证据”。

  • 实现知识抽取与写入流水线:创建一个“知识抽取 Agent”,它持续地读取新的非结构化文档。利用 LLM 的信息抽取能力,将文本转化为结构化的“实体-关系-实体”三元组。将这些三元组转化为图数据库的写入语句(如 MERGE)。通过 graphStore.execute(writeQuery) 将新知识写入知识图谱。
  • 为知识添加“证据”元数据,在执行上述写入操作时,最重要的一点是:为每一个创建的节点和关系,都附加上来源元数据
1
2
3
4
5
MERGE (p:Person {name: '马云'})
ON CREATE SET p.source = '2024年阿里巴巴年报.pdf', p.page = 5

MERGE (c:Company {name: '阿里巴巴'})
ON CREATE SET c.source = '2024年阿里巴巴年报.pdf', c.page = 1

修改 GraphClient 的返回结果,使其不仅包含数据,也包含数据对应的 source 等元数据。

在最终生成答案的 Prompt 中,明确指示 LLM:**“你必须根据提供的来源信息,为你的答案提供引用。”

知识图谱的设计

逻辑设计

1.初始化知识图谱走类似Google GraphRAG

  • 首先将用户输入的信息,转换为三元组的形式,存储为节点和关系,注意使用meger函数,还需要标记来源,作为证据。
  • 然后同样的数据,经过嵌入模型向量化,存储在向量数据库中。
  • 根据输入的信息的相关度,新建子索引,细化处理分析

2.已有知识图谱的cyher查询

  • 将输入的语言转为cyher语言进行查询,同时在向量数据库中进行关键词查询和语义搜索,两个数据库同时搜索出结果,将其放入重排模型进行topk查询,然后将最后结果交给LLM进行输出

模块设计

数据处理:

  1. 数据源适配器,负责从不同数据源(本地文件系统、S3、SharePoint、PostgreSQL、REST API)拉取数据。为每种数据源实现一个独立的适配器插件,输出统一格式的RawDocument对象(包含内容、元数据、来源URL等)。

  2. 知识抽取服务,接收RawDocument,进行预处理、信息抽取和格式化。

    预处理: 清洗文本(去除HTML标签、无关字符)。

    智能分块 (Semantic Chunking): 使用RecursiveCharacterTextSplitter等策略,将长文本切分为有意义的TextChunk

    LLM信息抽取: 设计包含Schema定义JSON输出格式的Prompt,调用LLM,从TextChunk中抽取出实体、关系、属性,生成结构化的JSON数据。

    向量化: 调用Embedding模型,将每个TextChunk向量化。

    打包消息: 将结构化JSON和向量化的TextChunk打包成一个KnowledgePacket消息。

输出: 向消息队列(Kafka)的特定Topic(如knowledge-ingestion)发送KnowledgePacket消息。

  1. 异步写入消费者,订阅消息队列,实现双模态数据的持久化。

Graph写入消费者:

  • 消费KnowledgePacket,解析其中的结构化JSON。
  • 将其转换为幂等的MERGE Cypher语句。
  • 关键设计: 节点属性必须包含source(来源)、chunk_id(关联向量库)、last_updated等溯源信息。
  • 批量写入Neo4j。

Vector写入消费者:

  • 消费KnowledgePacket,提取TextChunk、向量和元数据。
  • 元数据中必须包含从Graph写入后生成的node_uuid,用于反向关联。
  • 根据预设策略(如按文档来源),将数据写入Vector DB的指定子索引/集合中。

智能查询:将用户自然语言问题,高效、精准地转化为基于知识的答案。

  1. 查询编排服务:作为查询流程的大脑,接收API Gateway转发的用户请求,编排整个查询过程。

NL-to-Cypher生成: 调用LLM,将用户问题转换为Cypher查询。增加安全防护: 对生成的Cypher进行语法校验和规则检查(如限制查询深度),防止恶意或低效查询。

查询任务分发: 并发地向三个执行器分发任务:

  • Graph查询执行器(执行Cypher)
  • Vector语义查询执行器(执行向量相似度搜索)
  • Vector关键词查询执行器(执行全文检索或BM25)
  1. 查询融合与重排引擎,收集并发查询的结果,进行融合、排序,筛选出最佳上下文。

结果规范化: 将三路查询返回的异构结果(图路径、文本块)转换为统一的Evidence对象列表。Evidence对象结构:{ content: string, source: string, score: float, type: 'graph' | 'vector' }

  • 处理技巧: 对于Graph结果,需将其路径或节点属性“渲染”成一段通顺的文本作为content

初步融合与去重: 合并三个列表,并根据内容相似性进行去重。

调用重排模型 (Re-ranker): 将融合后的Evidence列表(例如Top 50)和原始用户问题一起发送给重排模型,模型会对每个Evidence与问题的相关性进行打分。

筛选Top-K: 根据重排模型的得分,选出最相关的Top-K个(如Top 5)Evidence作为最终上下文。

3.LLM答案合成服务,基于筛选出的高质量上下文,生成最终答案。

构建最终Prompt: 模板如下:“请根据以下上下文信息,用严谨、专业的语气回答用户的问题。在回答时,请明确引用信息的来源。上下文:[...插入Top-K Evidence...]。用户问题:[...原始问题...]”

调用LLM生成: 使用流式(Streaming)API调用LLM,将生成的答案实时返回给前端。

数据Model设计

Graph (Neo4j):

节点 (Labels): :Document, :Entity, :Person, :Organization, :Concept 等。

节点属性 (Properties):

  • uuid: 全局唯一标识符。
  • name: 实体名称。
  • source_url: 知识来源URL。
  • metadata: 其他业务属性(JSON格式)。
  • created_at, updated_at: 时间戳。

关系 (Types): HAS_ENTITY, WORKS_AT, PARTNERS_WITH, RELATED_TO 等。

关系属性: source: 关系来源的文档或句子。weight: 关系强度。

Vector (Milvus):

集合/Collection Schema:

  • chunk_id (Primary Key, VarChar)
  • text_content (String)
  • text_vector (FloatVector, 1024 dims)
  • metadata (JSON): { "doc_id": "...", "doc_title": "...", "graph_node_uuid": "..." }

分布式微服务设计

遇到问题及其解决

  1. 数据库的一致性问题

因为我们使用一个图数据库,一个向量库所以我们的数据库如何保证一致性呢?

写操作需要同时成功写入Neo4j和Milvus两个异构系统,无法使用传统的两阶段提交(2PC)实现分布式事务。

解决方案:采用“事务性发件箱(Transactional Outbox)+ 消息队列”模式,保证最终一致性。

改造知识抽取服务:

  • 当服务完成信息抽取和向量化后,不要直接向Kafka发消息
  • 而是在同一个本地数据库事务中,将KnowledgePacket存入一张本地的outbox表,并将业务数据的状态更新。这样保证了只要业务成功,要发送的消息就一定被持久化了。

引入消息中继(Message Relay):

  • 部署一个独立的服务(推荐使用xxl-job?),专门负责“扫描”outbox表。
  • 一旦发现新消息,就将其可靠地投递到Kafka,并在成功投递后,标记或删除outbox表中的对应记录。

效果: 这个模式将“业务操作”和“消息发送”解耦,彻底避免了“业务成功但消息没发出去”或“消息发出去了但业务回滚了”的经典问题,保证了消息至少成功发布一次

  1. 消息幂等的处理

至少一次”的消息投递保证,意味着消费者可能会收到重复消息。同时,网络或消费者崩溃可能导致消息丢失。

依靠Kafka的持久性保证 + 消费者的幂等设计。

防止消息丢失:

  • 生产者侧: 在消息中继服务中,设置Kafka Producer的acks=all,确保消息被所有ISR(In-sync Replicas)确认后才算成功。
  • 消费者侧: 关闭消费者的enable.auto.commit,改为手动提交偏移量。只有当数据成功写入Neo4j和Milvus后,才在代码中显式调用consumer.commitSync()。如果写入失败,程序抛出异常,偏移量不会被提交,下次重启后会重新消费这条消息。

保证消费者幂等:

  • KnowledgePacket消息中,生成一个全局唯一的event_id
  • Graph写入消费者: MERGE操作天生就是幂等的,无需额外处理。
  • Vector写入消费者: 在写入Milvus前,先根据chunk_id(主键)检查数据是否已存在。如果存在,则执行更新操作;如果不存在,则执行插入。或者,消费者可以维护一个已处理event_id的记录(例如存在Redis中,并设置TTL),在处理消息前先检查event_id是否已被处理。
  1. 引入缓存来解决LLM记忆力的问题

查询链路长,LLM调用成本高、延迟大;多轮对话需要维持上下文记忆。

引入Redis作为多级缓存和会话存储。

多级缓存策略:

  • 缓存层1 (NL-to-Cypher): Key: HASH(用户问题字符串) -> Value: 生成的Cypher字符串。对于高频、通用的问题,可以秒级响应。
  • 缓存层2 (查询结果): Key: HASH(Cypher查询或向量查询) -> Value: 序列化的查询结果。对于“热点”数据查询,直接返回缓存结果,无需穿透到数据库。

缓存失效与更新:

  • 策略: 采用TTL(Time-To-Live)作为基本失效策略(例如,缓存1小时)。这是一种简单有效的折衷。
  • 进阶策略: 采用事件驱动的缓存失效。当知识沉淀流水线更新了某部分知识时,可以向一个专门的cache-invalidation Topic发送消息,订阅该Topic的服务负责精确地删除相关的Redis缓存。

LLM记忆性 (会话管理):

  • 方案: 使用Redis来存储每一轮对话的上下文。
  • 数据结构: 使用Redis的LISTZSETKey: conversation_id -> Value: [消息1, 消息2, ...].
  • 流程:
    1. 用户发起新一轮对话时,传入conversation_id
    2. 后端服务从Redis中加载最近的K轮对话历史。
    3. 将对话历史、当前问题、检索到的Evidence一起构建成Prompt,发送给LLM。
    4. 将当前的问答对(Q&A)存回Redis的conversation_id对应的列表中。
  • 框架集成: LangChain等框架的ConversationBufferMemory等记忆模块,可以很容易地与自定义的存储后端(如Redis)集成。
  1. 冗余数据

    如何在两个物理隔离的数据库间建立逻辑引用,并优化存储成本。

通过ID关联,各司其职。

Graph写入消费者先行: Graph消费者处理消息,通过MERGE操作在Neo4j中创建节点,Neo4j会自动为节点生成一个内部ID,但我们最好自己生成一个业务上的uuid并存为属性。

消息增强与转发: Graph消费者成功写入后,它需要将这个uuid写回消息中(或者发送一条包含chunk_idnode_uuid关联关系的新消息到另一个Topic)。

Vector写入消费者后行: Vector消费者监听这条被“增强”后的消息,这样在写入Milvus时,它就能拿到对应的graph_node_uuid,并将其存入Vector记录的metadata字段中。

效果: 这样就建立了一个从Vector到Graph的单向引用。查询时,可以从Vector结果中轻松找到对应的Graph节点,进行更深度的图遍历。

成本优化: 再次强调,Neo4j中只存储结构化数据和少量元数据。完整的原始文本块只在Milvus中存储一份。需要时,通过上述ID关联进行查询。

  1. 异构结果的融合策略

List<GraphPath>List<TextChunk> 如何合并成一个有序的 List<Evidence>

渲染-规范-融合-去重

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
function fuseAndNormalizeResults(graphResults, vectorResults, keywordResults) {
List<Evidence> evidenceList = new ArrayList<>();

// --- Step 1: Render & Normalize Graph Results ---
for (GraphPath path : graphResults) {
String renderedText = renderPathToText(path); // 关键函数:将图路径转为自然语言描述
float score = calculateGraphScore(path); // 关键函数:根据路径长度、关系权重等计算得分
evidenceList.add(new Evidence(renderedText, path.source, score, "graph"));
}

// --- Step 2: Normalize Vector & Keyword Results ---
evidenceList.addAll(normalizeVectorResults(vectorResults, "vector"));
evidenceList.addAll(normalizeVectorResults(keywordResults, "keyword"));

// --- Step 3: Reciprocal Rank Fusion (RRF) or simple score-based sort ---
// 简单的做法是直接按分数排序,但RRF效果更好,能平衡不同召回源的排序偏差
// 这里简化为排序
evidenceList.sort((e1, e2) -> Float.compare(e2.getScore(), e1.getScore()));

// --- Step 4: Deduplicate ---
// 基于内容相似性(如Jaccard相似度)或ID进行去重,保留最高分的
List<Evidence> deduplicatedList = deduplicate(evidenceList);

return deduplicatedList;
}
  1. 网络延迟处理

复杂的分布式调用链条;LLM生成代码的不可控性。

解决:全链路异步化 + 严格的防护栏与超时控制。

降低延迟:

  • 全链路异步: 在Java后端,大量使用CompletableFuture,让三路查询、结果融合、LLM调用等IO密集型操作完全异步化,最大化利用计算资源。
  • 连接池: 确保所有数据库和外部服务的客户端都配置了合理的连接池。

Cypher稳定性与安全:

  • 模板化与参数化: 对于特定意图的查询,与其让LLM从头生成,不如预定义一些Cypher“查询模板”,让LLM只负责填充模板中的参数。这极大提升了稳定性和安全性。
  • 执行前静态分析: 在执行LLM生成的Cypher前,可以使用openCypherfront-end库在Java中对其进行AST(抽象语法树)解析。通过分析AST,可以实施非常精细的规则,例如:
    • 禁止不带LIMIT的全图扫描。
    • 限制MATCH路径的最大长度。
    • 禁止写入操作(如CREATE, DELETE)。
  • 运行时超时: 在Neo4j Java Driver中,为每个查询设置一个严格的执行超时时间。这是最后的、也是最有效的防线,可以防止一个糟糕的查询拖垮整个数据库。

提示词工程

AI记忆力&多轮对话的实现

短期记忆

使用滑动窗口来实现,固定窗口大小,

在每次请求时,只将最近的K轮对话(例如,最近5轮)作为上下文(Context)连同当前问题一起发送给LLM。

但是容易出现:

上下文丢失 (Context Loss): 一旦对话超过K轮,最早的信息就会被永久丢弃。如果用户在第1轮提到的关键信息(如“我叫张三”),在第K+1轮时AI就忘了。

记忆僵化: 无法识别哪些历史信息是重要的,哪些是无关紧要的闲聊。

存储: 使用RedisLIST数据结构是最佳选择。每个conversation_id对应一个LIST

数据模型: LIST中的每个元素是一个JSON字符串,如{"role": "user", "content": "你好"}

流程:

  1. 用户发来消息,附带conversation_id
  2. 从Redis中用LRANGE conversation_id 0 K-1获取最近的K条记录。
  3. 将这K条记录和当前问题一起构建成Prompt。
  4. 获取LLM的回答后,将当前的用户问题和AI回答用LPUSHLTRIM命令存入LIST,保持其大小不超过K。

框架支持: LangChain中的ConversationBufferWindowMemory就是这个模式的直接实现。

中期记忆

随着对话的进行,不再保留全部历史,而是调用LLM对越来越长的对话历史进行滚动总结,形成一个摘要。每次请求时,将“历史摘要 + 最近几轮对话”作为上下文。

可以作为中期记忆,作为每个会话结束的总结实现

存储: 摘要可以存在RedisSTRINGHASH中,或者持久化到PostgreSQL/MySQL的某个字段。

流程:

  1. 设定一个阈值N(例如,每5轮对话触发一次总结)。
  2. 当对话达到N轮时,启动一个后台任务或在当前请求中,将当前的“旧摘要 + N轮对话”发送给LLM,并附上Prompt:“请总结以下对话…”。
  3. 将LLM返回的新摘要更新到数据库中。
  4. 构建下一次请求的Prompt时,结构为:"这是我们之前的对话摘要:{summary}。接下来是最近的几轮对话:{recent_history}。用户问题:{query}"

框架支持: LangChain中的ConversationSummaryBufferMemory提供了此功能。

长期个性化记忆

这是一种结构化长期记忆的方案。在对话过程中,实时地使用一个LLM(或专门的NLU模型)来抽取出对话中出现的关键实体、属性和关系,并将这些结构化信息存入知识图谱(如Neo4j)或关系型数据库。

可以个性化存储用户的消息,可以深度思考跨会话的消息

    1. Schema设计: 在Neo4j中定义用户中心节点:User,以及:Entity, :Preference, :Fact等节点。
    2. 异步抽取流程:
      • 在每轮对话结束后,将对话内容发送到一个后台处理服务
      • 该服务调用LLM,Prompt为:“从以下文本中抽取出人名、地名、用户偏好等实体信息,并以JSON格式返回…”。
      • 服务解析JSON,生成MERGE Cypher语句,如 MERGE (u:User {userId:'123'}) MERGE (p:Preference {type:'音乐', name:'周杰伦'}) MERGE (u)-[:HAS_PREFERENCE]->(p),更新知识图谱。
    3. 查询与注入:
      • 在每轮对话开始时,除了加载短期记忆,还要根据当前问题中的实体,查询知识图谱,获取相关背景知识。
      • 例如,如果用户问“给我推荐几首歌”,可以先查询图谱中该用户的音乐偏好,然后将“该用户喜欢周杰伦”这个事实注入到最终的Prompt中,让LLM做出更个性化的推荐。

记忆上下文构建

职责就是:根据当前的对话ID和用户输入,从各个记忆层中并行地获取相关信息,并按照预设的优先级和格式,组装成最终注入到LLM Prompt中的上下文(Context)。

API层接收到用户请求(convId, userId, userQuery)。

调用MemoryContextBuilder.buildPromptContext()。该方法内部并行启动三路记忆的加载。

构建器等待所有结果返回,然后按照长期 -> 中期 -> 短期的优先级组装成一个结构化的上下文字符串。

将组装好的contextuserQuery填入最终的LLM Prompt模板中。

调用LLM,并将结果流式返回给用户。

在后台,将当前的问答对ChatMessage异步地发送到Kafka的conversation_turns Topic,触发短期和长期记忆的更新。

通过会话超时或用户登出等事件,触发conversation_ended事件,更新中期记忆。

性能问题解决

  1. 查询链路P99延迟过高,在高并发下,即使用户平均响应时间尚可,但总有5%或1%的用户会经历无法忍受的慢响应。MemoryContextBuilder的并行设计虽好,但木桶的最短板决定了最终性能。
  • 数据库层面:比如图数据库Neo4j中查询的信息都要加上索引,增快查询速度,使用EXPLAINPROFILE进行分析,防止全表扫描,合理配置堆内存和页缓存。
  • 引入新的缓存,实现三级缓存架构,本地缓存,redis缓存,但是要注意数据的一致性,可以引用新的canel中间件来保证数据的最终一致性的问题
  • 线程池的隔离,并行CompletableFuture中,为不同的任务指定不同的专用线程池(Executor)。防止一个慢任务拖垮其他任务的工作

使用Jaeger/Zipkin来进行链路的追踪,具体是哪个环节出了问题,然后数据库查慢日志,Grafana仪表盘等等手段

  1. 上下文的剪切

计算组装好的上下文的总Token数。

如果超过预设阈值(如6000 Token),则调用一个更便宜、更快的模型(如GPT-3.5-Turbo, Llama-3-8B),让它对上下文进行压缩和筛选,Prompt:“请从以下信息中,筛选出与用户问题‘{userQuery}’最相关的部分,并保持在2000个Token以内。”

将压缩后的上下文用于最终的、昂贵的模型(如GPT-4o)调用。

  1. 建立模型的路由

用户问题首先经过一个分类器(可以是小LLM,也可以是传统模型),判断其意图是“简单问答”、“闲聊”还是“复杂推理”。根据分类结果,将请求路由到不同成本和能力的LLM模型。

  1. 幻觉脏数据处理

LLM在抽取实体和关系时可能产生幻觉,导致错误的信息被存入“长期记忆”,进而污染后续所有基于此记忆的回答。

创建一个“黄金标准”的测试集,包含上百个有代表性的(问题,预期答案,应检索到的上下文)三元组。

自动化回归测试: 在每次系统有重大更新(如更换Embedding模型、修改Prompt)后,自动化地运行整个测试集,并计算关键指标:

  • 检索质量: Precision@K, Recall@K, MRR (Mean Reciprocal Rank)。
  • 答案质量: 使用 LLM-as-a-Judge 模式,让GPT-4等强模型来评估生成答案的准确性、忠实度、相关性,并打分。

对比较重要的数据,进行人工审核的处理,用户层面可以点赞或者点踩,然后我们重点分析点踩的那一部分数据

  1. IO模型的更换

采用非阻塞I/O与响应式编程,Spring WebFlux + Project Reactor + Netty。这是现代Java构建高性能网络应用的黄金组合。

WebFlux使用事件循环(Event Loop)模型,用极少数的线程(如CPU核心数)就可以处理成千上万的并发连接。线程不会因等待数据(等待LLM返回token,等待网络写入)而被阻塞,而是去处理其他事件。

数据从LLM API流出,到写入响应给客户端的网络缓冲区,全程不应在你的后端服务中发生显著的、完整的数据聚合。

SSE

一种基于标准HTTP的、服务端到客户端的单向消息推送协议。它是为流式更新而生的。

Server-Sent Events(服务器发送事件)是一种允许服务器随时向客户端推送数据的Web技术。从本质上讲,它是在单个、持久的HTTP连接上实现的、从服务器到客户端的单向数据流

客户端一旦与服务器的SSE端点建立连接,这个连接就会保持打开状态。服务器可以随时通过这个连接,以一种特定的文本格式,将事件(数据)发送给客户端。

SSE的美妙之处在于其协议极度简单。服务器在响应时,必须将HTTP Header的Content-Type设置为text/event-stream。之后发送的内容遵循以下格式:

  • data: 这是最重要的字段,用于承载你要发送的数据。可以是一行,也可以是多行。

    1
    2
    data: 这是第一行数据
    data: 这是第二行数据

    在客户端,这两行数据会被拼接成"这是第一行数据\n这是第二行数据"

  • event: 用于为事件命名。如果省略,事件的默认名称是message。通过命名事件,你可以在前端为不同类型的消息绑定不同的监听器。

    1
    2
    event: user_login
    data: {"username": "Alice", "timestamp": 1678886400}
  • id: 为每个事件设置一个唯一的ID。这个ID的主要作用是实现断线重连。如果连接意外中断,浏览器在自动重连时,会把上次接收到的id值通过Last-Event-ID请求头发送给服务器。服务器可以根据这个ID,补发在断连期间可能丢失的数据。

    1
    2
    id: msg1
    data: some data
  • retry: 告诉浏览器在连接断开后,应该等待多少毫秒再尝试重连。

    1
    retry: 10000

    这表示10秒后重试。

一个完整的消息块由一个空行(\n\n)分隔。

1
2
3
4
5
6
7
8
9
id: event-1
event: temperature_update
data: {"location": "Tokyo", "temp": 25.5}
retry: 5000

id: event-2
event: humidity_update
data: {"location": "Tokyo", "humidity": 65}
retry: 5000

实例:

LLM聊天机器人响应流: (我们的核心场景) 逐字或逐词地将AI的回答推送到前端。

实时通知: 如社交网络的“你有新消息”或电商的“订单已发货”。

金融数据更新: 股票价格、加密货币行情等实时数据的推送。

新闻或体育比分直播: 将最新的事件和比分实时更新到页面。

系统监控仪表盘: 实时显示服务器的CPU、内存使用率等监控指标。

Spring Boot通过SseEmitter类,提供了对SSE的完美支持。它以非阻塞的方式工作,不会为每个连接都占用一个宝贵的线程。

nl2sql

一个基于自然语言生成SQL查询(Natural Language to SQL, NL2SQL)的智能体框架

该框架的核心作用是充当一个“翻译器”,将用户输入的自然语言问题(例如,“查询上个季度销售额最高的三个产品是什么?”)自动转换成对应数据库可以执行的SQL查询语句。

核心

基本流程:

输入理解 (Input Understanding):框架接收用户的自然语言输入。

意图识别与槽位填充:利用大语言模型的自然语言理解(NLU)能力,分析问句的意图(查询、统计、对比等)并提取关键信息(查询的指标、过滤条件、时间范围等)。

数据库元数据理解 (Schema Understanding):框架需要理解目标数据库的结构信息,包括表名、列表、字段类型、表之间的关联关系(主外键)等。这通常通过预先读取和处理数据库的schema来实现。

SQL生成 (SQL Generation):这是最核心的环节。框架会将用户的意图、关键信息与数据库的schema信息一同作为上下文(Context),构建成一个高效的提示(Prompt),然后提交给通义大语言模型。大模型根据这些上下文信息,生成准确的、符合特定数据库方言(如MySQL, PostgreSQL)的SQL查询语句。

SQL执行与结果返回 (Execution & Response):生成的SQL语句被发送到目标数据库执行,并将查询结果返回给用户。框架可能还会对结果进行格式化或进一步的自然语言解释。

设计方式&扩展优化:

基于大语言模型驱动,轻量级与可扩展,智能体框架

Schema链接与召回策略优化,对于拥有成百上千张表的复杂数据库,如何根据用户问题动态、准确地选择最相关的表和字段是提升SQL生成质量的关键。可以引入更先进的向量化匹配和知识图谱技术来优化这一过程。

搜索处理模糊和同义词,增强对业务术语、别名和同义词的理解能力,例如用户说“查一下GMV”,系统能自动关联到“成交总额”这一指标对应的字段。

复杂函数的处理,窗口函数与复杂分析:提升对需要使用窗口函数、公共表达式(CTE)等高级SQL特性的复杂分析类问题的支持。

跨数据源查询:支持用户的一个问题需要查询多个异构数据库才能回答的场景。

搜索代理

在聊搜索代理之前,就需要先搞清楚代理是什么,简单理解可以是:LLM 在自主循环中使用工具

那么搜索代理的简单定义:LLM 使用各种搜索或检索工具,动态地,按需地获取相关上下文

我在测试各种模型对于检索工具的调用的时候,发现模型能力差异导致的搜索结果不同,优秀的模型的搜索路径规划的更加合理,并且搜索的方向非常准确,而能力较差的模型,方向飘忽不定,结果也很一般

所以我发现影响搜索代理成功的因素之一是:更智能的模型,可以使搜索代理自主应对复杂问题并从错误中恢复

RAG 和搜索代理的区别

搜索代理和 RAG 的最主要的区别在于工作执行流程:

  • RAG 是预推理检索:系统预先从向量数据库检索出来结果之后输入给 LLM,这个时候 LLM 是被动的接受
  • 搜索代理是即时检索:LLM 是主动决策者,由 LLM 来决定搜索什么,怎么搜索,并且可以根据中间结果来主动的调整搜索路径

搜索代理和 RAG 还有一个在优化行为方式的区别:

搜索代理中有一种工具是 glob,这个工具是用来搜索和匹配文件名的,这个时候文件名也可以成为有效搜索因素,当一个代码库中的文件结构非常清晰,并且文件命名合理,在这种工作空间下搜索代理会非常有效,搜索代理的影响因素如下:

  • 文件夹的结构: 例如:tests/test_agent.ts 这个就表示测试模块中的测试 agent 功能的文件的隐性含义
  • 命名约定:例如:*.config.js 表示配置文件 、README.md 表示文档说明
  • 文件大小:可以隐含复杂性的背景信息
  • 时间戳:可以表示文件是最近修改的,隐含该文件相关性比较强(因为这个文件是最近修改的,那用户输入的问题很大概率是因为他刚刚修改这个文件产生的,我们可以仔细想想自己使用 ClaudeCode 或者 Cursor 的 Agent 模式的时候是不是这样的呢)

在这种构建方式下,代理可以逐层构建理解,仅在工作记忆中保留必要信息,这种自我管理的上下文窗口使代理专注于相关子集,而不是被大量但可能无关的信息淹没。

而对于 RAG 来说,只能重新构建嵌入文档块了,而不是像搜索代理这样随心所欲的动态更换一下文件名就可以,这也是一个关键区别搜索代理的维护成本比较低,开发起来比较快

当然,事物总是相对的,RAG 还是有优秀的地方的

  • 在大量数据时,RAG 的检索会比搜索代理快很多
  • RAG 的开发构建策略是平稳的,有成熟的构建搜索方案:分叉索引、父文档索引等,但是搜索代理要有效的话,需要有深思熟虑的工程实践,要有成熟的指导开发方案,不如搜索代理会很容易进入死循环和无效检索来浪费上下文,在这个方向上,搜索代理的开发难度比 RAG 大多了

那我们来总结一下 RAG 和搜索代理的区别点

  1. 工作执行流程的区别,RAG 是预推理检索,搜索代理是即时检索
  2. 优化行为方式的区别,搜索代理的优化方向更多,策略是可以动态调整的,RAG 需要重新构建文档块
  3. 开发和维护成本的区别:搜索代理的维护成本比较低,文件系统工具即时开发,不需要像 RAG 那样整理文档块,然后构建向量数据库等,并且搜索代理调整和维护比较容易
  4. 大量数据的检索速度区别:在大量数据的前提下,RAG 检索速度回比搜索代理快很多
  5. 有效搜索代理开发难度大:要想搜索代理在实际的场景中有效,需要有合适的策略指导

应用场景

1、 对于上下文信息密度有较高的要求:例如编程开发领域,开发者使用代码来实现自己严密的业务逻辑,这种场景下使用代理搜索会非常合适

倘若我们采用 RAG 的方式来检索,我们需要对代码库进行文档块的分割,就不能像文本那样简单的按照行数来分割了,因为代码上下文之间有很强的逻辑依赖,如果文档块中少一行 if 判断,表达的意思就是天差地别了,比较合适的的方法是结合语法树和代码依赖关联来进行分割,

所以这种情况下,不仅存入向量数据库的过程更麻烦,检索时也因为内部机制不透明,导致开发者很难看清检索的全貌,调试起来也更困难。

2、动态上下文不多:要动态传入给 LLM 的动态外部数据不多的时候,搜索代理很适用,例如:中小型代码库、一些公司内部客服手册这种相对固定的数据

我非常推荐开发者在构建大模型应用初期的时候,完全可以优先考虑使用搜索代理来实现外部数据的引入,在应用构建初期的时候,遵循着快速迭代的原则,搜索代理的开发难度和开发效率相比复杂的 RAG 是有优势的

3、偏向简单有效原则构建 Agent:大模型应用开发这个领域是在快速发展的,而且大部分都是向着“LLM 越来越智能”这个方向前行,构建简单的模块和应用是最合适的,方便应用和模块进行迭代,像渐进式构建有效应用的原则一样

具体实现

  • 一个成熟的执行工具需要兼容多种系统的版本,这个时候是比较复杂的
  • 工具的描述和名称要定义的合理,表达的清晰,不然模型会混乱调用

FileRead负责最后的内容读取、ListDirectory负责第一层的目录搜索、Glob负责第二层文件名的搜索、Grep负责第三层文件内容的搜索

工具

实现 GrepTool 工具对象时,需要一些工具的参数定义,比较重要的是工具参数

  • pattern:搜索的正则表达式
  • path:搜索路径(目录)
  • include:文件模式过滤
 GrepTool工具的实现
import { InternalTool, InternalToolContext } from '../types.js'; import { ripGrep } from '../../utils/ripgrep.js'; export const TOOL_NAME_FOR_PROMPT = 'GrepTool'; export const DESCRIPTION = ` - 适用于任何代码库大小的快速内容搜索工具 - 使用正则表达式搜索文件内容 - 支持完整的正则表达式语法(例如 "log.*Error"、"function\\s+\\w+" 等) - 使用 include 参数按模式过滤文件(例如 "*.js"、"*.{ts,tsx}") - 返回按修改时间排序的匹配文件路径 - 当你需要查找包含特定模式的文件时使用此工具 - 当你进行可能需要多轮 glob 和 grep 的开放式搜索时,请改用 Agent 工具 `; /** * GrepTool 参数定义 */ export interface GrepToolArgs { /** 搜索的正则表达式模式 */ pattern: string; /** 搜索路径(目录) */ path?: string; /** 文件模式过滤 */ include?: string; } /** * GrepTool 返回结果 */ export interface GrepToolResult { /** 匹配的文件路径 */ matches: string[]; /** 匹配总数 */ count: number; } /** * GrepTool 处理函数 */ const grepToolHandler = async ( args: GrepToolArgs, context?: InternalToolContext ): Promise<GrepToolResult> => { const { pattern, path = context?.cwd || process.cwd(), include } = args; // 构建 ripgrep 参数 const rgArgs: string[] = []; // 只返回文件路径 rgArgs.push('-l'); // --files-with-matches // 文件模式过滤 if (include) { rgArgs.push('--glob', include); } // 搜索模式 rgArgs.push(pattern); // 执行搜索 const abortSignal = context?.abortSignal || new AbortController().signal; const results = (await ripGrep(rgArgs, path, abortSignal)) as string[]; return { matches: results, count: results.length, }; }; /** * GrepTool 工具定义 */ export const GrepTool: InternalTool<GrepToolArgs, GrepToolResult> = { name: 'grep_search', category: 'search', internal: true, description: DESCRIPTION, version: '1.0.0', parameters: { type: 'object', properties: { pattern: { type: 'string', description: 'The regular expression pattern to search for in file contents', }, path: { type: 'string', description: 'The directory to search in. Defaults to the current working directory.', }, include: { type: 'string', description: 'File pattern to include in the search (e.g. "*.js", "*.{ts,tsx}")', }, }, required: ['pattern'], }, handler: grepToolHandler, };

在实现这个 grepTool 工具函数的时候,使用的是 ripgrep 命令,这个命令执行的速度非常快,比一般的搜索命令要快很多,在构建这个命令执行的函数需要注意两点:

  1. Mac 系统需要构建执行文件的签名
  2. 命令执行的时候需要判断,如果本地电脑没有 rg 这个命令,就需要使用 vendor 里面的二进制文件来执行了

使用命令 ripgrep:https://github.com/BurntSushi/ripgrep

 ripgrep命令的实现
import { fileURLToPath } from 'url'; import path from 'path'; import { findActualExecutable } from 'spawn-rx'; import { execFile } from 'child_process'; import { execFileNoThrow } from './execFileNoThrow.js'; import { config } from '../config/env.js'; const __filename = fileURLToPath(import.meta.url); let __dirname = path.dirname(__filename); console.log(config.nodeEnv); if (config.nodeEnv === 'development' || config.nodeEnv === 'test') { __dirname = path.resolve(__dirname, '..', '..'); } function ripgrepPath() { //检查本地是否安装了ripgrep const { cmd } = findActualExecutable('rg', []); if (cmd !== 'rg') { return cmd; } else { const rgRoot = path.resolve(__dirname, 'vendor', 'ripgrep'); console.log(rgRoot); const ret = path.resolve( rgRoot, `${process.arch}-${process.platform}`, 'rg' ); console.log(ret); return ret; } } export async function ripGrep( args: string[], target: string, abortSignal: AbortSignal ) { //mac电脑的签名 await macSignature(); const rgPath = ripgrepPath(); return new Promise(resolve => { execFile( ripgrepPath(), [...args, target], { maxBuffer: 1_000_000, signal: abortSignal, timeout: 10_000, }, (error, stdout) => { if (error) { if (error.code !== 1) { console.log(error); } resolve([]); } else { resolve(stdout.split('\n').filter(Boolean)); } } ); }); } //mac电脑的签名 let alreadSign = false; async function macSignature() { if (process.platform !== 'darwin') { return ''; } alreadSign = true; console.log('开始验证签名...'); const lines = ( await execFileNoThrow( 'codesign', ['-vv', '-d', ripgrepPath()], undefined, undefined, false ) ).stdout.split('\n'); console.log(lines); const needsSigned = lines.find(line => line.includes('linker-signed')); if (!needsSigned) { console.log('不需要签名'); return; } try { console.log('生成ripgrep签名'); const signResult = await execFileNoThrow('codesign', [ '--sign', '-', '--force', '', '--preserve-metadata=entitlements,requirements,flags,runtime', ripgrepPath(), ]); if (signResult.code !== 0) { console.log('签名失败', signResult.stderr); } console.log('移除应用隔离标记'); const quarantineResult = await execFileNoThrow('xattr', [ '-d', 'com.apple.quarantine', ripgrepPath(), ]); if (quarantineResult.code !== 0) { console.log('移除应用隔离标记失败', quarantineResult.stderr); } console.log('签名成功'); } catch (error) { console.error('签名生成失败', error); } }

GlobTool 工具实现

实现 GlobTool 工具对象的时候,相应的工具参数的定义:

  • pattern:Glob 匹配模式
  • path:搜索路径
 GlobTool工具的实现
import { InternalTool, InternalToolContext } from '../types.js'; import { glob } from '../../utils/file.js'; export const TOOL_NAME_FOR_PROMPT = 'GlobTool'; export const DESCRIPTION = `- 适用于任何代码库大小的快速文件模式匹配工具 - 支持像 "**/*.js" 或 "src/**/*.ts" 这样的 glob 模式 - 返回按修改时间排序的匹配文件路径 - 当你需要通过名称模式查找文件时使用此工具 - 当你进行可能需要多轮 glob 和 grep 的开放式搜索时,请改用 Agent 工具 `; /** * GlobTool 参数定义 */ export interface GlobToolArgs { /** Glob 匹配模式 */ pattern: string; /** 搜索路径(目录) */ path?: string; } /** * GlobTool 返回结果 */ export interface GlobToolResult { /** 匹配的文件路径 */ files: string[]; /** 是否被截断 */ truncated: boolean; /** 匹配总数 */ count: number; } /** * GlobTool 处理函数 */ const globToolHandler = async ( args: GlobToolArgs, context?: InternalToolContext ): Promise<GlobToolResult> => { const { pattern, path = context?.cwd || process.cwd() } = args; // 执行 glob 搜索 const abortSignal = context?.abortSignal || new AbortController().signal; const result = await glob( pattern, path, { limit: 100, offset: 0 }, abortSignal ); return { files: result.files, truncated: result.truncated, count: result.files.length, }; }; /** * GlobTool 工具定义 */ export const GlobTool: InternalTool<GlobToolArgs, GlobToolResult> = { name: 'glob_search', category: 'search', internal: true, description: DESCRIPTION, version: '1.0.0', parameters: { type: 'object', properties: { pattern: { type: 'string', description: 'The glob pattern to match files (e.g., "**/*.ts", "src/**/*.{js,jsx}")', }, path: { type: 'string', description: 'The directory to search in. Defaults to the current working directory.', }, offset: { type: 'number', description: 'Number of results to skip (for pagination, default: 0)', }, limit: { type: 'number', description: 'Maximum number of results to return (default: 100)', }, }, required: ['pattern'], }, handler: globToolHandler, };

这个命令的实现相对来说比较简单,使用 glob 框架就可以搞定

 glob命令的实现
import { glob as globLib } from 'glob'; //匹配搜索文件目录下的文件 export async function glob( filePattern: string, cwd: string, { limit, offset }: { limit: number; offset: number }, abortSignal: AbortSignal ) { const paths = await globLib([filePattern], { cwd, nocase: true, nodir: true, signal: abortSignal, stat: true, withFileTypes: true, }); const sortedPaths = paths.sort((a, b) => (a.mtimeMs ?? 0) - (b.mtimeMs ?? 0)); const truncated = sortedPaths.length > offset + limit; return { files: sortedPaths .slice(offset, offset + limit) .map(path => path.fullpath()), truncated, }; }

FileReadTool 工具实现

实现这个文件内容读取的工具对象,增加了一个类似于分页读取内容的功能,避免一次读取太多,相应的工具参数定义:

  • file_path:文件路径
  • offset:起始行号
  • limit:最大读取行数
 FileReadTool工具实现
import { InternalTool, InternalToolContext } from '../types.js'; import { readFileContent } from '../../utils/file.js'; const MAX_LINES_TO_READ = 2000; const MAX_LINE_LENGTH = 2000; export const DESCRIPTION = '从本地文件系统读取文件。'; export const PROMPT = `从本地文件系统读取文件。file_path 参数必须是绝对路径,而不是相对路径。默认情况下,它从文件开头读取最多 ${MAX_LINES_TO_READ} 行。你可以选择指定行偏移量和限制(对于长文件特别有用),但建议通过不提供这些参数来读取整个文件。任何超过 ${MAX_LINE_LENGTH} 个字符的行将被截断。对于图像文件,该工具将为你显示图像。`; /** * FileReadTool 参数定义 */ export interface FileReadToolArgs { /** 文件路径 */ file_path: string; /** 起始行号(从0开始) */ offset?: number; /** 最大读取行数 */ limit?: number; } /** * FileReadTool 返回结果 */ export interface FileReadToolResult { /** 文件内容 */ content: string; /** 返回的行数 */ lineCount: number; /** 文件总行数 */ totalLines: number; /** 文件路径 */ filePath: string; } /** * FileReadTool 处理函数 */ const fileReadToolHandler = async ( args: FileReadToolArgs, context?: InternalToolContext ): Promise<FileReadToolResult> => { const { file_path, offset = 0, limit } = args; // 读取文件内容 const result = await readFileContent(file_path, offset, limit); return { content: result.content, lineCount: result.lineCount, totalLines: result.totalLines, filePath: file_path, }; }; /** * FileReadTool 工具定义 */ export const FileReadTool: InternalTool<FileReadToolArgs, FileReadToolResult> = { name: 'read_file', category: 'filesystem', internal: true, description: DESCRIPTION, version: '1.0.0', parameters: { type: 'object', properties: { file_path: { type: 'string', description: 'The absolute or relative path to the file to read', }, offset: { type: 'number', description: 'Starting line number (0-indexed, default: 0)', }, limit: { type: 'number', description: 'Maximum number of lines to read (omit to read entire file)', }, }, required: ['file_path'], }, handler: fileReadToolHandler, };

这个文件内容读取工具使用的是 Node 的 fs 模块中的文件读取命令,需要重点注意的是文件编码格式要获取,不一定文件编码格式都是 utf-8

 文件读取命令实现
//读取文件内容 export async function readFileContent( filePath: string, offset: number = 0, maxLines?: number ): Promise<{ content: string; lineCount: number; totalLines: number }> { //TODO:获取文件的编码格式 - 默认UTF-8这个原本是需要写一个文件编码获取的函数 const enc = 'utf-8'; //读取文件内容 const content = await readFileSync(filePath, enc); //按行切割 - 跨平台的兼容要使用"/\r?\n/" const lines = content.split('\n'); //整理返回结果 const toReturn = maxLines !== undefined && lines.length - offset > maxLines ? lines.slice(offset, offset + maxLines) : lines.slice(offset); return { content: toReturn.join('\n'), lineCount: toReturn.length, totalLines: lines.length, }; }

ListDirectoryTool 工具实现

该工具是文件目录结构查询,工具函数的参数只有一个:

  • path:要列出目录的路径
 ListDirectoryTool工具函数的实现
import { InternalTool, InternalToolContext } from '../types.js'; import { listDirectory, createFileTree, printTree, } from '../../utils/listDirectory.js'; export const DESCRIPTION = '列出指定路径中的文件和目录。path 参数必须是绝对路径,而不是相对路径。如果你知道要搜索哪些目录,通常应优先使用 Glob 和 Grep 工具。'; /** * ListDirectoryTool 参数定义 */ export interface ListDirectoryToolArgs { /** 要列出的目录路径 */ path?: string; } /** * ListDirectoryTool 返回结果 */ export interface ListDirectoryToolResult { /** 文件树格式输出 */ tree: string; /** 文件路径列表 */ files: string[]; /** 总数 */ count: number; } /** * ListDirectoryTool 处理函数 */ const listDirectoryToolHandler = async ( args: ListDirectoryToolArgs, context?: InternalToolContext ): Promise<ListDirectoryToolResult> => { const { path = context?.cwd || process.cwd() } = args; // 执行目录列出 const abortSignal = context?.abortSignal || new AbortController().signal; const files = listDirectory(path, context?.cwd || process.cwd(), abortSignal); // 创建文件树 const tree = createFileTree(files); const treeOutput = printTree(tree); return { tree: treeOutput, files, count: files.length, }; }; /** * ListDirectoryTool 工具定义 */ export const ListDirectoryTool: InternalTool< ListDirectoryToolArgs, ListDirectoryToolResult > = { name: 'list_directory', category: 'filesystem', internal: true, description: DESCRIPTION, version: '1.0.0', parameters: { type: 'object', properties: { path: { type: 'string', description: 'The directory path to list. Defaults to the current working directory.', }, }, required: [], }, handler: listDirectoryToolHandler, };

在实现该工具函数的中,没有使用递归函数去实现,担心栈溢出,使用的是一个广度搜索,需要注意的是两个格式化的函数

  • 将搜索出来的文件数组转换为 Tree 格式的数据结构
  • 将 Tree 格式的数据结构转换为空格隔开的目录字符串
 格式化函数和搜索算法实现
import { readdirSync } from 'fs'; import { basename, join, relative, sep } from 'path'; import { getCwd } from './common.js'; export function listDirectory( initialPath: string, cwd: string, abortSignal: AbortSignal ): string[] { const results: string[] = []; const queue = [initialPath]; while (queue.length > 0) { if (results.length > 500) { return results; } if (abortSignal.aborted) { return results; } const path: any = queue.shift(); if (skip(path)) { continue; } if (path !== initialPath) { results.push(relative(cwd, path) + sep); } let children; try { children = readdirSync(path, { withFileTypes: true, }); } catch (error) { console.error(error); continue; } for (const child of children) { if (child.isDirectory()) { queue.push(join(path, child.name) + sep); } else { const fileName = join(path, child.name); if (skip(fileName)) { continue; } results.push(relative(cwd, fileName)); if (results.length > 500) { return results; } } } } return results; } function skip(path: string): boolean { if (path !== '.' && basename(path).startsWith('.')) { return true; } if (path.includes(`__pycache__${sep}`)) { return true; } return false; } /** * 为了让结果更加可读和节省Token,要创建两个对于结果进行格式化的函数 * - createFileTree * - printTree */ type TreeNode = { name: string; path: string; type: 'file' | 'directory'; children?: TreeNode[]; }; export function createFileTree(sortedPaths) { const root: TreeNode[] = []; for (const path of sortedPaths) { const parts = path.split(sep); let currentLevel: any = root; let currentPath = ''; for (let i = 0; i < parts.length; i++) { const part = parts[i]; if (!part) { continue; } currentPath = currentPath ? `${currentPath}${sep}${part}` : part; const isLastPart = i === parts.length - 1; const existingNode = currentLevel.find(node => node.name === part); if (existingNode) { currentLevel = existingNode.children || []; } else { const newNode: TreeNode = { name: part, path: currentPath, type: isLastPart ? 'file' : 'directory', }; if (!isLastPart) { newNode.children = []; } currentLevel.push(newNode); currentLevel = newNode.children || []; } } } return root; } /** * eg. * - src/ * - index.ts * - utils/ * - file.ts * @param tree * @param level * @param prefix * @returns */ export function printTree(tree: TreeNode[], level = 0, prefix = ''): string { let result = ''; if (level == 0) { result += `- ${getCwd()}${sep}\n`; prefix = ' '; } for (const node of tree) { result += `${prefix}${'-'} ${node.name}${node.type === 'directory' ? sep : ''}\n`; if (node.children && node.children.length > 0) { result += printTree(node.children, level + 1, `${prefix} `); } } return result; }

工具管理

我们定义好这些工具之后,需要将这个工具提供给 LLM,并且 LLM 返回工具执行命令的时候,需要调用相应工具的函数执行,最后将结果返回给 LLM,所以这里需要创建一个工具管理器

我的实现思路:

  • 提供一个方法用来注册工具函数:registerAllTools
  • 提供一个方法用来获取注册好的工具函数:getTools
  • 提供一个工具执行函数:execute
 工具管理的实现
import { InternalTool, InternalToolContext, FormattedToolDefinition, } from './types.js'; import { GrepTool } from './GrepTool/GrepTool.js'; import { ListDirectoryTool } from './ListDirectoryTool/ListDirectoryTool.js'; import { GlobTool } from './GolbTool/GlobTool.js'; import { FileReadTool } from './FileReadTool/FileReadTool.js'; const toolsList = [GrepTool, GlobTool, ListDirectoryTool, FileReadTool]; /** * 工具管理类 * 负责工具的注册、查询和执行 */ export class ToolManager { private tools: Map<string, InternalTool> = new Map(); constructor() { // 在构造函数中自动注册所有工具 this.registerAllTools(); } /** * 注册所有工具 * 显式列出所有要注册的工具 */ private registerAllTools() { const tools = [...toolsList]; tools.forEach(tool => { this.tools.set(tool.name, tool); }); } /** * 执行指定工具 * @param name 工具名称 * @param args 工具参数 * @param context 可选的上下文参数(如 abortSignal) * @returns 工具执行结果 */ async execute<TArgs = any, TResult = any>( name: string, args: TArgs, context?: InternalToolContext ): Promise<TResult> { const tool = this.tools.get(name); if (!tool) { throw new Error( `Tool '${name}' not found. Available tools: ${this.getToolNames().join(', ')}` ); } try { const result = await tool.handler(args, context); return result; } catch (error) { console.error(`Tool '${name}' execution failed:`, error); throw error; } } /** * 获取所有已注册的工具 * @returns 工具数组 */ getTools(): InternalTool[] { return Array.from(this.tools.values()); } }

在使用的时候,因为供应商的不同,对于 api 中的 tool 参数的格式也是不一样的,所以这个需要在定义 LLM 类的时候,自己提供一个工具格式转换方法,例如 Openai 的参数格式是

LLM调用工具

关于 LLM 的调用中,最核心的思路是:LLM 循环执行并且判断,并且设置最大的循环数,不要陷入死循环

A2A