基于DeepSeek-R1的问答机器人

在人工智能 (AI) 日益融入我们日常生活的时代,一个问题仍然处于最前沿:隐私。尽管基于云的 AI 系统功能强大,但它们通常伴随着重大的权衡——你的数据在远程服务器上处理,这引发了对安全性和机密性的质疑。对于许多应用程序,尤其是那些处理敏感信息的应用程序,这根本是不可接受的。如果我们能够利用人工智能的力量,同时将所有内容保持在本地,确保你的数据永远不会离开您的机器,那会怎样?这就是本地 AI 的闪光点,也是我们即将探索的项目的基础。

Deepseek R1 是一种尖端推理模型,旨在用类似人类的逻辑解决复杂问题。与仅专注于模式识别的传统语言模型不同,Deepseek R1 是为思考而构建的。它不只是检索信息,还会对其进行推理,使其成为问答 (Q&A) 和检索增强生成 (RAG) 应用程序的理想选择。但为什么推理在这些情况下如此重要?在问答系统中,用户不只是想要答案;他们想要富有洞察力、上下文感知的响应,以表明理解。像 Deepseek R1 这样的推理模型弥合了原始数据和有意义的答案之间的差距,使其成为 AI 驱动交互的游戏规则改变者。

在这个项目中,我们将结合本地 AI 的隐私与 Deepseek R1 的智能,创建一个完全本地化、推理驱动的问答机器人。使用 Ollama 执行本地模型、使用 ChromaDB 高效存储和检索文档以及使用 Streamlit 提供用户友好界面,我们将构建一个系统,让您可以加载文档、提出问题并接收答案 — 同时保证数据安全。但这还不是全部。该应用程序还可以让你看到模型的思考过程以及它用于生成响应的源文本,从而提供透明度并深入了解 AI 的工作原理。

读完本文后,你将清楚地了解如何构建自己的以隐私为中心的问答机器人,以及如何利用 Ollama 和 ChromaDB 等现代工具来创建强大的本地 AI 解决方案。最终的应用程序将如下所示:

1、深入研究项目架构

在深入研究代码之前,让我们快速介绍一下启动和运行该项目的设置要求:

Ollama:你需要在本地运行 Ollama来使用两个关键模型:

  • deepseek-r1:支持问答功能的以推理为中心的语言模型。
  • nomic-embed-text:用于标记和检索相关文档块的嵌入模型。

为了为该项目设置 ollama,只需下载应用程序并使用以下命令:

ollama pull deepseek-r1
ollama pull nomic-embed-text
ollama serve

这些命令将下载 deepseek-R1 7b 用于推理和 nomic-embed-text 用于从我们的文档创建嵌入。最后一个命令将创建一个本地服务器供我们与这些模型交互。

Python 依赖项:该项目依赖于一些 Python 库,你可以通过 requirements.txt 安装这些库。其中包括用于矢量存储的 chromadb、用于 UI 的 streamlit 以及其他支持工具。该项目的要求文件位于文章末尾的 github 链接中。

可以通过以下方式加载所需的模块:

pip install requirements.txt

设置环境后,项目将遵循简单而强大的工作流程:文档提取 → 检索 → 问答 → 用户界面。每个步骤都旨在确保隐私、效率和智能推理。让我们分解一下:

  • 文档提取:你的文件在 ChromaDB 中本地加载、处理和存储,这是一个矢量数据库,可确保你的数据永远不会离开你的机器。
  • 检索:当你提出问题时,应用程序会使用语义搜索从数据库中检索最相关的文本块。
  • 问答:检索到的块将传递给 Deepseek R1,后者处理信息并生成智能的上下文感知答案。
  • 用户界面:Streamlit 应用程序将所有内容整合在一起,提供简洁直观的界面,用于与机器人交互、提问和探索模型的推理过程。

这种模块化架构不仅可以确保隐私,还可以使系统灵活且易于扩展。让我们详细探索每个组件。

我们将从摄取过程开始。ingest.py 将是这样的:

import os
import uuid
from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OllamaEmbeddings

def process_documents(docs_dir: str = "documents"):
    # Initialize embeddings and text splitter
    embeddings = OllamaEmbeddings(model="nomic-embed-text")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=3000,
        chunk_overlap=500,
        length_function=len,
        is_separator_regex=False,
    )

    # Process PDF files
    pdf_files = [f for f in os.listdir(docs_dir) if f.endswith(".pdf")]
    if not pdf_files:
        raise ValueError(f"No PDF files found in {docs_dir}")

    all_docs = []
    for pdf_file in pdf_files:
        file_path = os.path.join(docs_dir, pdf_file)
        loader = PyPDFLoader(file_path)
        pages = loader.load()
        
        # Add metadata to each page
        for page_num, page in enumerate(pages, start=1):
            page.metadata.update({
                "source": pdf_file,
                "page_number": page_num,
                "chunk_id": str(uuid.uuid4())[:8]
            })
        
        # Split pages into chunks
        chunks = text_splitter.split_documents(pages)
        all_docs.extend(chunks)

    # Create/update vector store
    Chroma.from_documents(
        documents=all_docs,
        embedding=embeddings,
        persist_directory="chroma_db",
        collection_metadata={"hnsw:space": "cosine"},
        collection_name="main_collection"
    )
  • PyPDFLoader:用于从 PDF 文件加载和提取文本。
  • RecursiveCharacterTextSplitter:将文本拆分为较小的块进行处理。
  • Chroma:存储已处理文档的矢量数据库。
  • OllamaEmbeddings:使用 nomic-embed-text 模型为文本生成嵌入。
  • uuid:为文档块生成唯一标识符。

初始化嵌入和文本分割器:

embeddings = OllamaEmbeddings(model="nomic-embed-text")
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=3000,
    chunk_overlap=500,
    length_function=len,
    is_separator_regex=False,
)

OllamaEmbeddings:使用 nomic-embed-text 模型生成嵌入。此模型通过 Ollama 在本地运行,确保隐私。

RecursiveCharacterTextSplitter:

  • chunk_size=3000:将文本分割成 3000 个字符的块。
  • chunk_overlap=500:确保块之间有 500 个字符的重叠以保持上下文
  • length_function=len:使用 Python 的 len 函数计算块大小。
  • is_separator_regex=False:表示文本分割器不使用正则表达式进行分割。

加载和处理 PDF 文件:

pdf_files = [f for f in os.listdir(docs_dir) if f.endswith(".pdf")]
if not pdf_files:
    raise ValueError(f"No PDF files found in {docs_dir}")

all_docs = []
for pdf_file in pdf_files:
    file_path = os.path.join(docs_dir, pdf_file)
    loader = PyPDFLoader(file_path)
    pages = loader.load()
  • 脚本检​​查指定目录中的 PDF 文件。如果未找到任何文件,则会引发错误。
  • PyPDFLoader:加载每个 PDF 文件并将其页面提取为单独的文档,以创建关于每个页面的单独元数据
  • pages:Document 对象列表,其中每个对象代表 PDF 中的一页。

将元数据添加到页面:

for page_num, page in enumerate(pages, start=1):
    page.metadata.update({
        "source": pdf_file,
        "page_number": page_num,
        "chunk_id": str(uuid.uuid4())[:8]
    })
  • 每个页面都包含元数据,用于查找文本块的源文件:
  • source:PDF 文件的名称。
  • page_number:PDF 中的页码。
  • chunk_id:块的唯一标识符(截断的 UUID)。

将页面拆分为块:

chunks = text_splitter.split_documents(pages)
all_docs.extend(chunks)
  • RecursiveCharacterTextSplitter 根据指定的 chunk_sizechunk_overlap 将每个页面拆分为较小的块。
  • 这些块将添加到 all_docs 列表中,该列表累积了所有已处理的文档。

将文档存储在 ChromaDB 中:

Chroma.from_documents(
    documents=all_docs,
    embedding=embeddings,
    persist_directory="chroma_db",
    collection_metadata={"hnsw:space": "cosine"},
    collection_name="main_collection"
)

Chroma.from_documents:

  • 将所有处理过的文档导入 ChromaDB。
  • 使用 nomic-embed-text 嵌入对文本进行矢量化。
  • 将数据库本地保存在 chroma_db 目录中。
  • 使用余弦相似度配置集合以进行检索 (hnsw:space": "cosine")。
  • 将集合命名为“main_collection”。

使用 ingest.py 构建摄取过程后,我们可以继续根据问题检索存储的文档。我们将为此使用 retreive.py 文件:

import os

from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OllamaEmbeddings

class DocumentRetriever:
    def __init__(self):
        self.embeddings = OllamaEmbeddings(model="nomic-embed-text")
        self.persist_dir = "chroma_db"
        
        # Create directory if it doesn't exist
        os.makedirs(self.persist_dir, exist_ok=True)
        
        # Initialize with empty collection if needed
        self.vector_store = Chroma(
            persist_directory=self.persist_dir,
            embedding_function=self.embeddings,
            collection_name="main_collection"  # Fixed collection name
        )
        
        # Workaround for Chroma's empty DB issue
        if not self.vector_store.get()['documents']:
            self.vector_store.add_texts(["Initial empty document"])
            self.vector_store.delete(ids=["0"])  # Remove placeholder
    
    def query_documents(self, query: str, k: int = 5):
        results = self.vector_store.similarity_search_with_score(query, k=k)
        
        formatted_results = []
        for doc, score in results:
            metadata = doc.metadata
            formatted_results.append({
                "text": doc.page_content,
                "source": metadata["source"],
                "page": metadata["page_number"],
                "chunk_id": metadata["chunk_id"],
                "score": float(score)
            })
        
        return formatted_results

导入和依赖项:

import os
from langchain_community.vectorstores import Chroma
from langchain_community.embeddings import OllamaEmbeddings
  • Chroma:用于与 ChromaDB 向量存储交互。
  • OllamaEmbeddings:使用 nomic-embed-text 模型为查询生成嵌入。
  • os:用于目录操作(例如,如果不存在,则创建 chroma_db 目录)。

DocumentRetreiver 类的 __init__ 方法:

def __init__(self):
    self.embeddings = OllamaEmbeddings(model="nomic-embed-text")
    self.persist_dir = "chroma_db"
    
    # Create directory if it doesn't exist
    os.makedirs(self.persist_dir, exist_ok=True)
    
    # Initialize with empty collection if needed
    self.vector_store = Chroma(
        persist_directory=self.persist_dir,
        embedding_function=self.embeddings,
        collection_name="main_collection"  # Fixed collection name
    )
    
    # Workaround for Chroma's empty DB issue
    if not self.vector_store.get()['documents']:
        self.vector_store.add_texts(["Initial empty document"])
        self.vector_store.delete(ids=["0"])  # Remove placeholder
  • self.embeddings: 使用 nomic-embed-text 模型初始化 OllamaEmbeddings用于生成查询嵌入。
  • self.persist_dir:指定存储 ChromaDB 数据的目录 (chroma_db)。
  • os.makedirs:确保 chroma_db 目录存在,如有必要则创建该目录。
  • self.vector_store:使用指定的 persist_directory、embedding_function 和collection_name 初始化 ChromaDB 向量存储。
  • 查询空集合时,ChromaDB 可能会引发错误。为了避免这种情况,脚本会添加一个占位符文档并立即将其删除。这确保了 collection 已正确初始化。

query_documents 方法:

def query_documents(self, query: str, k: int = 5):
    results = self.vector_store.similarity_search_with_score(query, k=k)
    
    formatted_results = []
    for doc, score in results:
        metadata = doc.metadata
        formatted_results.append({
            "text": doc.page_content,
            "source": metadata["source"],
            "page": metadata["page_number"],
            "chunk_id": metadata["chunk_id"],
            "score": float(score)
        })
    
    return formatted_results
  • query:用户的查询字符串。
  • k:要检索的相关块数(默认值为 5)。
  • similarity_search_with_score:查询 ChromaDB 向量存储中与查询语义相似的文档。返回一个元组列表,其中每个元组包含一个 Document 对象及其相似度得分。
  • formatted_results:将原始结果处理为更方便用户的格式。每个结果包括:
    text:文档块的内容。source:块来自的 PDF 文件的名称。page:PDF 中的页码。chunk_id:块的唯一标识符。score:相似度得分。
  • 返回值:一个字典列表,每个字典代表一个检索到的文档块。

现在我们已经完成了与文档相关的任务,我们准备构建问答链,它将从数据库中检索相关文档并将其与我们的问题结合起来以获得答案和模型在创建该结果时的思维过程。我们将为此使用 qa.py 文件:

from retrieve import DocumentRetriever
import ollama
import regex as re

class QAPipeline:
    def __init__(self):
        self.retriever = DocumentRetriever()
        
    PROMPT_TEMPLATE = """Context information:
        {context}
        Using the context above and your general knowledge, answer this question:
        Question: {question}
        Format requirements:
        - If uncertain, say "The documents don't specify"""

    def parse_response(self, response: str) -> dict:
        """Extract thinking and answer components without <answer> tags"""
        # Extract thinking process
        think_match = re.search(r'<think>(.*?)</think>', response, re.DOTALL)
        
        # Get everything AFTER </think> as the answer
        answer_start = response.find('</think>') + len('</think>')
        answer = response[answer_start:].strip()
        
        return {
            "thinking": think_match.group(1).strip() if think_match else "",
            "answer": answer,
            "raw_response": response
        }

    def generate_answer(self, question: str, k: int = 5) -> dict:
        """Full QA workflow with enhanced output"""
        try:
            # Retrieve documents
            context_docs = self.retriever.query_documents(question, k=k)
            
            # Format context preserving full metadata
            context_str = "\n".join(
                f"[Document {idx+1}] {doc['source']} (Page {doc['page']}):\n{doc['text']}"
                for idx, doc in enumerate(context_docs)
            )
            
            # Generate response
            response = ollama.generate(
                model="deepseek-r1:latest",
                prompt=self.PROMPT_TEMPLATE.format(
                    context=context_str,
                    question=question
                )
            )
            
            # Parse components
            parsed = self.parse_response(response['response'])
            
            return {
                **parsed,
                "sources": [
                    {
                        "source": doc["source"],
                        "page": doc["page"],
                        "confidence": doc["score"],
                        "full_text": doc["text"]
                    } for doc in context_docs
                ]
            }
            
        except Exception as e:
            return {
                "error": str(e),
                "thinking": "",
                "answer": "Failed to generate response",
                "sources": []
            }

导入和依赖项:

from retrieve import DocumentRetriever
import ollama
import regex as re
  • DocumentRetriever:来自检索.py 的类,用于处理文档检索。
  • ollama:用于与本地 Ollama 服务器交互并使用 Deepseek R1 生成响应。
  • regex:用于解析模型的响应以提取思考过程和答案。

QAPipeline 类:

__init__ 方法:

def __init__(self):
    self.retriever = DocumentRetriever()
  • 初始化 DocumentRetriever 的实例以处理文档检索。
  • 这可确保问答管道可以访问向量存储并可以检索相关块。

PROMPT_TEMPLATE 变量:

PROMPT_TEMPLATE = """Context information:
    {context}

    Using the context above and your general knowledge, answer this question:
    Question: {question}

    Format requirements:

    - If uncertain, say "The documents don't specify"""

此模板定义发送给 Deepseek R1 的提示的结构。它包括:

  • {context}:格式化为上下文的检索到的文档块。
  • {question}:用户的查询。
  • 格式化说明:指示模型在上下文未提供足够信息时指示不确定性。

parse_response 方法:

def parse_response(self, response: str) -> dict:
    """Extract thinking and answer components without <answer> tags"""
    # Extract thinking process
    think_match = re.search(r'<think>(.*?)</think>', response, re.DOTALL)
    
    # Get everything AFTER </think> as the answer
    answer_start = response.find('</think>') + len('</think>')
    answer = response[answer_start:].strip()
    
    return {
        "thinking": think_match.group(1).strip() if think_match else "",
        "answer": answer,
        "raw_response": response
    }
  • 此方法解析 Deepseek R1 的原始响应,以提取思考过程和答案。
  • <think></think> 标签:
  • 思考过程包含在 <think> 标签中。
  • 答案是 </think> 标签后的所有内容。
  • 返回值是一个字典,包含:thinking:模型的推理过程;answer:最终答案;raw_response:用于调试的未解析响应。

generate_answer 方法:

def generate_answer(self, question: str, k: int = 5) -> dict:
    """Full QA workflow with enhanced output"""
    try:
        # Retrieve documents
        context_docs = self.retriever.query_documents(question, k=k)
        
        # Format context preserving full metadata
        context_str = "\n".join(
            f"[Document {idx+1}] {doc['source']} (Page {doc['page']}):\n{doc['text']}"
            for idx, doc in enumerate(context_docs)
        )
        
        # Generate response
        response = ollama.generate(
            model="deepseek-r1:latest",
            prompt=self.PROMPT_TEMPLATE.format(
                context=context_str,
                question=question
            )
        )
        
        # Parse components
        parsed = self.parse_response(response['response'])
        
        return {
            **parsed,
            "sources": [
                {
                    "source": doc["source"],
                    "page": doc["page"],
                    "confidence": doc["score"],
                    "full_text": doc["text"]
                } for doc in context_docs
            ]
        }
        
    except Exception as e:
        return {
            "error": str(e),
            "thinking": "",
            "answer": "Failed to generate response",
            "sources": []
        }
  • question:用户的查询。
  • k:要检索的文档块数(默认值为 5)。

工作流程:

  • 检索文档:调用 self.retriever.query_documents 检索相关块。
  • 格式化上下文:将检索到的块组合成单个字符串,保留元数据(来源、页码)。
  • 生成响应:通过 Ollama 将格式化的上下文和问题发送到 Deepseek R1。
  • 解析响应:使用 parse_response 提取思考过程和答案。
  • 返回结果:返回一个包含以下内容的字典:解析后的响应(思考、答案、原始响应);使用的来源(来源、页面、置信度、全文);如果发生错误,该方法将返回一个包含错误消息和空/默认值的字典。

通过构建问答链,我们的核心功能已经完成。现在,我们需要将所有这些逻辑封装到一个易于使用的 UI 中。我们将使用 app.py 来实现这一点:

import streamlit as st
from qa import QAPipeline
from ingest import process_documents
import os
import shutil
import random

def reset_database():
    """Clear all stored data and reset session state"""
    try:
        if 'qa_pipeline' in st.session_state:
            st.session_state.qa_pipeline.retriever.vector_store.delete_collection()
        
        if os.path.exists("documents"):
            shutil.rmtree("documents")
            os.makedirs("documents")
        
        st.session_state.qa_pipeline = QAPipeline()
        st.success("Database cleared successfully!")
    except Exception as e:
        st.error(f"Error clearing database: {str(e)}")

def main():
    st.set_page_config(page_title="DocuMind AI", layout="wide")
    
    # Initialize session state
    if 'qa_pipeline' not in st.session_state:
        st.session_state.qa_pipeline = QAPipeline()
    if 'ingested' not in st.session_state:
        st.session_state.ingested = False
    if 'history' not in st.session_state:
        st.session_state.history = []

    # Sidebar for document management
    with st.sidebar:
        st.header("Document Management")
        uploaded_files = st.file_uploader(
            "Upload PDF documents", 
            type=["pdf"],
            accept_multiple_files=True
        )
        
        col1, col2 = st.columns(2)
        with col1:
            if st.button("Ingest Documents"):
                if uploaded_files:
                    try:
                        reset_database()
                        os.makedirs("documents", exist_ok=True)
                        for file in uploaded_files:
                            with open(os.path.join("documents", file.name), "wb") as f:
                                f.write(file.getbuffer())
                        with st.spinner("Processing documents..."):
                            process_documents()
                            st.session_state.ingested = True
                            st.success(f"Ingested {len(uploaded_files)} documents!")
                    except Exception as e:
                        st.error(f"Error processing documents: {str(e)}")
                else:
                    st.warning("Please upload documents first")
        with col2:
            st.button("Clear Database", on_click=reset_database)

    # Main interface
    st.title("📄 Document AI Assistant")
    
    # Display chat history with Streamlit's default styling
    for qa in st.session_state.history:
        with st.expander(f"Q: {qa['question']}", expanded=False):
            # Thinking Process Section
            with st.container():
                st.markdown("#### 🧠 Thinking Process of the Assistant")
                st.markdown(qa['thinking'])
                st.markdown("---")
            
            # Final Answer Section
            with st.container():
                st.markdown("#### 📝 Final Answer")
                st.markdown(qa['answer'])
                st.markdown("---")
            
            # Source Documents Section
            if qa["sources"]:
                st.markdown("#### 🔍 Source Documents")
                for idx, source in enumerate(qa["sources"], 1):
                    with st.container():
                        st.write(f"📄 **{source['source']}** (Page {source['page']})")
                        st.text_area(
                            "Relevant text excerpt:",
                            value=source['full_text'],
                            key=f"source_{idx}_{random.randint(0, 999999)}",
                            disabled=True
                        )

    # Question input with processing spinner
    question = st.chat_input(
        "Ask a question about your documents:",
        disabled=not st.session_state.ingested
    )

    # Process question with visual feedback
    if question and st.session_state.ingested:
        # Add to history immediately
        st.session_state.history.append({
            "question": question,
            "thinking": "",
            "answer": "Processing...",
            "sources": []
        })
        
        try:
            with st.status("🧠 Processing your question...", expanded=True) as status:
                st.write("🔍 Retrieving relevant documents...")
                result = st.session_state.qa_pipeline.generate_answer(question)
                status.update(label="✅ Processing complete", state="complete")
            
            # Update history with results
            st.session_state.history[-1] = {
                "question": question,
                "thinking": result.get('thinking', ''),
                "answer": result.get('answer', 'No answer generated'),
                "sources": result.get('sources', [])
            }
            
            st.rerun()
            
        except Exception as e:
            st.error(f"Error generating answer: {str(e)}")
            st.session_state.history.pop()

if __name__ == "__main__":
    main()

导入和依赖项:

import streamlit as st
from qa import QAPipeline
from ingest import process_documents
import os
import shutil
import random
  • streamlit:用于构建 Web 界面。
  • QAPipeline:来自 qa.py 的类,用于处理问答工作流。
  • process_documents:来自 ingest.py 的函数,用于处理和提取上传的文档。
  • os 和shutil:用于文件和目录操作(例如,创建目录、删除文件)。
  • random:用于为 Streamlit 组件生成唯一密钥。

reset_database 函数:

def reset_database():
    """Clear all stored data and reset session state"""
    try:
        if 'qa_pipeline' in st.session_state:
            st.session_state.qa_pipeline.retriever.vector_store.delete_collection()
        
        if os.path.exists("documents"):
            shutil.rmtree("documents")
            os.makedirs("documents")
        
        st.session_state.qa_pipeline = QAPipeline()
        st.success("Database cleared successfully!")
    except Exception as e:
        st.error(f"Error clearing database: {str(e)}")
  • 此函数通过以下方式重置系统:
  • 删除 ChromaDB 集合。
  • 删除文档目录并重新创建。
  • 在会话状态中重新初始化 QAPipeline。

初始化会话状态:

if 'qa_pipeline' not in st.session_state:
    st.session_state.qa_pipeline = QAPipeline()
if 'ingested' not in st.session_state:
    st.session_state.ingested = False
if 'history' not in st.session_state:
    st.session_state.history = []
  • st.session_state:用于在应用程序重新运行期间保留数据。
  • qa_pipeline:存储 QAPipeline 实例。
  • ingested:跟踪文档是否已被摄取。
  • history:存储问题和答案的历史记录。

列管理的侧边栏:

with st.sidebar:
    st.header("Document Management")
    uploaded_files = st.file_uploader(
        "Upload PDF documents", 
        type=["pdf"],
        accept_multiple_files=True
    )
    
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Ingest Documents"):
            if uploaded_files:
                try:
                    reset_database()
                    os.makedirs("documents", exist_ok=True)
                    for file in uploaded_files:
                        with open(os.path.join("documents", file.name), "wb") as f:
                            f.write(file.getbuffer())
                    with st.spinner("Processing documents..."):
                        process_documents()
                        st.session_state.ingested = True
                        st.success(f"Ingested {len(uploaded_files)} documents!")
                except Exception as e:
                    st.error(f"Error processing documents: {str(e)}")
            else:
                st.warning("Please upload documents first")
    with col2:
        st.button("Clear Database", on_click=reset_database)
  • st.sidebar:创建用于文档管理的侧边栏。
  • st.file_uploader:允许用户上传多个 PDF 文件。
  • 提取文档按钮:重置数据库;将上传的文件保存到文档目录;调用 process_documents 将文件提取到 ChromaDB;更新会话状态中的提取标志。
  • 清除数据库按钮:调用 reset_database 清除所有存储的数据。

显示聊天记录:

for qa in st.session_state.history:
    with st.expander(f"Q: {qa['question']}", expanded=False):
        # Thinking Process Section
        with st.container():
            st.markdown("#### 🧠 Thinking Process of the Assistant")
            st.markdown(qa['thinking'])
            st.markdown("---")
        
        # Final Answer Section
        with st.container():
            st.markdown("#### 📝 Final Answer")
            st.markdown(qa['answer'])
            st.markdown("---")
        
        # Source Documents Section
        if qa["sources"]:
            st.markdown("#### 🔍 Source Documents")
            for idx, source in enumerate(qa["sources"], 1):
                with st.container():
                    st.write(f"📄 **{source['source']}** (Page {source['page']})")
                    st.text_area(
                        "Relevant text excerpt:",
                        value=source['full_text'],
                        key=f"source_{idx}_{random.randint(0, 999999)}",
                        disabled=True
                    )
  • 使用 st.expander 显示每个问答对的聊天记录。
  • 思考过程:显示模型的推理过程。
  • 最终答案:显示生成的答案。
  • 源文档:列出使用的源文档,包括源文件、页码和相关文本摘录。

问题输入:

question = st.chat_input(
    "Ask a question about your documents:",
    disabled=not st.session_state.ingested
)
  • st.chat_input:为用户提供文本输入以提问。
  • 禁用状态:输入被禁用,直到文档被提取。

处理问题:

if question and st.session_state.ingested:
    # Add to history immediately
    st.session_state.history.append({
        "question": question,
        "thinking": "",
        "answer": "Processing...",
        "sources": []
    })
    
    try:
        with st.status("🧠 Processing your question...", expanded=True) as status:
            st.write("🔍 Retrieving relevant documents...")
            result = st.session_state.qa_pipeline.generate_answer(question)
            status.update(label="✅ Processing complete", state="complete")
        
        # Update history with results
        st.session_state.history[-1] = {
            "question": question,
            "thinking": result.get('thinking', ''),
            "answer": result.get('answer', 'No answer generated'),
            "sources": result.get('sources', [])
        }
        
        st.rerun()
        
    except Exception as e:
        st.error(f"Error generating answer: {str(e)}")
        st.session_state.history.pop()
  • 即时反馈:使用“正在处理…”占位符将问题添加到历史记录中。
  • 处理状态:使用 st.status 显示问答工作流程的进度;处理完成后更新状态。
  • 更新历史记录:用实际结果替换占位符。

就是这样。多么漫长的旅程!

2、未来的增强功能

虽然该项目已经提供了一个功能强大、注重隐私的问答机器人,但总有改进的空间。以下是一些可能将其提升到新水平的潜在增强功能和功能:

支持更多文档格式

目前,该系统支持 PDF 文件,但扩展到其他格式(如 Word 文档、Markdown 甚至纯文本文件)将使其更加通用。这将允许用户处理更广泛的文档,而无需先转换它们。

与其他本地 LLM 集成

Deepseek R1 是一个很棒的推理模型,但集成对其他本地 LLM(例如 LLaMA、Mistral 或 Falcon)的支持将为用户提供更大的灵活性。这可以通过模块化架构来实现,允许用户根据自己的需求选择自己喜欢的模型。

增强的 UI 功能

Streamlit 界面已经很用户友好了,但其他功能可以改善体验:

  • 可搜索的历史记录:允许用户搜索过去的问题和答案。
  • 可自定义的主题:添加对明暗模式或自定义主题的支持。
  • 导出功能:使用户能够导出答案、来源或整个聊天记录。

3、高级检索技术

改进检索过程可以提高答案的质量:

  • 混合搜索:结合基于关键字和语义的搜索以获得更好的结果。
  • 重新排序:使用较小的模型对检索到的块进行重新排序以提高相关性。

4、可扩展性改进

随着系统的发展,可扩展性将变得越来越重要。潜在的改进包括:

  • 高效索引:优化 ChromaDB 或探索替代矢量数据库,以便更快地检索大型数据集。
  • 批处理:允许批量提取文档以更有效地处理大型集合。
  • 分布式处理:探索分布式计算框架(例如 Ray 或 Dask)以处理更大的工作负载。

5、结束语

使用 Deepseek R1、Ollama、ChromaDB 和 Streamlith 构建本地、以隐私为中心的问答机器人是一段令人兴奋的旅程。该项目展示了如何结合现代 AI 工具来创建功能强大、安全且用户友好的应用程序,这些应用程序在提供基于推理的智能答案的同时尊重用户隐私。

通过利用 Deepseek R1 的高级推理能力,我们构建了一个不仅可以检索信息,还可以思考信息,提供富有洞察力和情境感知的响应的系统。使用 Ollama 可确保所有处理都在本地进行,从而保证数据的安全和私密。同时,ChromaDB 可实现高效的文档存储和检索,Streamlit 可提供简洁直观的界面来与机器人交互。

在本文中,我们介绍了整个过程——从文档提取和检索到问答和 UI 集成。我们还探讨了推理模型在问答应用程序中的重要性,并强调了该项目如何通过支持更多文档格式、与其他本地 LLM 集成以及可扩展性改进等功能进行扩展。

欢迎查看 GitHub 存储库以获取完整代码并尝试构建你自己的版本。


原文链接:Thinking Locally, Acting Privately: Building a Reasoning-Powered Q&A App with Deepseek R1 using Ollama, Streamlit and RAG

汇智网翻译整理,转载请标明出处