Goal

Get a first look at RAG and learn how to implement a basic RAG pipeline.


Indexing the data

Step 1: load the documents with a Loader.

path = "./test"
text_loader_kwargs = {'autodetect_encoding': True}
loader = DirectoryLoader(path, glob="**/*.txt", loader_cls=TextLoader,
                         loader_kwargs=text_loader_kwargs, show_progress=True)
docs = loader.load()
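It helps to sanity-check what the loader returned; each item is a Document carrying the text plus metadata such as the source path. A minimal sketch, assuming the code above ran:

print(len(docs))                   # number of files loaded
print(docs[0].metadata)            # e.g. {'source': 'test/....txt'}
print(docs[0].page_content[:100])  # first 100 characters of the text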

Step 2: split the documents.

text_splitter = RecursiveCharacterTextSplitter(
    # Set a really small chunk size, just to show.
    chunk_size=500,
    chunk_overlap=0,
    length_function=len,
)

doc_list = []
for doc in docs:
    tmp_docs = text_splitter.create_documents([doc.page_content])
    doc_list += tmp_docs
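One caveat: create_documents([doc.page_content]) splits raw strings, so the chunks lose each file's metadata. LangChain's split_documents does the same chunking while carrying metadata through, and could replace the loop above:

# Equivalent chunking that preserves each Document's metadata
doc_list = text_splitter.split_documents(docs)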

Step 3: embed the documents and save the vectors to a vector database.

# Use the Qianfan bge_large_zh embedding model
embeddings_model = QianfanEmbeddingsEndpoint(model="bge_large_zh", endpoint="bge_large_zh")

# Use Elasticsearch as the vector store
vectorstore = ElasticsearchStore(
    es_url=os.environ['ELASTIC_HOST_HTTP'],
    index_name="index_sd_1024_vectors",
    embedding=embeddings_model,
    es_user="elastic",
    vector_query_field='question_vectors',
    es_password=os.environ['ELASTIC_ACCESS_PASSWORD']
)
# Add the texts to the database (note: run only once; repeated runs create duplicate entries)
vectorstore.add_documents(doc_list)
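Before building the chain, it is worth confirming the index answers queries. A minimal retrieval sanity check (the query string is just an example):

# Return the 3 chunks whose vectors are closest to the query
hits = vectorstore.similarity_search("巴菲特", k=3)
for hit in hits:
    print(hit.page_content[:80])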

Retrieval and generation

Pull the prompt template from the LangSmith hub:

prompt = hub.pull("rlm/rag-prompt")

【1】Click the hub icon 【2】Search for the keyword


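To verify the pull, you can print the template's messages and render it once (a quick check; the filler values below are made-up examples):

# rlm/rag-prompt expects {context} and {question}
print(prompt.messages)
print(prompt.invoke({"context": "some retrieved text", "question": "a test question"}))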

# Get a retriever from the vector store
retriever = vectorstore.as_retriever()

# 【2】 Build the query chain
chat = QianfanChatEndpoint(model="ERNIE-Bot-4")

template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Use three sentences maximum and keep the answer as concise as possible.
Always say "thanks for asking!" at the end of the answer.

{context}

Question: {question}

Helpful Answer:"""
custom_rag_prompt = PromptTemplate.from_template(template)

# Alternatively, pull the prompt straight from LangSmith
# custom_rag_prompt = hub.pull("rlm/rag-prompt")

# Join the retrieved documents into a single string
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Retrieval + answer chain: the dict runs retriever | format_docs and
# RunnablePassthrough() on the incoming question, filling {context} and {question}
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | custom_rag_prompt
    | chat
    | StrOutputParser()
)

go_on = True
while go_on:
    query_text = input("你的问题: ")

    if 'exit' in query_text:
        break

    print("AI需要回答的问题 [{}]\n".format(query_text))
    res = rag_chain.invoke(query_text)
    print(res)

What? That's it? I've already got the hang of it!

Serving my first RAG with LangServe

LangServe helps developers deploy LangChain runnables and chains as a REST API.

Features

  • Input and output schemas are automatically inferred from your LangChain object and enforced on every API call, with rich error messages
  • API docs page with JSONSchema and Swagger (insert example link)
  • Efficient /invoke/, /batch/ and /stream/ endpoints supporting many concurrent requests on a single server
  • /stream_log/ endpoint for streaming all (or some) intermediate steps of your chain/agent
  • astream_events support as of version 0.0.40, making streaming easier without having to parse the stream_log output
  • /playground/ page with streaming output and intermediate steps
  • Built-in (optional) tracing to LangSmith; just add your API key (see the instructions)
  • All built with battle-tested open-source Python libraries such as FastAPI, Pydantic, uvloop and asyncio
  • Use the client SDK to call a LangServe server as if it were a locally running runnable (or call the HTTP API directly)
  • LangServe Hub

Implementation

# 1. Define the app
app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple API server using LangChain's Runnable interfaces",
)

# 2. Mount the chain at the server's /rag_chain path
add_routes(
    app,
    rag_chain,
    path="/rag_chain",
)

add_routes(
    app,
    retriever,
    path="/rag_docs",
)

if __name__ == "__main__":
    import uvicorn
    # Visit http://localhost:8001/rag_chain/playground/
    uvicorn.run(app, host="localhost", port=8001)
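Once the server is running, the chain is also reachable over plain HTTP via LangServe's standard /invoke endpoint. A sketch using requests (the question is a made-up example):

import requests

# LangServe wraps the chain input under the "input" key
resp = requests.post(
    "http://localhost:8001/rag_chain/invoke",
    json={"input": "巴菲特的投资理念是什么?"},
)
print(resp.json()["output"])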

RAG via an Agent

  • Previous approach: we steer the data flow ourselves, hard-coded

    question → retriever → doc processing → build prompt → LLM → parse output → user

  • With Agents: the LLM decides for itself; we rely on the model

    question → Agents → user


tool = create_retriever_tool(
    retriever,
    "search_mojuan_docs",
    "搜索粥余知识库中的文档并返回",
)
tools = [tool]

prompt = hub.pull("hwchase17/openai-tools-agent")
# prompt.messages  # inspect the pulled prompt in a debugger if curious

# Define the agent
agent = create_openai_tools_agent(chat, tools, prompt)
agent_executor = AgentExecutor(agent=agent, tools=tools)

go_on = True
while go_on:
    query_text = input("你的问题: ")

    if 'exit' in query_text:
        break

    print("AI需要回答的问题 [{}]\n".format(query_text))

    result = agent_executor.invoke({"input": query_text})  # <---- invoke the agent

    print(result)

The agent is sensitive to the prompt: if the question does not mention the "粥余知识库" knowledge base, it will not call the tool.


Rephrase the question to explicitly ask for a search in the "粥余知识库" knowledge base, and the retriever tool gets used:

【1】The model recognizes it needs to call the tool (retriever)

【2】Documents are recalled from the database

【3】The answer is generated from the documents

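To illustrate the sensitivity, here are two hypothetical queries (both made up for this example):

# Likely answered from the model's own knowledge; the tool is not called
agent_executor.invoke({"input": "巴菲特的投资理念是什么?"})

# Explicitly names 粥余知识库, so the agent calls search_mojuan_docs first
agent_executor.invoke({"input": "在粥余知识库中搜索:巴菲特的投资理念是什么?"})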

Complete code

Command-line RAG

import os
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.chat_models import QianfanChatEndpoint
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain_community.embeddings import QianfanEmbeddingsEndpoint
from langchain import hub
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough

if __name__ == '__main__':

    # 【1】 Build the vector database for the texts
    # 【1-1】 Load the data
    path = "./test"
    text_loader_kwargs = {'autodetect_encoding': True}
    loader = DirectoryLoader(path, glob="**/*.txt", loader_cls=TextLoader, loader_kwargs=text_loader_kwargs,
                             show_progress=True)
    docs = loader.load()

    # 【1-2】 Split the data
    text_splitter = RecursiveCharacterTextSplitter(
        # Set a really small chunk size, just to show.
        chunk_size=400,
        chunk_overlap=0,
        length_function=len,
    )

    # Split the Document objects
    doc_list = []
    for doc in docs:
        tmp_docs = text_splitter.create_documents([doc.page_content])
        doc_list += tmp_docs

    # Embed the texts and store them in the database,
    # using the Qianfan bge_large_zh embedding model (remote Baidu call)
    os.environ["QIANFAN_ACCESS_KEY"] = os.getenv('MY_QIANFAN_ACCESS_KEY')
    os.environ["QIANFAN_SECRET_KEY"] = os.getenv('MY_QIANFAN_SECRET_KEY')
    embeddings_model = QianfanEmbeddingsEndpoint(model="bge_large_zh", endpoint="bge_large_zh")

    vectorstore = ElasticsearchStore(
        es_url=os.environ['ELASTIC_HOST_HTTP'],
        index_name="index_sd_1024_vectors",
        embedding=embeddings_model,
        es_user="elastic",
        vector_query_field='question_vectors',
        es_password=os.environ['ELASTIC_ACCESS_PASSWORD']
    )
    # Add the texts to the database (note: run only once; repeated runs create duplicate entries)
    # vectorstore.add_documents(doc_list)

    # Get a retriever from the vector store
    retriever = vectorstore.as_retriever()

    # 【2】 Build the query chain
    chat = QianfanChatEndpoint(model="ERNIE-Bot-4")

    template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Use three sentences maximum and keep the answer as concise as possible.
Always say "thanks for asking!" at the end of the answer.

{context}

Question: {question}

Helpful Answer:"""
    custom_rag_prompt = PromptTemplate.from_template(template)

    # Alternatively, pull the prompt straight from LangSmith
    # custom_rag_prompt = hub.pull("rlm/rag-prompt")

    # Join the retrieved documents into a single string
    def format_docs(docs):
        return "\n\n".join(doc.page_content for doc in docs)

    # Retrieval + answer chain
    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | custom_rag_prompt
        | chat
        | StrOutputParser()
    )

    go_on = True
    while go_on:
        query_text = input("你的问题: ")

        if 'exit' in query_text:
            break

        print("AI需要回答的问题 [{}]\n".format(query_text))
        res = rag_chain.invoke(query_text)
        print(res)

LangServe RAG

import os

from fastapi import FastAPI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.chat_models import QianfanChatEndpoint
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain_community.embeddings import QianfanEmbeddingsEndpoint
from langchain import hub
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langserve import add_routes

# Use the Qianfan bge_large_zh embedding model (remote Baidu call)
os.environ["QIANFAN_ACCESS_KEY"] = os.getenv('MY_QIANFAN_ACCESS_KEY')
os.environ["QIANFAN_SECRET_KEY"] = os.getenv('MY_QIANFAN_SECRET_KEY')
embeddings_model = QianfanEmbeddingsEndpoint(model="bge_large_zh", endpoint="bge_large_zh")

vectorstore = ElasticsearchStore(
    es_url=os.environ['ELASTIC_HOST_HTTP'],
    index_name="index_sd_1024_vectors",
    embedding=embeddings_model,
    es_user="elastic",
    vector_query_field='question_vectors',
    es_password=os.environ['ELASTIC_ACCESS_PASSWORD']
)
# Get a retriever from the vector store
retriever = vectorstore.as_retriever()

# Build the query chain
chat = QianfanChatEndpoint(model="ERNIE-Bot-4")

template = """Use the following pieces of context to answer the question at the end.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Use three sentences maximum and keep the answer as concise as possible.
Always say "thanks for asking!" at the end of the answer.

{context}

Question: {question}

Helpful Answer:"""
custom_rag_prompt = PromptTemplate.from_template(template)

# Alternatively, pull the prompt straight from LangSmith
# custom_rag_prompt = hub.pull("rlm/rag-prompt")

# Join the retrieved documents into a single string
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

# Retrieval + answer chain
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | custom_rag_prompt
    | chat
    | StrOutputParser()
)

# 1. Define the app
app = FastAPI(
    title="LangChain Server",
    version="1.0",
    description="A simple API server using LangChain's Runnable interfaces",
)

# 2. Mount the chain at the server's /rag_chain path
add_routes(
    app,
    rag_chain,
    path="/rag_chain",
)

add_routes(
    app,
    retriever,
    path="/rag_docs",
)

if __name__ == "__main__":
    import uvicorn
    # Visit http://localhost:8001/rag_chain/playground/
    uvicorn.run(app, host="localhost", port=8001)

LangServe client

from langserve import RemoteRunnable

if __name__ == '__main__':
    # Call the served retriever as if it were a local runnable
    remote_runnable = RemoteRunnable("http://localhost:8001/rag_docs")

    docs = remote_runnable.invoke("巴菲特")
    print(docs)
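The chain route works the same way, and RemoteRunnable also supports streaming. A minimal sketch, assuming the server above is running on port 8001 (the question is a made-up example):

from langserve import RemoteRunnable

remote_chain = RemoteRunnable("http://localhost:8001/rag_chain")

# Blocking call
print(remote_chain.invoke("巴菲特的投资理念是什么?"))

# Token-by-token streaming
for chunk in remote_chain.stream("巴菲特的投资理念是什么?"):
    print(chunk, end="", flush=True)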

Agent

import os
from uuid import uuid4

from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools.retriever import create_retriever_tool
from langchain_community.chat_models import QianfanChatEndpoint
from langchain_community.chat_models.azure_openai import AzureChatOpenAI
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain_community.embeddings import QianfanEmbeddingsEndpoint
from langchain import hub
from langchain_community.vectorstores.elasticsearch import ElasticsearchStore
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough

# Azure OpenAI
os.environ["AZURE_OPENAI_API_KEY"] = os.getenv('MY_AZURE_OPENAI_API_KEY')
os.environ["AZURE_OPENAI_ENDPOINT"] = os.getenv('MY_AZURE_OPENAI_ENDPOINT')
DEPLOYMENT_NAME_GPT3P5 = os.getenv('MY_DEPLOYMENT_NAME_GPT3P5')
chat = AzureChatOpenAI(
    openai_api_version="2023-05-15",
    azure_deployment=DEPLOYMENT_NAME_GPT3P5,
)

# LangSmith configuration; comment out if not used
unique_id = uuid4().hex[0:8]
os.environ["LANGCHAIN_PROJECT"] = f" Agent RAG - {unique_id}"
# os.environ["LANGCHAIN_TRACING_V2"] = 'true'
os.environ["LANGCHAIN_API_KEY"] = os.getenv('MY_LANGCHAIN_API_KEY')

if __name__ == '__main__':

    # Use the Qianfan bge_large_zh embedding model
    os.environ["QIANFAN_ACCESS_KEY"] = os.getenv('MY_QIANFAN_ACCESS_KEY')
    os.environ["QIANFAN_SECRET_KEY"] = os.getenv('MY_QIANFAN_SECRET_KEY')
    embeddings_model = QianfanEmbeddingsEndpoint(model="bge_large_zh", endpoint="bge_large_zh")

    # Use Elasticsearch
    vectorstore = ElasticsearchStore(
        es_url=os.environ['ELASTIC_HOST_HTTP'],
        index_name="index_sd_1024_vectors",
        embedding=embeddings_model,
        es_user="elastic",
        vector_query_field='question_vectors',
        es_password=os.environ['ELASTIC_ACCESS_PASSWORD']
    )

    # Get a retriever from the vector store
    retriever = vectorstore.as_retriever()

    tool = create_retriever_tool(
        retriever,
        "search_mojuan_docs",
        "搜索粥余知识库中的文档并返回",
    )
    tools = [tool]

    prompt = hub.pull("hwchase17/openai-tools-agent")
    # prompt.messages  # inspect the pulled prompt in a debugger if curious

    # Define the agent
    agent = create_openai_tools_agent(chat, tools, prompt)
    agent_executor = AgentExecutor(agent=agent, tools=tools)

    go_on = True
    while go_on:
        query_text = input("你的问题: ")

        if 'exit' in query_text:
            break

        print("AI需要回答的问题 [{}]\n".format(query_text))

        result = agent_executor.invoke({"input": query_text})

        print(result)