rag 系统的基本搭建流程-爱代码爱编程
一、RAG搭建流程
1、文档加载,并按一定条件切割成片段
2、将切割的文本片段灌入检索引擎
3、封装检索接口
4、构建调用流程:Query -> 检索 -> Prompt -> LLM -> 回复
1.1、文档的加载与切割
(1)安装 pdf 解析库
!pip install pdfminer.six
(2)pdf文档切割
from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer
def extract_text_from_pdf(filename, page_numbers=None, min_line_length=5):
'''从 PDF 文件中(按指定页码)提取文字'''
paragraphs = []
buffer = ''
full_text = ''
# 提取全部文本
for i, page_layout in enumerate(extract_pages(filename)):
# 如果指定了页码范围,跳过范围外的页
if page_numbers is not None and i not in page_numbers:
continue
for element in page_layout:
if isinstance(element, LTTextContainer):
full_text += element.get_text() + '\n'
# 按空行分隔,将文本重新组织成段落
lines = full_text.split('\n')
for text in lines:
if len(text) >= min_line_length:
buffer += (' '+text) if not text.endswith('-') else text.strip('-')
elif buffer:
paragraphs.append(buffer)
buffer = ''
if buffer:
paragraphs.append(buffer)
return paragraphs
paragraphs = extract_text_from_pdf("data.pdf", min_line_length=5)#llama2
for para in paragraphs[:15]:
print(para+"\n")
1.2、文档的加载与切割
(1)安装 ES 客户端
!pip install elasticsearch7
推荐第二种:线下下载,通过网盘分享的文件:elasticsearch-8.14.3.7z
链接: https://pan.baidu.com/s/1cVi1Y7YpT1iOjRrpVMdgQQ 提取码: f3qd
运行bin文件下的elasticsearch.bat启动
(2)安装 NLTK(文本处理方法库)
!pip install nltk
from elasticsearch7 import Elasticsearch, helpers
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
import nltk
import re
import warnings
warnings.simplefilter("ignore") # 屏蔽 ES 的一些Warnings
(3)to_keywords只针对英文
def to_keywords(input_string):
'''(英文)文本只保留关键字'''
# 使用正则表达式替换所有非字母数字的字符为空格
no_symbols = re.sub(r'[^a-zA-Z0-9\s]', ' ', input_string)
word_tokens = word_tokenize(no_symbols)
# 加载停用词表
stop_words = set(stopwords.words('english'))
ps = PorterStemmer()
# 去停用词,取词根
filtered_sentence = [ps.stem(w)
for w in word_tokens if not w.lower() in stop_words]
return ' '.join(filtered_sentence)
针对中文
import re
import jieba
import nltk
from nltk.corpus import stopwords
nltk.download('stopwords')
def to_keywords(input_string):
"""将句子转成检索关键词序列"""
# 按搜索引擎模式分词
word_tokens = jieba.cut_for_search(input_string)
# 加载停用词表
stop_words = set(stopwords.words('chinese'))
# 去除停用词
filtered_sentence = [w for w in word_tokens if not w in stop_words]
return ' '.join(filtered_sentence)
def sent_tokenize(input_string):
"""按标点断句"""
# 按标点切分
sentences = re.split(r'(?<=[。!?;?!])', input_string)
# 去掉空字符串
return [sentence for sentence in sentences if sentence.strip()]
if "__main__" == __name__:
# 测试关键词提取
print(to_keywords("小明硕士毕业于中国科学院计算所,后在日本京都大学深造"))
# 测试断句
print(sent_tokenize("这是,第一句。这是第二句吗?是的!啊"))
(4)将文本灌入检索引擎
import os, time
import nltk
nltk.download('punkt')
nltk.download('stopwords')
# 引入配置文件
# ELASTICSEARCH_BASE_URL = os.getenv('ELASTICSEARCH_BASE_URL')
# ELASTICSEARCH_PASSWORD = os.getenv('ELASTICSEARCH_PASSWORD')
# ELASTICSEARCH_NAME= os.getenv('ELASTICSEARCH_NAME')
# tips: 如果想在本地运行,请在下面一行 print(ELASTICSEARCH_BASE_URL) 获取真实的配置
#1. 创建Elasticsearch连接
es = Elasticsearch(
hosts=["127.0.0.1:9200"], # 服务地址与端口
http_auth=('elastic','elastic'), # 用户名,密码
)
# 2. 定义索引名称
index_name = "teacher_demo_index1"
# 3. 如果索引已存在,删除它(仅供演示,实际应用时不需要这步)
if es.indices.exists(index=index_name):
es.indices.delete(index=index_name)
# 4. 创建索引
es.indices.create(index=index_name)
# 5. 灌库指令
actions = [
{
"_index": index_name,
"_source": {
"keywords": to_keywords(para),
"text": para
}
}
for para in paragraphs
]
# 6. 文本灌库
helpers.bulk(es, actions)
# 灌库是异步的
time.sleep(2)
(5)实现关键字检索
def search(query_string, top_n=1):
# ES 的查询语言
search_query = {
"match": {
"keywords": to_keywords(query_string)
}
}
res = es.search(index=index_name, query=search_query, size=top_n)
return [hit["_source"]["text"] for hit in res["hits"]["hits"]]
results = search("截至到 2017 年底,北京市新能源车辆保有量为多少辆,较上年增长多少?")#2020 年全国电动汽车保有量将超过多少辆
for r in results:
print(r+"\n")
1.3、 LLM 接口封装
from openai import OpenAI
import os
# 加载环境变量
# from dotenv import load_dotenv, find_dotenv
# _ = load_dotenv(find_dotenv()) # 读取本地 .env 文件,里面定义了 OPENAI_API_KEY
# client = OpenAI()
client = OpenAI(api_key="****",base_url='***')# 默认使用环境变量中的 OPENAI_API_KEY 和 OPENAI_BASE_URL
def get_completion(prompt, model="gpt-3.5-turbo"):
'''封装 openai 接口'''
messages = [{"role": "user", "content": prompt}]
response = client.chat.completions.create(
model=model,
messages=messages,
temperature=0, # 模型输出的随机性,0 表示随机性最小
)
return response.choices[0].message.content
def build_prompt(prompt_template, **kwargs):
'''将 Prompt 模板赋值'''
inputs = {}
for k, v in kwargs.items():
if isinstance(v, list) and all(isinstance(elem, str) for elem in v):
val = '\n\n'.join(v)
else:
val = v
inputs[k] = val
return prompt_template.format(**inputs)
prompt模板
prompt_template = """
你是一个问答机器人。
你的任务是根据下述给定的已知信息回答用户问题。
已知信息:
{context}
用户问:
{query}
如果已知信息不包含用户问题的答案,或者已知信息不足以回答用户的问题,请直接回复"我无法回答您的问题"。
请不要输出已知信息中不包含的信息或答案。
请用中文回答用户问题。
"""
user_query = "截至到 2017 年底,北京市新能源车辆保有量为多少辆,较上年增长多少?"
# 1. 检索
search_results = search(user_query, 3)
# 2. 构建 Prompt
prompt = build_prompt(prompt_template, context=search_results, query=user_query)
# print("===Prompt===")
# print(prompt)
# 3. 调用 LLM
response = get_completion(prompt)
print("===回复===")
print(response)