当前位置: 首页 > news >正文

3.RAG

一、RAG初识RAG(Retrieval-Augmented Generation检索增强生成)是一种将 信息检索与文本生成 相结合的技术框架。它通过以下流程解决大模型(LLM)的“知识盲区”问题:用户问题-从知识库检索相关文档-将文档作为上下文输入LLM-生成精准答案二、核心流程1.用户提问XXX公司XXXX年销售总额是多少首先拆分关键词、理解句意2.知识库检索使用向量数据库检索相似文档再返回相关片段3.提示词工程基于以下信息回答问题 检索到的文档1 检索到的文档2 ... 问题{用户的问题} 答案4.生成答案三、RAG关键组件1.文本加载器用于解析PDF/Word等原始文件2.嵌入模型用户将文本转化为向量3.向量数据库用于储存或快速检索向量4.大语言模型用于生成答案5.提示词管理器动态构建上下文提示demo1调用大模型1.创建项目-2.添加notebookuv add notebook-3.安装依赖安装OpenAI的依赖uv add openai4.初始化客户端load_dotenv() API_KEYos.getenv(DEEPSEEK_API_KEY) #创建OpenAI客户端 clientOpenAI( api_keyAPI_KEY,#api_key从环境变量中获取api_key base_urlhttps://api.deepseek.com#路径 ) #补充api_key不能直接写死不能是硬编码应该从环境变量中获取 #读取环境变量需要用到python-dotenv(做环境管理的一个包需要先安装依赖uv add python-dotenv #再创建.env文件并写入API_KEYAPI_KEY #再从环境变量中获取api_key#补充api_key不能直接写死不能是硬编码应该从环境变量中获取 #读取环境变量需要用到python-dotenv(做环境管理的一个包需要先安装依赖uv add python-dotenv #再创建.env文件并写入API_KEYAPI_KEY #再从环境变量中获取api_key5.访问大模型print(...正在调用大模型...) responseclient.chat.completions.create( modeldeepseek-chat,#模型类型 messages[ {role:system,content:你是ai},#system系统设定 {role:user,content:你是谁},#user用户回答 ], streamFalse,#阻塞式 temperature0.9#自由度 ) print(...成功调用大模型...)demo2向量化选用的m3e模型该模型主要针对中文文本进行向量化处理#首先得到一个文档将文档存入faiss这个库中 #数据 documents[ ] #加载模型先检查本地是否存在 local_model_path ../local_m3e_model if os.path.exists(local_model_path): print(f从本地加载模型{local_model_path}) modelSentenceTransformer(local_model_path) else: print(f本地模型不存在从网络加载:moka-ai/m3e_model) modelSentenceTransformer(moka-ai/m3e_base) print(f保存模型到本地{local_model_path}) model.save(local_model_path) #打印加载成功 print(模型加载成功!\n) #将文档转换为向量 embeddingsmodel.encode(documents,convert_to_numpyTrue) #使用FAISS创建索引 index faiss.IndexFlatL2(embeddings.shape[1]) index.add(embeddings) #查询最相似的都向量 query界面新闻 query_embeddingmodel.encode(query,convert_to_numpyTrue) D,Iindex.search(query_embedding,k2) #k2:返回两个距离最近的 print(D) print(I)该小demo目的是为了后面将文档进行向量化以便做检索demo3文档分块文档分块策略 将长文档切割为小块chunks以提高检索精度和效果 策略 1.固定长度分块 2.按语义分块如句子、段落 3.确保块之间有一定重def chunk_document(text,max_chars500,overlap100): 将长文档切分为交小的块 参数 :param text:要切分的文本 :param max_chars: 每个块的最大字符数 :param overlap: 相邻块之间的重叠字符数 :return chunk: 切分后的文本块列表 #如果文本长度小于最大长度直接返回 if len(text)max_chars: return [text] chunks[] start0 while startlen(text): #确定当前块的结束位置 endstartmax_chars #如果没有达到文本末尾尝试在句子边界切分 if endlen(text): #在结束位置查找最近的句子结束标记 sentence_ends[ m.end() for m in re.finditer(r[。.!?]\s*,text[start:end]) ] if sentence_ends:#如果找到句子结束标记在最后一个句子结束处切分 endstartsentence_ends[-1] else:#如果没有找到尝试在单词或标点处切分 last_spacetext[start:end].rfind( ) last_punctmax(text[start:end].rfind( ),text[end:last_space].rfind( )) cut_pointmax(last_space,last_punct) if cut_point0:#如果找到了合适的切分点 endstartcut_point1 #添加当前块到结果列表 chunks.append(text[start:end]) #移动开始位置考虑重叠 startend-overlap #确保开始位置不会后退 if start0: start0 #避免无限循环 if startlen(text): break return chunks该demo用于处理长文档并且保证文档内容的连贯性以及完整性demo4处理不同格式的文档# 尝试导入不可能存在的包 try: import docx # 处理word文档的一个库 DOCX_AVAILABLE True except ImportError: DOCX_AVAILABLE False print(警告python-docx未安装无法处理.docx文件) try: import PyPDF2 PyPDF2_AVAILABLE True except ImportError: PyPDF2_AVAILABLE False print(警告PyPDF2未安装无法处理.pdf文件) def clean_text(text): 清理文本清除多余的空格和换行 if not text: return # 替换多个换行为空格 text re.sub(r\n, , text) # 替换多个空格/制表符为单个空格 text re.sub(r\s, , text) # 去除首尾空白 return text.strip() def load_text_file(file_path): 加载文本文件.txt, .md, .csv等 encodings [utf-8, gbk, gb2312, latin-1] for enc in encodings: try: with open(file_path, r, encodingenc) as f: content f.read() return clean_text(content), None except UnicodeDecodeError: continue except Exception as e: return None, f无法读取文本文件 {file_path}: {str(e)} return None, f所有编码均无法读取 {file_path} def load_pdf_file(file_path): 加载PDF文件 if not PyPDF2_AVAILABLE: return None, PyPDF2包未安装无法处理.pdf文件 try: content with open(file_path, rb) as f: # 兼容新版 PyPDF2 pdf_reader PyPDF2.PdfReader(f) for page in pdf_reader.pages: page_content page.extract_text() if page_content: content page_content \n if not content.strip(): return None, fPDF文件 {file_path} 未能提取到文本内容 return clean_text(content), None except Exception as e: return None, f读取PDF文件 {file_path} 时出错: {str(e)} def load_docx_file(file_path): 加载Word文档 (.docx) if not DOCX_AVAILABLE: return None, python-docx未安装无法处理.docx文件 try: doc docx.Document(file_path) content \n.join([para.text for para in doc.paragraphs]) if not content.strip(): return None, fDOCX文件 {file_path} 无文本内容 return clean_text(content), None except Exception as e: return None, f读取DOCX文件 {file_path} 时出错: {str(e)} def load_markdown_file(file_path): 加载markdown文件 try: with open(file_path, r, encodingutf-8) as f: md_content f.read() # 将markdown转换为HTML然后提取纯文本 html markdown.markdown(md_content) soup BeautifulSoup(html, html.parser) content soup.get_text() return clean_text(content), None except Exception as e: return None, f读取markdown文件 {file_path} 时出错: {str(e)} def load_excel_file(file_path): 加载excel文件 (.xlsx, .xls) try: # 读取所有sheet并合并 excel_file pd.ExcelFile(file_path) all_content [] for sheet_name in excel_file.sheet_names: df pd.read_excel(file_path, sheet_namesheet_name) # 去掉全空的行/列 df df.dropna(howall, axis0).dropna(howall, axis1) if df.empty: continue sheet_text f【Sheet: {sheet_name}】\n sheet_text df.to_string(indexFalse, na_rep) all_content.append(sheet_text) if not all_content: return None, Excel文件内容为空 content \n\n.join(all_content) return clean_text(content), None except Exception as e: return None, f读取Excel文件 {file_path} 时出错: {str(e)} def load_documents_from_directory(directory_path, file_typesNone): 从指定目录加载多种类型的文档 :param directory_path: 文档所在目录路径 :param file_types: 文件类型列表如[txt,pdf,docx,md,xlsx] 如果为None则加载所有支持的类型 :return: documents 文档内容列表 doc_sources 文档来源路径列表 errors 错误信息列表 # 支持的文件后缀自动识别 SUPPORTED_EXT { txt: [.txt], pdf: [.pdf], docx: [.docx], md: [.md, .markdown], excel: [.xlsx, .xls] } if file_types is None: file_types list(SUPPORTED_EXT.keys()) documents [] doc_sources [] errors [] if not os.path.isdir(directory_path): errors.append(f目录不存在{directory_path}) return documents, doc_sources, errors # 遍历所有文件 all_files glob.glob(os.path.join(directory_path, *)) for file_path in all_files: if not os.path.isfile(file_path): continue content, error None, None ext os.path.splitext(file_path)[1].lower() # 根据后缀分发处理函数 if ext .txt: content, error load_text_file(file_path) elif ext .pdf: content, error load_pdf_file(file_path) elif ext .docx: content, error load_docx_file(file_path) elif ext in [.md, .markdown]: content, error load_markdown_file(file_path) elif ext in [.xlsx, .xls]: content, error load_excel_file(file_path) else: continue # 不支持的格式跳过 # 处理结果 if content: documents.append(content) doc_sources.append(file_path) print(f 成功加载{os.path.basename(file_path)}) if error: errors.append(error) print(f {error}) if not documents: errors.append(f在目录中未找到可处理的文档{directory_path}) return documents, doc_sources, errors该demo用于处理不同格式的文档project将以上4个demo整合优化后以下就是一个简单的ragimport os import re import faiss import numpy as np from openai import OpenAI from sentence_transformers import SentenceTransformer from sympy.printing import preview from file_utils import load_documents_from_directory #加载文档 documents,sources,errorsload_documents_from_directory(./docs)#load_documents_from_directory是从本地 #显示结果 print(f\n成功加载{len(documents)}个文档) for i,(doc,source) in enumerate(zip(documents,sources)): print(f\n文档{i1}来源{source}) #只显示文档的开头部分 previewdoc[:200]...if len(doc)200 else doc print(f内容预览{preview}) #如果有错误显示错误信息 if errors: print(\n加载过程中出现以下错误) for error in errors: print(f- {error}) #调用m3e模型将文档转化为向量数组方便存储和调用 def get_embeddings(text): embeddingsmodel.encode(text,normalize_embeddingsTrue) return np.array(embeddings) #加载m3e模型 local_model_pathlocal_m3e_model if os.path.exists(local_model_path): print(f从本地加载模型:{local_model_path}) modelSentenceTransformer(local_model_path) else: print(f本地模型不存在从网络加载moka-ai/m3e_base) modelSentenceTransformer(moka-ai/m3e_base) #保存到本地以便下次使用 print(f保存模型到本地{local_model_path}) model.save(local_model_path) #打印加载成功 print(模型加载成功!\n) #文档分块 def chunk_document(text,max_chars500,overlap100): 将长文档切分为交小的块 参数 :param text:要切分的文本 :param max_chars: 每个块的最大字符数 :param overlap: 相邻块之间的重叠字符数 :return chunk: 切分后的文本块列表 #如果文本长度小于最大长度直接返回 if len(text)max_chars: return [text] chunks[] start0 while startlen(text): #确定当前块的结束位置 endstartmax_chars #如果没有达到文本末尾尝试在句子边界切分 if endlen(text): #在结束位置查找最近的句子结束标记 sentence_ends[ m.end() for m in re.finditer(r[。.!?]\s*,text[start:end]) ] if sentence_ends:#如果找到句子结束标记在最后一个句子结束处切分 endstartsentence_ends[-1] else:#如果没有找到尝试在单词或标点处切分 last_spacetext[start:end].rfind( ) last_punctmax(text[start:end].rfind( ),text[end:last_space].rfind( )) cut_pointmax(last_space,last_punct) if cut_point0:#如果找到了合适的切分点 endstartcut_point1 #添加当前块到结果列表 chunks.append(text[start:end]) #移动开始位置考虑重叠 startend-overlap #确保开始位置不会后退 if start0: start0 #避免无限循环 if startlen(text): break return chunks #文档和chunk的映射关系 document_to_chunks{} chunks_to_docunment{} all_chunks[] #索引文件路径 index_file_pathm3e_faiss_index.bin chunks_map_pathchunks_mapping.npy #判断是否已存在索引文件 if os.path.exists(index_file_path) and os.path.exists(chunks_map_path): print(f从本地加载索引和映射{index_file_path,chunks_map_path}) indexfaiss.read_index(index_file_path) #加载映射关系 mapping_datanp.load(chunks_map_path,allow_pickleTrue).item() document_to_chunksmapping_data[doc_to_chunks] chunks_to_docunmentmapping_data[chunks_to_doc] all_chunksmapping_data[all_chunks] else: print(本地索引不存在创建新索引) #处理文档并分块 for doc_id,doc in enumerate(documents):#doc_id文档编号doc文档内容对内存中的文档进行遍历 #对长文档进行分块 chunkschunk_document(doc)#调用分片函数将文档切开 #存储映射关系 document_to_chunks[doc_id][] for chunk in chunks: chunk_idlen(all_chunks) all_chunks.append(chunk) document_to_chunks[doc_id].append(chunk_id)#文档和区域块的对应关系 chunks_to_docunment[chunk_id]doc_id#区域块和文档的对应关系 #生成文档块嵌入 chunk_embeddingsget_embeddings(all_chunks)#将所有的文档块进行向量化 #初始化FAISS索引 dimensionchunk_embeddings.shape[1] indexfaiss.IndexFlatL2(dimension) index.add(chunk_embeddings)#所有的文档块存到索引中 #保存索引 faiss.write_index(index,index_file_path) #保存映射关系用字典保存 mapping_data{ doc_to_chunks:document_to_chunks,#文档-块 chunks_to_doc:chunks_to_docunment,#块-文档 all_chunks:all_chunks, } np.save(chunks_map_path,mapping_data)#将字典也保存下来 print(f索引创建并保存成功:{index_file_path}\n) #检索函数 def retrieve_docs(query,index,k5): 检索最相关的文档 :param query:查询文本 :param index: FAISS索引 :param k: 返回的相关chunk数量 :return: 按相关性排序的原始文档列表 query_embeddingget_embeddings([query]) distances,chunk_indicesindex.search(query_embedding,kk) #获取包含这些chunks的原始文档ID retrieved_doc_idsset() retrieved_chunks[] for chunk_idx in chunk_indices[0]: if chunk_idx 0 and chunk_idxlen(all_chunks):#确保索引有效 doc_idchunks_to_docunment.get(int(chunk_idx)) if doc_id is not None: retrieved_doc_ids.add(doc_id) retrieved_chunks.append((doc_id,all_chunks[int(chunk_idx)])) #获取原始文档 retrieved_docs[documents[doc_id] for doc_id in retrieved_doc_ids] #返回文档和对应的相关块 return retrieved_docs,retrieved_chunks #deepseek clientOpenAI( api_keysk-e0274bea43e445179235ddfbb87e5ada, base_urlhttps://api.deepseek.com) def generate_answer(query,retrieved_docs,retrieved_chunks): 基于检索到的文档生成回答 :param query: 用户查询 :param retrieved_docs:检索到的完整文档 :param retrieved_chunks: 检索到的文档块 :return: 生成回答 #构建上下文包含原始文档和相关块 context原始文档\n\n.join(retrieved_docs) #添加相关块信息 context\n\n相关文本块:\n for doc_id,chunk in retrieved_chunks: contextf[文档{doc_id}]{chunk}\n promptf上下文信息:\n{context}\n\n问题:{query}\n请基于上下文信息回答问题 response client.chat.completions.create( modeldeepseek-chat, messages[ {role: system, content: 你是一个专业的问答助手。请仅基于提供的上下文信息回答问题。}, {role: user, content: prompt}, ], streamFalse, ) return response.choices[0].message.content def main(): #测试 query print(Query:,query) retrieved_docs,retrieved_chunksretrieve_docs(query,index) #打印检索到的文档 print(检索到的文档:) for doc in retrieved_docs: print(doc) #打印检索到的文本块 print(\n检索到的文本块:) for doc_id,chunk in retrieved_chunks: print(f[文档{doc_id}]{chunk}) answergenerate_answer(query,retrieved_docs,retrieved_chunks) print(\nAnswer:,answer) if __name____main__: main()思考但是在实际落地场景中往往会存在检索准确率低噪音干扰多召回完整性专业性不够导致LLM幻觉严重的问题。知识库文档越来越多以后检索噪音大召回准确率不高召回不全完整性不够召回和用户问题意图相关性不大只能回答静态数据无法动态获取知识导致答疑应用比较呆比较笨。优化思路1.知识加载目的:对文档进行精确的解析优化建议1将docx、txt优先处理为pdf或markdown格式。为什么因为这两种格式的结构信息标题、段落、表格更规范工具更容易识别能减少文本提取错误。2提取文本中的表格信息。表格是结构化数据直接当成文本处理会丢失行列关系、数字含义。优化方案是把表格转成 Markdown 格式甚至提取成键值对3Markdown 和 PDF 自带标题层级比如 H1、H2、H3处理时要保留下来。后续可以基于标题层级做 “层级索引”比如用户问 “XXX 公司XXXX年的销售总额”可以先定位到 “XXXX年” 这个二级标题下的所有内容大幅缩小检索范围减少噪音。4把图片链接、公式也转成 Markdown 格式避免这些信息被丢弃。2.Chunk尽量保持完整Chunk 就是你把长文档切分成的小块文本是 RAG 里用来做向量检索的最小单位。1图片 表格单独抽取成 Chunk。把文档里的图片、表格不跟正文混在一起切分而是单独抽出来作为独立的 Chunk同时把图片 / 表格的标题、说明存到 Chunk 的metadata元数据里。2按标题层级 / Markdown Header 拆分保留Chunk的完整性3如果文档里有固定的自定义分隔符比如---、###、章节分隔线等可以基于这些分隔符来切分。3.多元化信息抽取4.知识处理工作流5.静态知识RAG优化6.动态知识RAG优化
http://www.zskr.cn/news/1376105.html

相关文章:

  • 引力波透镜探测:参数偏移与似然比检验的统计框架与应用
  • 从CentOS迁移到openEuler?手把手教你在vSphere ESXi 7.0上搭建测试环境
  • 用信息架构拆解豪芬车载香薰官网
  • 2026年学习Java还有前景吗?如何看待2026Java程序员就业难现状?
  • 机器学习优化活性粒子信息引擎:突破热力学极限的非平衡控制
  • 基于BERT与LSTM的抽取式新闻摘要实战:从原理到实现
  • 深度学习与神经网络学习笔记 —— 卷积神经网络(CNN)基础
  • Week 1:机器学习入门与核心框架
  • GHelper终极指南:华硕笔记本轻量控制工具的专业使用教程
  • 别只盯着烘焙!深入理解Unity URP中反射球与屏幕空间反射的实战抉择与配置
  • Codex适配国产信创环境安装部署与技术适配全解析
  • 数据集上新:柬埔寨环境健康入户调查
  • 内存池仿Nginx C++实现
  • 基于CRISP-DM与HMM的国有企业内部威胁安全成熟度评估框架
  • 从安装到卸载:我在macOS Big Sur上使用雷蛇雷云2.0驱动的完整踩坑记录
  • Type-C接口水冷散热器
  • 2026照片去水印免费软件全攻略:一看就会的保姆级教程,赶紧收藏
  • 从PDB到Mol:手把手教你用PyMOL和Open Babel搞定蛋白质-小分子复合物的结构文件转换
  • 鸿蒙PC:Qt适配OpenHarmony实战【番茄刻】:工作和休息两种倒计时如何写成一个 QML 状态机
  • 手把手教你:把Ubuntu 20.04完整系统塞进U盘,打造随身便携开发环境
  • 如何快速配置Windows任务栏透明美化:TranslucentTB新手完整入门指南
  • YOLOv13涨点改进| TGRS 2026|独家创新首发、特征融合改进篇| 引入CGIM 通道组交互融合模块,增强目标关联信息的建模,助力目标检测、遥感目标检测、双时相遥感变化检测、图像融合有效涨点
  • YOLOv13涨点改进| TGRS 2026 |独家创新首发、特征融合改进篇| 引入SGAM空间高斯注意力融合模块,助力YOLOv13模型目标检测、遥感目标检测、双时相遥感变化检测、图像分割有效涨点
  • Unity商业游戏逆向解剖:天命6源码的真实结构与设计逻辑
  • 鸿蒙数学 108 篇 第十五篇:阴阳对称运算规则
  • 医学影像AI迁移学习:如何科学选择预训练数据集?
  • 猫抓:5步掌握网页资源嗅探工具,轻松下载全网视频
  • G-Helper深度解析:华硕笔记本性能调优实战手册
  • 量子生成模型:原理、优势与应用场景解析
  • RePKG深度技术解析:逆向工程驱动的Wallpaper Engine资源处理框架