AI技术源码分析

AI技术源码分析
mengnankkzhouSpring AI Alibaba Graph
旨在利用 Spring AI 作为桥梁,连接阿里巴巴的大语言模型(如通义千wen)和图数据库(如阿里云 GDB),以实现通过自然语言与复杂关联数据进行交互的目的。
底层核心原理是 Text-to-Cypher/Gremlin,即将用户的自然语言问题,通过大语言模型(LLM)动态翻译成图数据库的专业查询语言。
设计思想
图数据库(Graph Database)非常擅长处理实体之间的复杂关系,例如社交网络中的“朋友的朋友”、金融风控中的“资金链路”、电商中的“共同购买”等。但要查询这些数据,需要使用专门的图查询语言,如 Gremlin (阿里云 GDB 支持) 或 Cypher。这些语言学习门槛高,普通业务人员无法直接使用。
“Spring AI Alibaba Graph”架构的核心思想就是解决这个问题:让 LLM 充当一个智能的“翻译官”。
流程:
赋予 LLM 上下文知识 (Context Awareness):一个通用的 LLM 不知道你的图数据库里有什么。因此,必须在查询时,动态地告诉 LLM 图的“Schema”(模式),包括:
- 节点标签 (Node Labels):例如
Person(人)、Company(公司)、Product(产品)。 - 边标签 (Edge Labels):例如
WORKS_FOR(为…工作)、INVESTED_IN(投资了)、BOUGHT(购买了)。 - 属性 (Properties):每个节点和边上可能有的属性,例如
Person节点有name,age属性。
利用 LLM 的代码生成能力 (Code Generation):大语言模型本质上是一个强大的序列生成器,它不仅能生成文章,也能生成代码。通过精巧的提示工程 (Prompt Engineering),我们可以引导 LLM 将用户的自然语言问题,结合我们提供的图 Schema,生成一段完全正确的 Gremlin 或 Cypher 查询代码。
自动化执行与反馈:生成查询代码后,程序自动执行它,并将结果返回。为了提升用户体验,甚至可以将图数据库返回的结构化数据再次交给 LLM,让它用自然语言进行总结和概括。
架构
实际流程:
用户提问: 用户通过前端界面或 API,输入自然语言问题:“查询投资了‘阿里巴巴’的所有人的姓名”。
Spring 应用接收: 一个基于 Spring Boot 和 Spring AI 构建的后端服务接收到这个请求。
构建精确的 Prompt: 这是最关键的一步。程序会动态构建一个发给 LLM 的 Prompt,它通常包含以下部分:
- 入口 : 开发者代码调用
graphClient.query("查詢投資了阿里巴巴的所有人")。
委派 :DefaultGraphClient将请求委派给graphChatService.chat("...")。
检索(Retrieval) :graphChatService首先调用其持有的graphStore.getSchema()方法。Neo4jGraphStore(假设)会连接 Neo4j 数据库,执行CALL db.schema.visualization()或类似命令,获取 Schema 并格式化为文本。
构建(Prompt Construction) :graphChatService使用GraphChatPromptFactory,将上一步获取的 Schema 文本和用户问题填充到一个预设的模板中,生成最终的Prompt对象。
生成(Generation) :graphChatService调用chatClient.call(prompt)。spring-ai-alibaba-tongyi-starter捕获此调用,向通义千问 API 发送请求。
解析(Parsing) : 通义千问返回一个包含 Cypher 查询的ChatResponse。graphChatService使用GraphCypherResponseParser从中提取出纯净的 Cypher 查询字符串。
执行(Execution) :graphChatService拿到查询字符串后,调用graphStore.execute(cypherQuery)。返回 :Neo4jGraphStore使用 Neo4j Java Driver 执行查询,并将结果返回。这个结果最终沿着调用链返回给开发者。
三大核心
GraphStore,图存储的统一抽象
设计目的 : 将上层的“Text-to-Query”逻辑与底层具体的图数据库(Neo4j, NebulaGraph, 阿里云 GDB 等)完全解耦。无论底层是什么图数据库,对上层来说,它们都只是一个 GraphStore
核心方法:
getSchema() : 这是实现 RAG 的信息来源 。此方法负责连接到底层数据库,并提取其 Schema 信息(节点标签、边标签、属性等),将其格式化为一个可以喂给 LLM 的字符串。
execute(String query) : 这是执行查询的统一入口。它接收 LLM 生成的查询字符串(如 Cypher 或 Gremlin),并通过具体的数据库驱动来执行它,然后返回结果。
设计模式 : 策略模式(Strategy Pattern) 。用户可以根据自己的数据库类型,注入不同的 GraphStore 实现(例如 Neo4jGraphStore , TinkerPopGremlinGraphStore ),而上层 GraphClient 的代码无需任何改动。一个抽象类的实现
问题and挑战:
GraphStore.getSchema()的“信息密度”挑战
“提示词爆炸” (Prompt Explosion) 与成本问题:当图的 Schema 变得非常庞大时(成百上千种节点和边),getSchema() 返回的字符串会非常长。这不仅会急剧增加调用 LLM 的 Token 成本,更严重的是,过长的、充斥着大量无关信息的上下文,反而可能降低 LLM 的理解和推理能力,导致生成错误的查询。
Schema 的“知识”密度不足:Schema 只定义了“结构”,但没有定义“语义”。例如,一个 TRANSACTION 的边,其 amount 属性是正是负代表什么业务含义?LLM 仅从 Schema 中无法得知。
解决:
一个更先进的 GraphStore 实现,不应该仅仅是“暴力”地返回整个 Schema。它应该进化为一个“Schema 智能检索器”:
- Schema 知识图谱化/向量化:将图的 Schema 本身(节点标签、边标签、属性及其注释)也构建成一个小型知识图谱或将其向量化存储。
- 两阶段 RAG:当用户提问时,第一阶段,先对用户的问题进行向量相似度搜索,从海量的 Schema 中检索出与问题最相关的子图 Schema(例如,问题关于“交易”,就只提取
Account,Transaction等相关的节点和边)。第二阶段,再将这个高度相关的、精简的子图 Schema 喂给 LLM 去生成查询。 - 动态数据样本:
getSchema()还可以被增强为getContext(),除了返回 Schema,还能动态地从图中抓取几条与问题相关的实际数据样本(例如,几条交易记录),一并作为上下文提供给 LLM。这能极大地帮助 LLM 理解数据格式和语义,提升查询准确率。
GraphClient :面向用户的核心入口
设计目的 : 隐藏所有内部复杂的交互流程(获取 Schema -> 构建 Prompt -> 调用 LLM -> 解析响应-> 执行查询),为最终用户提供一个极其简单的接口。
核心方法剖析:
query(String question, ...): 这是最核心的方法。开发者只需要传入自然语言问题,就可以得到图数据库的查询结果。它的默认实现DefaultGraphClient会编排下面要讲的GraphChatService和GraphStore来完成整个流程。
GraphChatService :LLM 交互与智能的核心
设计目的 : 专门负责处理与大语言模型(LLM)之间的所有交互,实现核心的“Text-to-Query”转换逻辑。
核心:
依赖注入 : 它会被注入一个 GraphStore 实例和一个 Spring AI 的 ChatClient 实例。
Prompt 工厂 : 内部会使用一个 GraphChatPromptFactory 来创建 Prompt。这个工厂是提示工程(Prompt Engineering) 的具体体现,它会接收 GraphStore.getSchema() 获取到的 Schema 和用户问题,然后把它们组装成一个结构化、高质量的 Prompt 模板。
调用 LLM : 它使用注入的 ChatClient 将生成的 Prompt 发送给 LLM(例如通义千问)。响应解析 : 它还包含一个 ResponseParser 。因为 LLM 的返回内容可能不只是纯粹的查询语句(例如,可能包含在 Markdown 的 cypher ... 代码块中,或者有一些解释性的文字), ResponseParser 负责从 LLM 的原始响应中精确地提取出可执行的查询代码 。这一步是保证系统稳定性的关键。
Google GraphRAG
Google GraphRAG 的核心目标是从非结构化文本中构建图,以优化 RAG 的检索效果。
传统的向量检索(Vector Search)只能找到与问题“表面相似”的文本片段,却忽略了文档内部和文档之间隐藏的深层语义关联。GraphRAG 的核心思想就是通过从文本中提取实体和关系来构建知识图谱,并利用图的社区结构来发现这些深层关联,从而为 LLM 提供更全面、更具上下文的背景信息。
图的角色:在这里,图不是最终的知识库,而是一个为了提升检索质量而生成的“中间索引结构”。最终答案的来源(Source of Truth)依然是原始的文本。
GraphRAG 的流程分为“离线索引”和“在线查询”两个阶段。
阶段一:离线索引(数据预处理)
- 实体与关系提取 (Information Extraction):系统读取所有源文档(如 PDF, Word, TXT),利用 LLM 遍历这些文本,提取出关键的实体(人、事、物、概念)以及它们之间的关系。
- 图谱构建 (Graph Construction):将提取出的实体作为“节点”,关系作为“边”,构建一个全局的知识图谱。
- 社区检测 (Community Detection):在构建好的图上运行图算法(如 Leiden 算法),将整个图划分成若干个高内聚、低耦合的“语义社区”。每个社区代表了一组在语义上紧密相关的主题或事件。
- 社区摘要 (Community Summarization):利用 LLM 为每一个检测出的社区生成一个高度概括的摘要描述。
阶段二:在线查询(应答用户提问)
- 用户提问:用户提出一个问题。
- 多路检索 (Multi-path Retrieval):系统并行地在两个层面上进行检索:
- 全局检索:在所有“社区摘要”上进行向量检索,快速定位到与问题最相关的几个社区。
- 局部检索:同时,也在所有原始文本块上进行传统的向量检索。
- 上下文构建 (Context Building):这是 GraphRAG 的核心创新。一旦通过全局检索定位到了一个或多个相关社区,系统不再是只返回几个零散的文本块,而是将整个社区内的所有实体、关系、以及关联的原始文本块,作为一个完整的、结构化的上下文提供出来。
- 生成答案 (Answer Generation):将这个极其丰富且上下文完整的“社区信息包”喂给 LLM,让它基于这些信息生成最终的、高质量的答案。
知识图谱Graph-bulider
基本流程
- 定义图谱模式
节点类型 (Entity Types): 定义你要抽取的实体类别,例如:Person (人物), Organization (组织), Product (产品), Location (地点)。
关系类型 (Relation Types): 定义实体之间可能存在的关系,例如:WORKS_AT (任职于), FOUNDED (创立了), LOCATED_IN (位于)。
属性 (Properties): 定义节点或关系可能包含的属性,例如 Person 节点可以有 name, birth_date 等属性。
类似于E-R图,实体对象,以及他们的关系和属性
- 数据准备与预处理
收集用于信息抽取的原始非结构化文本数据,如新闻文章、公司报告、技术文档等。根据需要进行清洗,去除无关信息(如HTML标签、广告等)。
提示词promt工程,注入属性查询
- 在Prompt中清晰地列出预定义的节点类型和关系类型。
- 要求LLM以标准、易于解析的格式(如JSON、JSON-LD)返回结果。这对于后续的自动化处理至关重要。指定输入输出格式
- 提供示例 (Few-shot Learning): 在Prompt中给出1-2个“输入文本 -> 输出JSON”的例子,能极大提升LLM的抽取准确率和格式一致性。提供示例,便于LLM进行输入输出
1 | 你是一个信息抽取专家。请从下面的文本中,根据我提供的Schema,抽取出所有实体和关系。 |
- 调用LLM进行抽取
将准备好的文本和精心设计的Prompt批量发送给LLM API,获取结构化的输出结果。
- 后处理与校验
格式校验: 验证返回的是否是合法的JSON。
实体对齐/归一化: “阿里”、”阿里巴巴”、”Alibaba” 可能指向同一实体,需要进行归一化处理。
关系校验: 检查关系是否符合Schema定义(例如,WORKS_AT 关系必须连接 Person 和 Company)。
6.数据持久化
将校验过的实体和关系数据转换为图数据库(如Neo4j)的查询语句(通常是Cypher),然后批量导入到数据库中,完成知识图谱的构建。
查询语言 Cypher
create node:
1 | CREATE (p:Person {name: 'Alice', born: 1990}); |
create friends:
1 | MATCH (a:Person {name: 'Alice'}), (b:Person {name: 'Bob'}) |
match node:
1 | MATCH (p:Person) RETURN p; |
match node by where:
1 | // 查找名字是'Alice'的人 |
set/update/remove:
1 | // 给'Alice'添加一个email属性 |
friend:
1 | // 删除'Alice'和她所有朋友的关系 |
1 | // 删除没有关系的节点 |
index:
1 | CREATE INDEX person_name_index FOR (p:Person) ON (p.name); |
优化检索
- 智能分块:
按语义分块: 利用NLP库(如NLTK, spaCy)或LangChain中的RecursiveCharacterTextSplitter,尝试按句子、段落、Markdown标题等语义边界进行切分,并设置重叠区域(overlap)来保留上下文联系。
面向实体/命题的分块: 对于更高级的应用,可以先用LLM提取出文本中的核心命题(Propositions)或实体关系,然后将这些结构化的信息作为检索单元。这能实现更精准的原子化信息检索。
文档结构化解析: 针对PDF、HTML等文件,不仅仅是提取纯文本。解析其标题、表格、列表等结构化信息,并将这些元数据(Metadata)与文本块一同索引,对后续的元数据过滤至关重要。
- 优化嵌入模型:
选择合适的模型: 不同的模型有不同的优势。例如,BGE (BAAI General Embedding)系列模型在中英文任务上表现优异,而OpenAI的text-embedding-3-large则在多语言和综合性能上领先。你需要根据你的主要语种和应用场景进行选型和评测。
领域微调 (Fine-tuning): 如果你的业务数据具有非常强的领域特性(如法律、医疗),通用Embedding模型可能无法很好地理解专业术语的语义。在这种情况下,使用领域数据对Embedding模型进行微调,可以显著提升检索效果。
数据清洗:去除文档中的噪声,如HTML标签、页眉页脚、广告语、不相关的图片描述等。
添加元数据: 为每个文本块添加丰富的元数据,如来源文档、章节标题、作者、发布日期等。
生成假设性问题 (Hypothetical Questions): 使用LLM为每个文本块生成几个它可能回答的问题,并将这些问题与文本块一起进行向量化。这样当用户的查询与某个假设性问题相似时,就能更容易地找到对应的原文。这个方法也称为“HyDE”(Hypothetical Document Embeddings),效果显著。
检索:
- 使用混合检索 (Hybrid Search)
语义检索 (Dense Retrieval): 即向量检索,擅长理解概念、意图和模糊查询。
关键词检索 (Sparse Retrieval): 如传统的BM25算法,擅长匹配精确的关键词、术语、ID等。
优势: 当用户查询“Neo4j 5.18.0的新特性”时,关键词检索能精准定位到5.18.0这个版本号,而语义检索能理解“新特性”这个概念。两者结合,能大幅提升召回率和准确率。许多现代向量数据库(如Weaviate, Pinecone)都原生支持混合检索。
- 优化参数:
向量数据库通常使用近似最近邻(ANN)算法来加速检索,最主流的是HNSW和IVF。
- HNSW (Hierarchical Navigable Small World):
ef_construction: 构建索引时的邻居节点数,越高索引质量越好,但构建越慢。ef_search: 查询时搜索的邻居节点数,越高越精确,但延迟也越高。
- IVF (Inverted File):
nlist: 聚类中心的数量。nprobe: 查询时要搜索的聚类中心数量。
- 优化策略: 根据你的业务场景对延迟和精度的要求,通过实验来调整这些参数,找到最佳的平衡点。例如,对于离线分析任务,可以调高精度参数;对于在线实时问答,则需要优先保证低延迟。
检索后处理优化
- 重排序 (Re-ranking):
在通过ANN算法快速召回(例如Top 20个)候选文档后,再使用一个更强大、更精确但计算量也更大的模型对这批候选结果进行重新排序。
- 工作原理: 初步检索使用的是双编码器(Bi-Encoder)模型,它独立地为query和document生成向量。而重排序通常使用交叉编码器(Cross-Encoder)模型,它将query和document同时输入模型进行计算,能更精准地判断两者间的相关性。
- 效果: Re-ranker能有效地将最相关的文档从Top 20提升到Top 3,极大地改善了送给LLM的上下文质量。
BGE-Reranker是一个常用的开源模型。
查询转换:
查询重写: 让LLM将口语化的、模糊的查询改写为更清晰、更结构化的查询。
子问题分解: 当用户提出一个复杂问题时(例如“对比A和B的优缺点并说明应用场景”),可以先让LLM将其分解成多个独立的子问题(“A的优点是什么?”、“B的优点是什么?”、“A的应用场景?”等),然后对每个子问题分别进行检索,最后汇总结果。
Step-Back Prompting: 让LLM从一个具体问题中提炼出一个更泛化、更高层次的问题,对这个高层次问题进行检索,获取更宏观的背景知识,有助于回答原始的具体问题。
- 多路路由:
建立多个子索引: 根据文档的主题或来源(如“技术文档索引”、“市场分析索引”、“法律条款索引”)建立不同的向量索引。
查询路由器: 在检索前,先用一个LLM分类器(Router)来判断用户的查询属于哪个主题,然后仅在对应的子索引中进行检索。
优势: 大大缩小了检索范围,提升了速度和精度,避免了不同主题间的知识干扰。
- 动态构建知识图谱
一个 RAG 系统最怕的就是知识库过时和答案不可信(幻觉)。因此,让知识图谱能够动态更新,并让所有答案都可溯源,是最高阶的要求。
目标: 将知识图谱的“构建”与“查询”打通,并为每一次查询结果提供“证据”。
- 实现知识抽取与写入流水线:创建一个“知识抽取 Agent”,它持续地读取新的非结构化文档。利用 LLM 的信息抽取能力,将文本转化为结构化的“实体-关系-实体”三元组。将这些三元组转化为图数据库的写入语句(如
MERGE)。通过graphStore.execute(writeQuery)将新知识写入知识图谱。 - 为知识添加“证据”元数据,在执行上述写入操作时,最重要的一点是:为每一个创建的节点和关系,都附加上来源元数据。
1 | MERGE (p:Person {name: '马云'}) |
修改 GraphClient 的返回结果,使其不仅包含数据,也包含数据对应的 source 等元数据。
在最终生成答案的 Prompt 中,明确指示 LLM:**“你必须根据提供的来源信息,为你的答案提供引用。”
知识图谱的设计
逻辑设计
1.初始化知识图谱走类似Google GraphRAG
- 首先将用户输入的信息,转换为三元组的形式,存储为节点和关系,注意使用meger函数,还需要标记来源,作为证据。
- 然后同样的数据,经过嵌入模型向量化,存储在向量数据库中。
- 根据输入的信息的相关度,新建子索引,细化处理分析
2.已有知识图谱的cyher查询
- 将输入的语言转为cyher语言进行查询,同时在向量数据库中进行关键词查询和语义搜索,两个数据库同时搜索出结果,将其放入重排模型进行topk查询,然后将最后结果交给LLM进行输出
模块设计
数据处理:
数据源适配器,负责从不同数据源(本地文件系统、S3、SharePoint、PostgreSQL、REST API)拉取数据。为每种数据源实现一个独立的适配器插件,输出统一格式的
RawDocument对象(包含内容、元数据、来源URL等)。知识抽取服务,接收
RawDocument,进行预处理、信息抽取和格式化。预处理: 清洗文本(去除HTML标签、无关字符)。
智能分块 (Semantic Chunking): 使用
RecursiveCharacterTextSplitter等策略,将长文本切分为有意义的TextChunk。LLM信息抽取: 设计包含Schema定义和JSON输出格式的Prompt,调用LLM,从
TextChunk中抽取出实体、关系、属性,生成结构化的JSON数据。向量化: 调用Embedding模型,将每个
TextChunk向量化。打包消息: 将结构化JSON和向量化的
TextChunk打包成一个KnowledgePacket消息。
输出: 向消息队列(Kafka)的特定Topic(如knowledge-ingestion)发送KnowledgePacket消息。
- 异步写入消费者,订阅消息队列,实现双模态数据的持久化。
Graph写入消费者:
- 消费
KnowledgePacket,解析其中的结构化JSON。 - 将其转换为幂等的
MERGECypher语句。 - 关键设计: 节点属性必须包含
source(来源)、chunk_id(关联向量库)、last_updated等溯源信息。 - 批量写入Neo4j。
Vector写入消费者:
- 消费
KnowledgePacket,提取TextChunk、向量和元数据。 - 元数据中必须包含从Graph写入后生成的
node_uuid,用于反向关联。 - 根据预设策略(如按文档来源),将数据写入Vector DB的指定子索引/集合中。
智能查询:将用户自然语言问题,高效、精准地转化为基于知识的答案。
- 查询编排服务:作为查询流程的大脑,接收API Gateway转发的用户请求,编排整个查询过程。
NL-to-Cypher生成: 调用LLM,将用户问题转换为Cypher查询。增加安全防护: 对生成的Cypher进行语法校验和规则检查(如限制查询深度),防止恶意或低效查询。
查询任务分发: 并发地向三个执行器分发任务:
- Graph查询执行器(执行Cypher)
- Vector语义查询执行器(执行向量相似度搜索)
- Vector关键词查询执行器(执行全文检索或BM25)
- 查询融合与重排引擎,收集并发查询的结果,进行融合、排序,筛选出最佳上下文。
结果规范化: 将三路查询返回的异构结果(图路径、文本块)转换为统一的Evidence对象列表。Evidence对象结构:{ content: string, source: string, score: float, type: 'graph' | 'vector' }。
- 处理技巧: 对于Graph结果,需将其路径或节点属性“渲染”成一段通顺的文本作为
content。
初步融合与去重: 合并三个列表,并根据内容相似性进行去重。
调用重排模型 (Re-ranker): 将融合后的Evidence列表(例如Top 50)和原始用户问题一起发送给重排模型,模型会对每个Evidence与问题的相关性进行打分。
筛选Top-K: 根据重排模型的得分,选出最相关的Top-K个(如Top 5)Evidence作为最终上下文。
3.LLM答案合成服务,基于筛选出的高质量上下文,生成最终答案。
构建最终Prompt: 模板如下:“请根据以下上下文信息,用严谨、专业的语气回答用户的问题。在回答时,请明确引用信息的来源。上下文:[...插入Top-K Evidence...]。用户问题:[...原始问题...]”
调用LLM生成: 使用流式(Streaming)API调用LLM,将生成的答案实时返回给前端。
数据Model设计
Graph (Neo4j):
节点 (Labels): :Document, :Entity, :Person, :Organization, :Concept 等。
节点属性 (Properties):
uuid: 全局唯一标识符。name: 实体名称。source_url: 知识来源URL。metadata: 其他业务属性(JSON格式)。created_at,updated_at: 时间戳。
关系 (Types): HAS_ENTITY, WORKS_AT, PARTNERS_WITH, RELATED_TO 等。
关系属性: source: 关系来源的文档或句子。weight: 关系强度。
Vector (Milvus):
集合/Collection Schema:
chunk_id(Primary Key, VarChar)text_content(String)text_vector(FloatVector, 1024 dims)metadata(JSON):{ "doc_id": "...", "doc_title": "...", "graph_node_uuid": "..." }
分布式微服务设计
遇到问题及其解决
- 数据库的一致性问题
因为我们使用一个图数据库,一个向量库所以我们的数据库如何保证一致性呢?
写操作需要同时成功写入Neo4j和Milvus两个异构系统,无法使用传统的两阶段提交(2PC)实现分布式事务。
解决方案:采用“事务性发件箱(Transactional Outbox)+ 消息队列”模式,保证最终一致性。
改造知识抽取服务:
- 当服务完成信息抽取和向量化后,不要直接向Kafka发消息。
- 而是在同一个本地数据库事务中,将
KnowledgePacket存入一张本地的outbox表,并将业务数据的状态更新。这样保证了只要业务成功,要发送的消息就一定被持久化了。
引入消息中继(Message Relay):
- 部署一个独立的服务(推荐使用xxl-job?),专门负责“扫描”
outbox表。 - 一旦发现新消息,就将其可靠地投递到Kafka,并在成功投递后,标记或删除
outbox表中的对应记录。
效果: 这个模式将“业务操作”和“消息发送”解耦,彻底避免了“业务成功但消息没发出去”或“消息发出去了但业务回滚了”的经典问题,保证了消息至少成功发布一次。
- 消息幂等的处理
至少一次”的消息投递保证,意味着消费者可能会收到重复消息。同时,网络或消费者崩溃可能导致消息丢失。
依靠Kafka的持久性保证 + 消费者的幂等设计。
防止消息丢失:
- 生产者侧: 在消息中继服务中,设置Kafka Producer的
acks=all,确保消息被所有ISR(In-sync Replicas)确认后才算成功。 - 消费者侧: 关闭消费者的
enable.auto.commit,改为手动提交偏移量。只有当数据成功写入Neo4j和Milvus后,才在代码中显式调用consumer.commitSync()。如果写入失败,程序抛出异常,偏移量不会被提交,下次重启后会重新消费这条消息。
保证消费者幂等:
- 在
KnowledgePacket消息中,生成一个全局唯一的event_id。 - Graph写入消费者:
MERGE操作天生就是幂等的,无需额外处理。 - Vector写入消费者: 在写入Milvus前,先根据
chunk_id(主键)检查数据是否已存在。如果存在,则执行更新操作;如果不存在,则执行插入。或者,消费者可以维护一个已处理event_id的记录(例如存在Redis中,并设置TTL),在处理消息前先检查event_id是否已被处理。
- 引入缓存来解决LLM记忆力的问题
查询链路长,LLM调用成本高、延迟大;多轮对话需要维持上下文记忆。
引入Redis作为多级缓存和会话存储。
多级缓存策略:
- 缓存层1 (NL-to-Cypher):
Key: HASH(用户问题字符串)->Value: 生成的Cypher字符串。对于高频、通用的问题,可以秒级响应。 - 缓存层2 (查询结果):
Key: HASH(Cypher查询或向量查询)->Value: 序列化的查询结果。对于“热点”数据查询,直接返回缓存结果,无需穿透到数据库。
缓存失效与更新:
- 策略: 采用TTL(Time-To-Live)作为基本失效策略(例如,缓存1小时)。这是一种简单有效的折衷。
- 进阶策略: 采用事件驱动的缓存失效。当知识沉淀流水线更新了某部分知识时,可以向一个专门的
cache-invalidationTopic发送消息,订阅该Topic的服务负责精确地删除相关的Redis缓存。
LLM记忆性 (会话管理):
- 方案: 使用Redis来存储每一轮对话的上下文。
- 数据结构: 使用Redis的
LIST或ZSET。Key: conversation_id->Value: [消息1, 消息2, ...]. - 流程:
- 用户发起新一轮对话时,传入
conversation_id。 - 后端服务从Redis中加载最近的K轮对话历史。
- 将对话历史、当前问题、检索到的
Evidence一起构建成Prompt,发送给LLM。 - 将当前的问答对(Q&A)存回Redis的
conversation_id对应的列表中。
- 用户发起新一轮对话时,传入
- 框架集成: LangChain等框架的
ConversationBufferMemory等记忆模块,可以很容易地与自定义的存储后端(如Redis)集成。
冗余数据
如何在两个物理隔离的数据库间建立逻辑引用,并优化存储成本。
通过ID关联,各司其职。
Graph写入消费者先行: Graph消费者处理消息,通过MERGE操作在Neo4j中创建节点,Neo4j会自动为节点生成一个内部ID,但我们最好自己生成一个业务上的uuid并存为属性。
消息增强与转发: Graph消费者成功写入后,它需要将这个uuid写回消息中(或者发送一条包含chunk_id和node_uuid关联关系的新消息到另一个Topic)。
Vector写入消费者后行: Vector消费者监听这条被“增强”后的消息,这样在写入Milvus时,它就能拿到对应的graph_node_uuid,并将其存入Vector记录的metadata字段中。
效果: 这样就建立了一个从Vector到Graph的单向引用。查询时,可以从Vector结果中轻松找到对应的Graph节点,进行更深度的图遍历。
成本优化: 再次强调,Neo4j中只存储结构化数据和少量元数据。完整的原始文本块只在Milvus中存储一份。需要时,通过上述ID关联进行查询。
- 异构结果的融合策略
List<GraphPath> 和 List<TextChunk> 如何合并成一个有序的 List<Evidence>。
渲染-规范-融合-去重
1 | function fuseAndNormalizeResults(graphResults, vectorResults, keywordResults) { |
- 网络延迟处理
复杂的分布式调用链条;LLM生成代码的不可控性。
解决:全链路异步化 + 严格的防护栏与超时控制。
降低延迟:
- 全链路异步: 在Java后端,大量使用
CompletableFuture,让三路查询、结果融合、LLM调用等IO密集型操作完全异步化,最大化利用计算资源。 - 连接池: 确保所有数据库和外部服务的客户端都配置了合理的连接池。
Cypher稳定性与安全:
- 模板化与参数化: 对于特定意图的查询,与其让LLM从头生成,不如预定义一些Cypher“查询模板”,让LLM只负责填充模板中的参数。这极大提升了稳定性和安全性。
- 执行前静态分析: 在执行LLM生成的Cypher前,可以使用
openCypher的front-end库在Java中对其进行AST(抽象语法树)解析。通过分析AST,可以实施非常精细的规则,例如:- 禁止不带
LIMIT的全图扫描。 - 限制
MATCH路径的最大长度。 - 禁止写入操作(如
CREATE,DELETE)。
- 禁止不带
- 运行时超时: 在Neo4j Java Driver中,为每个查询设置一个严格的执行超时时间。这是最后的、也是最有效的防线,可以防止一个糟糕的查询拖垮整个数据库。
提示词工程
AI记忆力&多轮对话的实现
短期记忆
使用滑动窗口来实现,固定窗口大小,
在每次请求时,只将最近的K轮对话(例如,最近5轮)作为上下文(Context)连同当前问题一起发送给LLM。
但是容易出现:
上下文丢失 (Context Loss): 一旦对话超过K轮,最早的信息就会被永久丢弃。如果用户在第1轮提到的关键信息(如“我叫张三”),在第K+1轮时AI就忘了。
记忆僵化: 无法识别哪些历史信息是重要的,哪些是无关紧要的闲聊。
存储: 使用Redis的LIST数据结构是最佳选择。每个conversation_id对应一个LIST。
数据模型: LIST中的每个元素是一个JSON字符串,如{"role": "user", "content": "你好"}。
流程:
- 用户发来消息,附带
conversation_id。 - 从Redis中用
LRANGE conversation_id 0 K-1获取最近的K条记录。 - 将这K条记录和当前问题一起构建成Prompt。
- 获取LLM的回答后,将当前的用户问题和AI回答用
LPUSH和LTRIM命令存入LIST,保持其大小不超过K。
框架支持: LangChain中的ConversationBufferWindowMemory就是这个模式的直接实现。
中期记忆
随着对话的进行,不再保留全部历史,而是调用LLM对越来越长的对话历史进行滚动总结,形成一个摘要。每次请求时,将“历史摘要 + 最近几轮对话”作为上下文。
可以作为中期记忆,作为每个会话结束的总结实现
存储: 摘要可以存在Redis的STRING或HASH中,或者持久化到PostgreSQL/MySQL的某个字段。
流程:
- 设定一个阈值N(例如,每5轮对话触发一次总结)。
- 当对话达到N轮时,启动一个后台任务或在当前请求中,将当前的“旧摘要 + N轮对话”发送给LLM,并附上Prompt:“请总结以下对话…”。
- 将LLM返回的新摘要更新到数据库中。
- 构建下一次请求的Prompt时,结构为:
"这是我们之前的对话摘要:{summary}。接下来是最近的几轮对话:{recent_history}。用户问题:{query}"。
框架支持: LangChain中的ConversationSummaryBufferMemory提供了此功能。
长期个性化记忆
这是一种结构化长期记忆的方案。在对话过程中,实时地使用一个LLM(或专门的NLU模型)来抽取出对话中出现的关键实体、属性和关系,并将这些结构化信息存入知识图谱(如Neo4j)或关系型数据库。
可以个性化存储用户的消息,可以深度思考跨会话的消息
- Schema设计: 在Neo4j中定义用户中心节点
:User,以及:Entity,:Preference,:Fact等节点。 - 异步抽取流程:
- 在每轮对话结束后,将对话内容发送到一个后台处理服务。
- 该服务调用LLM,Prompt为:“从以下文本中抽取出人名、地名、用户偏好等实体信息,并以JSON格式返回…”。
- 服务解析JSON,生成
MERGECypher语句,如MERGE (u:User {userId:'123'}) MERGE (p:Preference {type:'音乐', name:'周杰伦'}) MERGE (u)-[:HAS_PREFERENCE]->(p),更新知识图谱。
- 查询与注入:
- 在每轮对话开始时,除了加载短期记忆,还要根据当前问题中的实体,查询知识图谱,获取相关背景知识。
- 例如,如果用户问“给我推荐几首歌”,可以先查询图谱中该用户的音乐偏好,然后将“该用户喜欢周杰伦”这个事实注入到最终的Prompt中,让LLM做出更个性化的推荐。
- Schema设计: 在Neo4j中定义用户中心节点
记忆上下文构建
职责就是:根据当前的对话ID和用户输入,从各个记忆层中并行地获取相关信息,并按照预设的优先级和格式,组装成最终注入到LLM Prompt中的上下文(Context)。
API层接收到用户请求(convId, userId, userQuery)。
调用MemoryContextBuilder.buildPromptContext()。该方法内部并行启动三路记忆的加载。
构建器等待所有结果返回,然后按照长期 -> 中期 -> 短期的优先级组装成一个结构化的上下文字符串。
将组装好的context和userQuery填入最终的LLM Prompt模板中。
调用LLM,并将结果流式返回给用户。
在后台,将当前的问答对ChatMessage异步地发送到Kafka的conversation_turns Topic,触发短期和长期记忆的更新。
通过会话超时或用户登出等事件,触发conversation_ended事件,更新中期记忆。
性能问题解决
- 查询链路P99延迟过高,在高并发下,即使用户平均响应时间尚可,但总有5%或1%的用户会经历无法忍受的慢响应。
MemoryContextBuilder的并行设计虽好,但木桶的最短板决定了最终性能。
- 数据库层面:比如图数据库Neo4j中查询的信息都要加上索引,增快查询速度,使用
EXPLAIN和PROFILE进行分析,防止全表扫描,合理配置堆内存和页缓存。 - 引入新的缓存,实现三级缓存架构,本地缓存,redis缓存,但是要注意数据的一致性,可以引用新的canel中间件来保证数据的最终一致性的问题
- 线程池的隔离,并行
CompletableFuture中,为不同的任务指定不同的专用线程池(Executor)。防止一个慢任务拖垮其他任务的工作
使用Jaeger/Zipkin来进行链路的追踪,具体是哪个环节出了问题,然后数据库查慢日志,Grafana仪表盘等等手段
- 上下文的剪切
计算组装好的上下文的总Token数。
如果超过预设阈值(如6000 Token),则调用一个更便宜、更快的模型(如GPT-3.5-Turbo, Llama-3-8B),让它对上下文进行压缩和筛选,Prompt:“请从以下信息中,筛选出与用户问题‘{userQuery}’最相关的部分,并保持在2000个Token以内。”
将压缩后的上下文用于最终的、昂贵的模型(如GPT-4o)调用。
- 建立模型的路由
用户问题首先经过一个分类器(可以是小LLM,也可以是传统模型),判断其意图是“简单问答”、“闲聊”还是“复杂推理”。根据分类结果,将请求路由到不同成本和能力的LLM模型。
- 幻觉脏数据处理
LLM在抽取实体和关系时可能产生幻觉,导致错误的信息被存入“长期记忆”,进而污染后续所有基于此记忆的回答。
创建一个“黄金标准”的测试集,包含上百个有代表性的(问题,预期答案,应检索到的上下文)三元组。
自动化回归测试: 在每次系统有重大更新(如更换Embedding模型、修改Prompt)后,自动化地运行整个测试集,并计算关键指标:
- 检索质量:
Precision@K,Recall@K,MRR(Mean Reciprocal Rank)。 - 答案质量: 使用
LLM-as-a-Judge模式,让GPT-4等强模型来评估生成答案的准确性、忠实度、相关性,并打分。
对比较重要的数据,进行人工审核的处理,用户层面可以点赞或者点踩,然后我们重点分析点踩的那一部分数据
- IO模型的更换
采用非阻塞I/O与响应式编程,Spring WebFlux + Project Reactor + Netty。这是现代Java构建高性能网络应用的黄金组合。
WebFlux使用事件循环(Event Loop)模型,用极少数的线程(如CPU核心数)就可以处理成千上万的并发连接。线程不会因等待数据(等待LLM返回token,等待网络写入)而被阻塞,而是去处理其他事件。
数据从LLM API流出,到写入响应给客户端的网络缓冲区,全程不应在你的后端服务中发生显著的、完整的数据聚合。
SSE
一种基于标准HTTP的、服务端到客户端的单向消息推送协议。它是为流式更新而生的。
Server-Sent Events(服务器发送事件)是一种允许服务器随时向客户端推送数据的Web技术。从本质上讲,它是在单个、持久的HTTP连接上实现的、从服务器到客户端的单向数据流。
客户端一旦与服务器的SSE端点建立连接,这个连接就会保持打开状态。服务器可以随时通过这个连接,以一种特定的文本格式,将事件(数据)发送给客户端。
SSE的美妙之处在于其协议极度简单。服务器在响应时,必须将HTTP Header的Content-Type设置为text/event-stream。之后发送的内容遵循以下格式:
data:这是最重要的字段,用于承载你要发送的数据。可以是一行,也可以是多行。1
2data: 这是第一行数据
data: 这是第二行数据在客户端,这两行数据会被拼接成
"这是第一行数据\n这是第二行数据"。event:用于为事件命名。如果省略,事件的默认名称是message。通过命名事件,你可以在前端为不同类型的消息绑定不同的监听器。1
2event: user_login
data: {"username": "Alice", "timestamp": 1678886400}id:为每个事件设置一个唯一的ID。这个ID的主要作用是实现断线重连。如果连接意外中断,浏览器在自动重连时,会把上次接收到的id值通过Last-Event-ID请求头发送给服务器。服务器可以根据这个ID,补发在断连期间可能丢失的数据。1
2id: msg1
data: some dataretry:告诉浏览器在连接断开后,应该等待多少毫秒再尝试重连。1
retry: 10000
这表示10秒后重试。
一个完整的消息块由一个空行(\n\n)分隔。
1 | id: event-1 |
实例:
LLM聊天机器人响应流: (我们的核心场景) 逐字或逐词地将AI的回答推送到前端。
实时通知: 如社交网络的“你有新消息”或电商的“订单已发货”。
金融数据更新: 股票价格、加密货币行情等实时数据的推送。
新闻或体育比分直播: 将最新的事件和比分实时更新到页面。
系统监控仪表盘: 实时显示服务器的CPU、内存使用率等监控指标。
Spring Boot通过SseEmitter类,提供了对SSE的完美支持。它以非阻塞的方式工作,不会为每个连接都占用一个宝贵的线程。
nl2sql
一个基于自然语言生成SQL查询(Natural Language to SQL, NL2SQL)的智能体框架。
该框架的核心作用是充当一个“翻译器”,将用户输入的自然语言问题(例如,“查询上个季度销售额最高的三个产品是什么?”)自动转换成对应数据库可以执行的SQL查询语句。
核心
基本流程:
输入理解 (Input Understanding):框架接收用户的自然语言输入。
意图识别与槽位填充:利用大语言模型的自然语言理解(NLU)能力,分析问句的意图(查询、统计、对比等)并提取关键信息(查询的指标、过滤条件、时间范围等)。
数据库元数据理解 (Schema Understanding):框架需要理解目标数据库的结构信息,包括表名、列表、字段类型、表之间的关联关系(主外键)等。这通常通过预先读取和处理数据库的schema来实现。
SQL生成 (SQL Generation):这是最核心的环节。框架会将用户的意图、关键信息与数据库的schema信息一同作为上下文(Context),构建成一个高效的提示(Prompt),然后提交给通义大语言模型。大模型根据这些上下文信息,生成准确的、符合特定数据库方言(如MySQL, PostgreSQL)的SQL查询语句。
SQL执行与结果返回 (Execution & Response):生成的SQL语句被发送到目标数据库执行,并将查询结果返回给用户。框架可能还会对结果进行格式化或进一步的自然语言解释。
设计方式&扩展优化:
基于大语言模型驱动,轻量级与可扩展,智能体框架
Schema链接与召回策略优化,对于拥有成百上千张表的复杂数据库,如何根据用户问题动态、准确地选择最相关的表和字段是提升SQL生成质量的关键。可以引入更先进的向量化匹配和知识图谱技术来优化这一过程。
搜索处理模糊和同义词,增强对业务术语、别名和同义词的理解能力,例如用户说“查一下GMV”,系统能自动关联到“成交总额”这一指标对应的字段。
复杂函数的处理,窗口函数与复杂分析:提升对需要使用窗口函数、公共表达式(CTE)等高级SQL特性的复杂分析类问题的支持。
跨数据源查询:支持用户的一个问题需要查询多个异构数据库才能回答的场景。
搜索代理
在聊搜索代理之前,就需要先搞清楚代理是什么,简单理解可以是:LLM 在自主循环中使用工具
那么搜索代理的简单定义:LLM 使用各种搜索或检索工具,动态地,按需地获取相关上下文
我在测试各种模型对于检索工具的调用的时候,发现模型能力差异导致的搜索结果不同,优秀的模型的搜索路径规划的更加合理,并且搜索的方向非常准确,而能力较差的模型,方向飘忽不定,结果也很一般
所以我发现影响搜索代理成功的因素之一是:更智能的模型,可以使搜索代理自主应对复杂问题并从错误中恢复
RAG 和搜索代理的区别
搜索代理和 RAG 的最主要的区别在于工作执行流程:
- RAG 是预推理检索:系统预先从向量数据库检索出来结果之后输入给 LLM,这个时候 LLM 是被动的接受
- 搜索代理是即时检索:LLM 是主动决策者,由 LLM 来决定搜索什么,怎么搜索,并且可以根据中间结果来主动的调整搜索路径
搜索代理和 RAG 还有一个在优化行为方式的区别:
搜索代理中有一种工具是 glob,这个工具是用来搜索和匹配文件名的,这个时候文件名也可以成为有效搜索因素,当一个代码库中的文件结构非常清晰,并且文件命名合理,在这种工作空间下搜索代理会非常有效,搜索代理的影响因素如下:
- 文件夹的结构: 例如:
tests/test_agent.ts这个就表示测试模块中的测试 agent 功能的文件的隐性含义 - 命名约定:例如:
*.config.js表示配置文件 、README.md表示文档说明 - 文件大小:可以隐含复杂性的背景信息
- 时间戳:可以表示文件是最近修改的,隐含该文件相关性比较强(因为这个文件是最近修改的,那用户输入的问题很大概率是因为他刚刚修改这个文件产生的,我们可以仔细想想自己使用 ClaudeCode 或者 Cursor 的 Agent 模式的时候是不是这样的呢)
在这种构建方式下,代理可以逐层构建理解,仅在工作记忆中保留必要信息,这种自我管理的上下文窗口使代理专注于相关子集,而不是被大量但可能无关的信息淹没。
而对于 RAG 来说,只能重新构建嵌入文档块了,而不是像搜索代理这样随心所欲的动态更换一下文件名就可以,这也是一个关键区别搜索代理的维护成本比较低,开发起来比较快
当然,事物总是相对的,RAG 还是有优秀的地方的
- 在大量数据时,RAG 的检索会比搜索代理快很多
- RAG 的开发构建策略是平稳的,有成熟的构建搜索方案:分叉索引、父文档索引等,但是搜索代理要有效的话,需要有深思熟虑的工程实践,要有成熟的指导开发方案,不如搜索代理会很容易进入死循环和无效检索来浪费上下文,在这个方向上,搜索代理的开发难度比 RAG 大多了
那我们来总结一下 RAG 和搜索代理的区别点
- 工作执行流程的区别,RAG 是预推理检索,搜索代理是即时检索
- 优化行为方式的区别,搜索代理的优化方向更多,策略是可以动态调整的,RAG 需要重新构建文档块
- 开发和维护成本的区别:搜索代理的维护成本比较低,文件系统工具即时开发,不需要像 RAG 那样整理文档块,然后构建向量数据库等,并且搜索代理调整和维护比较容易
- 大量数据的检索速度区别:在大量数据的前提下,RAG 检索速度回比搜索代理快很多
- 有效搜索代理开发难度大:要想搜索代理在实际的场景中有效,需要有合适的策略指导
应用场景
1、 对于上下文信息密度有较高的要求:例如编程开发领域,开发者使用代码来实现自己严密的业务逻辑,这种场景下使用代理搜索会非常合适
倘若我们采用 RAG 的方式来检索,我们需要对代码库进行文档块的分割,就不能像文本那样简单的按照行数来分割了,因为代码上下文之间有很强的逻辑依赖,如果文档块中少一行 if 判断,表达的意思就是天差地别了,比较合适的的方法是结合语法树和代码依赖关联来进行分割,
所以这种情况下,不仅存入向量数据库的过程更麻烦,检索时也因为内部机制不透明,导致开发者很难看清检索的全貌,调试起来也更困难。
2、动态上下文不多:要动态传入给 LLM 的动态外部数据不多的时候,搜索代理很适用,例如:中小型代码库、一些公司内部客服手册这种相对固定的数据
我非常推荐开发者在构建大模型应用初期的时候,完全可以优先考虑使用搜索代理来实现外部数据的引入,在应用构建初期的时候,遵循着快速迭代的原则,搜索代理的开发难度和开发效率相比复杂的 RAG 是有优势的
3、偏向简单有效原则构建 Agent:大模型应用开发这个领域是在快速发展的,而且大部分都是向着“LLM 越来越智能”这个方向前行,构建简单的模块和应用是最合适的,方便应用和模块进行迭代,像渐进式构建有效应用的原则一样
具体实现
- 一个成熟的执行工具需要兼容多种系统的版本,这个时候是比较复杂的
- 工具的描述和名称要定义的合理,表达的清晰,不然模型会混乱调用
FileRead负责最后的内容读取、ListDirectory负责第一层的目录搜索、Glob负责第二层文件名的搜索、Grep负责第三层文件内容的搜索
工具
实现 GrepTool 工具对象时,需要一些工具的参数定义,比较重要的是工具参数
- pattern:搜索的正则表达式
- path:搜索路径(目录)
- include:文件模式过滤
GrepTool工具的实现
import { InternalTool, InternalToolContext } from '../types.js';
import { ripGrep } from '../../utils/ripgrep.js';
export const TOOL_NAME_FOR_PROMPT = 'GrepTool';
export const DESCRIPTION = `
- 适用于任何代码库大小的快速内容搜索工具
- 使用正则表达式搜索文件内容
- 支持完整的正则表达式语法(例如 "log.*Error"、"function\\s+\\w+" 等)
- 使用 include 参数按模式过滤文件(例如 "*.js"、"*.{ts,tsx}")
- 返回按修改时间排序的匹配文件路径
- 当你需要查找包含特定模式的文件时使用此工具
- 当你进行可能需要多轮 glob 和 grep 的开放式搜索时,请改用 Agent 工具
`;
/**
* GrepTool 参数定义
*/
export interface GrepToolArgs {
/** 搜索的正则表达式模式 */
pattern: string;
/** 搜索路径(目录) */
path?: string;
/** 文件模式过滤 */
include?: string;
}
/**
* GrepTool 返回结果
*/
export interface GrepToolResult {
/** 匹配的文件路径 */
matches: string[];
/** 匹配总数 */
count: number;
}
/**
* GrepTool 处理函数
*/
const grepToolHandler = async (
args: GrepToolArgs,
context?: InternalToolContext
): Promise<GrepToolResult> => {
const { pattern, path = context?.cwd || process.cwd(), include } = args;
// 构建 ripgrep 参数
const rgArgs: string[] = [];
// 只返回文件路径
rgArgs.push('-l'); // --files-with-matches
// 文件模式过滤
if (include) {
rgArgs.push('--glob', include);
}
// 搜索模式
rgArgs.push(pattern);
// 执行搜索
const abortSignal = context?.abortSignal || new AbortController().signal;
const results = (await ripGrep(rgArgs, path, abortSignal)) as string[];
return {
matches: results,
count: results.length,
};
};
/**
* GrepTool 工具定义
*/
export const GrepTool: InternalTool<GrepToolArgs, GrepToolResult> = {
name: 'grep_search',
category: 'search',
internal: true,
description: DESCRIPTION,
version: '1.0.0',
parameters: {
type: 'object',
properties: {
pattern: {
type: 'string',
description:
'The regular expression pattern to search for in file contents',
},
path: {
type: 'string',
description:
'The directory to search in. Defaults to the current working directory.',
},
include: {
type: 'string',
description:
'File pattern to include in the search (e.g. "*.js", "*.{ts,tsx}")',
},
},
required: ['pattern'],
},
handler: grepToolHandler,
};
在实现这个 grepTool 工具函数的时候,使用的是 ripgrep 命令,这个命令执行的速度非常快,比一般的搜索命令要快很多,在构建这个命令执行的函数需要注意两点:
- Mac 系统需要构建执行文件的签名
- 命令执行的时候需要判断,如果本地电脑没有 rg 这个命令,就需要使用 vendor 里面的二进制文件来执行了
使用命令 ripgrep:https://github.com/BurntSushi/ripgrep
ripgrep命令的实现
import { fileURLToPath } from 'url';
import path from 'path';
import { findActualExecutable } from 'spawn-rx';
import { execFile } from 'child_process';
import { execFileNoThrow } from './execFileNoThrow.js';
import { config } from '../config/env.js';
const __filename = fileURLToPath(import.meta.url);
let __dirname = path.dirname(__filename);
console.log(config.nodeEnv);
if (config.nodeEnv === 'development' || config.nodeEnv === 'test') {
__dirname = path.resolve(__dirname, '..', '..');
}
function ripgrepPath() {
//检查本地是否安装了ripgrep
const { cmd } = findActualExecutable('rg', []);
if (cmd !== 'rg') {
return cmd;
} else {
const rgRoot = path.resolve(__dirname, 'vendor', 'ripgrep');
console.log(rgRoot);
const ret = path.resolve(
rgRoot,
`${process.arch}-${process.platform}`,
'rg'
);
console.log(ret);
return ret;
}
}
export async function ripGrep(
args: string[],
target: string,
abortSignal: AbortSignal
) {
//mac电脑的签名
await macSignature();
const rgPath = ripgrepPath();
return new Promise(resolve => {
execFile(
ripgrepPath(),
[...args, target],
{
maxBuffer: 1_000_000,
signal: abortSignal,
timeout: 10_000,
},
(error, stdout) => {
if (error) {
if (error.code !== 1) {
console.log(error);
}
resolve([]);
} else {
resolve(stdout.split('\n').filter(Boolean));
}
}
);
});
}
//mac电脑的签名
let alreadSign = false;
async function macSignature() {
if (process.platform !== 'darwin') {
return '';
}
alreadSign = true;
console.log('开始验证签名...');
const lines = (
await execFileNoThrow(
'codesign',
['-vv', '-d', ripgrepPath()],
undefined,
undefined,
false
)
).stdout.split('\n');
console.log(lines);
const needsSigned = lines.find(line => line.includes('linker-signed'));
if (!needsSigned) {
console.log('不需要签名');
return;
}
try {
console.log('生成ripgrep签名');
const signResult = await execFileNoThrow('codesign', [
'--sign',
'-',
'--force',
'',
'--preserve-metadata=entitlements,requirements,flags,runtime',
ripgrepPath(),
]);
if (signResult.code !== 0) {
console.log('签名失败', signResult.stderr);
}
console.log('移除应用隔离标记');
const quarantineResult = await execFileNoThrow('xattr', [
'-d',
'com.apple.quarantine',
ripgrepPath(),
]);
if (quarantineResult.code !== 0) {
console.log('移除应用隔离标记失败', quarantineResult.stderr);
}
console.log('签名成功');
} catch (error) {
console.error('签名生成失败', error);
}
}
GlobTool 工具实现
实现 GlobTool 工具对象的时候,相应的工具参数的定义:
- pattern:Glob 匹配模式
- path:搜索路径
GlobTool工具的实现
import { InternalTool, InternalToolContext } from '../types.js';
import { glob } from '../../utils/file.js';
export const TOOL_NAME_FOR_PROMPT = 'GlobTool';
export const DESCRIPTION = `- 适用于任何代码库大小的快速文件模式匹配工具
- 支持像 "**/*.js" 或 "src/**/*.ts" 这样的 glob 模式
- 返回按修改时间排序的匹配文件路径
- 当你需要通过名称模式查找文件时使用此工具
- 当你进行可能需要多轮 glob 和 grep 的开放式搜索时,请改用 Agent 工具
`;
/**
* GlobTool 参数定义
*/
export interface GlobToolArgs {
/** Glob 匹配模式 */
pattern: string;
/** 搜索路径(目录) */
path?: string;
}
/**
* GlobTool 返回结果
*/
export interface GlobToolResult {
/** 匹配的文件路径 */
files: string[];
/** 是否被截断 */
truncated: boolean;
/** 匹配总数 */
count: number;
}
/**
* GlobTool 处理函数
*/
const globToolHandler = async (
args: GlobToolArgs,
context?: InternalToolContext
): Promise<GlobToolResult> => {
const { pattern, path = context?.cwd || process.cwd() } = args;
// 执行 glob 搜索
const abortSignal = context?.abortSignal || new AbortController().signal;
const result = await glob(
pattern,
path,
{ limit: 100, offset: 0 },
abortSignal
);
return {
files: result.files,
truncated: result.truncated,
count: result.files.length,
};
};
/**
* GlobTool 工具定义
*/
export const GlobTool: InternalTool<GlobToolArgs, GlobToolResult> = {
name: 'glob_search',
category: 'search',
internal: true,
description: DESCRIPTION,
version: '1.0.0',
parameters: {
type: 'object',
properties: {
pattern: {
type: 'string',
description:
'The glob pattern to match files (e.g., "**/*.ts", "src/**/*.{js,jsx}")',
},
path: {
type: 'string',
description:
'The directory to search in. Defaults to the current working directory.',
},
offset: {
type: 'number',
description: 'Number of results to skip (for pagination, default: 0)',
},
limit: {
type: 'number',
description: 'Maximum number of results to return (default: 100)',
},
},
required: ['pattern'],
},
handler: globToolHandler,
};
这个命令的实现相对来说比较简单,使用 glob 框架就可以搞定
glob命令的实现
import { glob as globLib } from 'glob';
//匹配搜索文件目录下的文件
export async function glob(
filePattern: string,
cwd: string,
{ limit, offset }: { limit: number; offset: number },
abortSignal: AbortSignal
) {
const paths = await globLib([filePattern], {
cwd,
nocase: true,
nodir: true,
signal: abortSignal,
stat: true,
withFileTypes: true,
});
const sortedPaths = paths.sort((a, b) => (a.mtimeMs ?? 0) - (b.mtimeMs ?? 0));
const truncated = sortedPaths.length > offset + limit;
return {
files: sortedPaths
.slice(offset, offset + limit)
.map(path => path.fullpath()),
truncated,
};
}
FileReadTool 工具实现
实现这个文件内容读取的工具对象,增加了一个类似于分页读取内容的功能,避免一次读取太多,相应的工具参数定义:
- file_path:文件路径
- offset:起始行号
- limit:最大读取行数
FileReadTool工具实现
import { InternalTool, InternalToolContext } from '../types.js';
import { readFileContent } from '../../utils/file.js';
const MAX_LINES_TO_READ = 2000;
const MAX_LINE_LENGTH = 2000;
export const DESCRIPTION = '从本地文件系统读取文件。';
export const PROMPT = `从本地文件系统读取文件。file_path 参数必须是绝对路径,而不是相对路径。默认情况下,它从文件开头读取最多 ${MAX_LINES_TO_READ} 行。你可以选择指定行偏移量和限制(对于长文件特别有用),但建议通过不提供这些参数来读取整个文件。任何超过 ${MAX_LINE_LENGTH} 个字符的行将被截断。对于图像文件,该工具将为你显示图像。`;
/**
* FileReadTool 参数定义
*/
export interface FileReadToolArgs {
/** 文件路径 */
file_path: string;
/** 起始行号(从0开始) */
offset?: number;
/** 最大读取行数 */
limit?: number;
}
/**
* FileReadTool 返回结果
*/
export interface FileReadToolResult {
/** 文件内容 */
content: string;
/** 返回的行数 */
lineCount: number;
/** 文件总行数 */
totalLines: number;
/** 文件路径 */
filePath: string;
}
/**
* FileReadTool 处理函数
*/
const fileReadToolHandler = async (
args: FileReadToolArgs,
context?: InternalToolContext
): Promise<FileReadToolResult> => {
const { file_path, offset = 0, limit } = args;
// 读取文件内容
const result = await readFileContent(file_path, offset, limit);
return {
content: result.content,
lineCount: result.lineCount,
totalLines: result.totalLines,
filePath: file_path,
};
};
/**
* FileReadTool 工具定义
*/
export const FileReadTool: InternalTool<FileReadToolArgs, FileReadToolResult> =
{
name: 'read_file',
category: 'filesystem',
internal: true,
description: DESCRIPTION,
version: '1.0.0',
parameters: {
type: 'object',
properties: {
file_path: {
type: 'string',
description: 'The absolute or relative path to the file to read',
},
offset: {
type: 'number',
description: 'Starting line number (0-indexed, default: 0)',
},
limit: {
type: 'number',
description:
'Maximum number of lines to read (omit to read entire file)',
},
},
required: ['file_path'],
},
handler: fileReadToolHandler,
};
这个文件内容读取工具使用的是 Node 的 fs 模块中的文件读取命令,需要重点注意的是文件编码格式要获取,不一定文件编码格式都是 utf-8
文件读取命令实现
//读取文件内容
export async function readFileContent(
filePath: string,
offset: number = 0,
maxLines?: number
): Promise<{ content: string; lineCount: number; totalLines: number }> {
//TODO:获取文件的编码格式 - 默认UTF-8这个原本是需要写一个文件编码获取的函数
const enc = 'utf-8';
//读取文件内容
const content = await readFileSync(filePath, enc);
//按行切割 - 跨平台的兼容要使用"/\r?\n/"
const lines = content.split('\n');
//整理返回结果
const toReturn =
maxLines !== undefined && lines.length - offset > maxLines
? lines.slice(offset, offset + maxLines)
: lines.slice(offset);
return {
content: toReturn.join('\n'),
lineCount: toReturn.length,
totalLines: lines.length,
};
}
ListDirectoryTool 工具实现
该工具是文件目录结构查询,工具函数的参数只有一个:
- path:要列出目录的路径
ListDirectoryTool工具函数的实现
import { InternalTool, InternalToolContext } from '../types.js';
import {
listDirectory,
createFileTree,
printTree,
} from '../../utils/listDirectory.js';
export const DESCRIPTION =
'列出指定路径中的文件和目录。path 参数必须是绝对路径,而不是相对路径。如果你知道要搜索哪些目录,通常应优先使用 Glob 和 Grep 工具。';
/**
* ListDirectoryTool 参数定义
*/
export interface ListDirectoryToolArgs {
/** 要列出的目录路径 */
path?: string;
}
/**
* ListDirectoryTool 返回结果
*/
export interface ListDirectoryToolResult {
/** 文件树格式输出 */
tree: string;
/** 文件路径列表 */
files: string[];
/** 总数 */
count: number;
}
/**
* ListDirectoryTool 处理函数
*/
const listDirectoryToolHandler = async (
args: ListDirectoryToolArgs,
context?: InternalToolContext
): Promise<ListDirectoryToolResult> => {
const { path = context?.cwd || process.cwd() } = args;
// 执行目录列出
const abortSignal = context?.abortSignal || new AbortController().signal;
const files = listDirectory(path, context?.cwd || process.cwd(), abortSignal);
// 创建文件树
const tree = createFileTree(files);
const treeOutput = printTree(tree);
return {
tree: treeOutput,
files,
count: files.length,
};
};
/**
* ListDirectoryTool 工具定义
*/
export const ListDirectoryTool: InternalTool<
ListDirectoryToolArgs,
ListDirectoryToolResult
> = {
name: 'list_directory',
category: 'filesystem',
internal: true,
description: DESCRIPTION,
version: '1.0.0',
parameters: {
type: 'object',
properties: {
path: {
type: 'string',
description:
'The directory path to list. Defaults to the current working directory.',
},
},
required: [],
},
handler: listDirectoryToolHandler,
};
在实现该工具函数的中,没有使用递归函数去实现,担心栈溢出,使用的是一个广度搜索,需要注意的是两个格式化的函数
- 将搜索出来的文件数组转换为 Tree 格式的数据结构
- 将 Tree 格式的数据结构转换为空格隔开的目录字符串
格式化函数和搜索算法实现
import { readdirSync } from 'fs';
import { basename, join, relative, sep } from 'path';
import { getCwd } from './common.js';
export function listDirectory(
initialPath: string,
cwd: string,
abortSignal: AbortSignal
): string[] {
const results: string[] = [];
const queue = [initialPath];
while (queue.length > 0) {
if (results.length > 500) {
return results;
}
if (abortSignal.aborted) {
return results;
}
const path: any = queue.shift();
if (skip(path)) {
continue;
}
if (path !== initialPath) {
results.push(relative(cwd, path) + sep);
}
let children;
try {
children = readdirSync(path, {
withFileTypes: true,
});
} catch (error) {
console.error(error);
continue;
}
for (const child of children) {
if (child.isDirectory()) {
queue.push(join(path, child.name) + sep);
} else {
const fileName = join(path, child.name);
if (skip(fileName)) {
continue;
}
results.push(relative(cwd, fileName));
if (results.length > 500) {
return results;
}
}
}
}
return results;
}
function skip(path: string): boolean {
if (path !== '.' && basename(path).startsWith('.')) {
return true;
}
if (path.includes(`__pycache__${sep}`)) {
return true;
}
return false;
}
/**
* 为了让结果更加可读和节省Token,要创建两个对于结果进行格式化的函数
* - createFileTree
* - printTree
*/
type TreeNode = {
name: string;
path: string;
type: 'file' | 'directory';
children?: TreeNode[];
};
export function createFileTree(sortedPaths) {
const root: TreeNode[] = [];
for (const path of sortedPaths) {
const parts = path.split(sep);
let currentLevel: any = root;
let currentPath = '';
for (let i = 0; i < parts.length; i++) {
const part = parts[i];
if (!part) {
continue;
}
currentPath = currentPath ? `${currentPath}${sep}${part}` : part;
const isLastPart = i === parts.length - 1;
const existingNode = currentLevel.find(node => node.name === part);
if (existingNode) {
currentLevel = existingNode.children || [];
} else {
const newNode: TreeNode = {
name: part,
path: currentPath,
type: isLastPart ? 'file' : 'directory',
};
if (!isLastPart) {
newNode.children = [];
}
currentLevel.push(newNode);
currentLevel = newNode.children || [];
}
}
}
return root;
}
/**
* eg.
* - src/
* - index.ts
* - utils/
* - file.ts
* @param tree
* @param level
* @param prefix
* @returns
*/
export function printTree(tree: TreeNode[], level = 0, prefix = ''): string {
let result = '';
if (level == 0) {
result += `- ${getCwd()}${sep}\n`;
prefix = ' ';
}
for (const node of tree) {
result += `${prefix}${'-'} ${node.name}${node.type === 'directory' ? sep : ''}\n`;
if (node.children && node.children.length > 0) {
result += printTree(node.children, level + 1, `${prefix} `);
}
}
return result;
}
工具管理
我们定义好这些工具之后,需要将这个工具提供给 LLM,并且 LLM 返回工具执行命令的时候,需要调用相应工具的函数执行,最后将结果返回给 LLM,所以这里需要创建一个工具管理器
我的实现思路:
- 提供一个方法用来注册工具函数:
registerAllTools - 提供一个方法用来获取注册好的工具函数:
getTools - 提供一个工具执行函数:
execute
工具管理的实现
import {
InternalTool,
InternalToolContext,
FormattedToolDefinition,
} from './types.js';
import { GrepTool } from './GrepTool/GrepTool.js';
import { ListDirectoryTool } from './ListDirectoryTool/ListDirectoryTool.js';
import { GlobTool } from './GolbTool/GlobTool.js';
import { FileReadTool } from './FileReadTool/FileReadTool.js';
const toolsList = [GrepTool, GlobTool, ListDirectoryTool, FileReadTool];
/**
* 工具管理类
* 负责工具的注册、查询和执行
*/
export class ToolManager {
private tools: Map<string, InternalTool> = new Map();
constructor() {
// 在构造函数中自动注册所有工具
this.registerAllTools();
}
/**
* 注册所有工具
* 显式列出所有要注册的工具
*/
private registerAllTools() {
const tools = [...toolsList];
tools.forEach(tool => {
this.tools.set(tool.name, tool);
});
}
/**
* 执行指定工具
* @param name 工具名称
* @param args 工具参数
* @param context 可选的上下文参数(如 abortSignal)
* @returns 工具执行结果
*/
async execute<TArgs = any, TResult = any>(
name: string,
args: TArgs,
context?: InternalToolContext
): Promise<TResult> {
const tool = this.tools.get(name);
if (!tool) {
throw new Error(
`Tool '${name}' not found. Available tools: ${this.getToolNames().join(', ')}`
);
}
try {
const result = await tool.handler(args, context);
return result;
} catch (error) {
console.error(`Tool '${name}' execution failed:`, error);
throw error;
}
}
/**
* 获取所有已注册的工具
* @returns 工具数组
*/
getTools(): InternalTool[] {
return Array.from(this.tools.values());
}
}
在使用的时候,因为供应商的不同,对于 api 中的 tool 参数的格式也是不一样的,所以这个需要在定义 LLM 类的时候,自己提供一个工具格式转换方法,例如 Openai 的参数格式是
LLM调用工具
关于 LLM 的调用中,最核心的思路是:LLM 循环执行并且判断,并且设置最大的循环数,不要陷入死循环














