医学文本处理多智能体系统

在快速发展的人工智能领域,多智能体(multi-agent)系统因其通过协作处理复杂任务的能力而受到关注。本文探讨了用于处理医学文本的多智能体人工智能应用程序的架构、工作流程和未来范围。该系统使用 Streamlit 作为前端,并通过 Ollama 利用 Llama-3.2:3b 模型构建,包括用于总结医学文本、撰写研究文章和编辑受保护健康信息 (PHI) 的代理。

1、工作流架构

此应用程序的架构是模块化的,由几个关键组件组成:

  • 前端(Streamlit):为用户交互提供直观的 Web 界面。用户可以选择任务、输入数据和查看结果。
  • 代理管理器:充当中央协调器,将任务委托给适当的主要代理及其相应的验证器代理。

主要代理:

  • 总结代理:生成医学文本摘要。
  • 撰写文章代理:创建研究文章的草稿。
  • 编辑数据代理:屏蔽医疗数据中的 PHI。

验证器代理:

  • 总结验证器代理:评估总结的质量。
  • 精炼器代理:增强草稿以提高质量。
  • 编辑 PHI 验证器代理:确保所有 PHI 都已正确屏蔽。
  • 记录器:记录所有交互、输入、输出和错误,以便进行监控和调试。

2、用于实现的技术栈

该应用程序使用多种现代技术构建,可确保效率、可扩展性和易用性:

  • Streamlit:一个功能强大且易于使用的框架,用于用 Python 构建 Web 应用程序。它为用户提供了一个直观的界面来与 AI 系统交互,即使是没有技术专业知识的人也可以使用它。
  • Ollama:一个促进使用大型语言模型(如 Llama、Mistral、Gemma)的平台等等。它允许将 AI 模型无缝集成到应用程序中,从而实现复杂的自然语言处理任务。
  • Python:用于开发应用程序的主要编程语言。Python 丰富的库和框架生态系统使其成为 AI 和 Web 开发的理想选择。
  • Asyncio:用于使用 async/await 语法编写并发代码的 Python 库。它有助于有效地管理异步任务,这对于同时处理多个 AI 代理至关重要。

3、工作流程

应用程序的工作流程如下:

  • 用户交互:用户通过 Streamlit 界面与系统交互,选择任务并提供输入数据。
  • 任务委派:代理管理器接收任务请求并将其委派给适当的主代理。
  • 处理:主代理使用 LLaMA 模型处理输入数据以生成所需的输出。
  • 验证:然后将输出传递给相应的验证器代理,该代理评估结果的质量和准确性。提供 5 分制的验证分数以便快速评估。
  • 结果显示:最终验证的内容显示给用户,并提供导出结果或返回主页的选项。

4、代码实现

文件夹结构:

medical_ai_agents/
├── agents/
│ ├── __init__.py
│ ├── base_agent.py
│ ├── main_agents.py
│ └── validator_agents.py
├── core/
│ ├── __init__.py
│ ├── agent_manager.py
│ └── logger.py
├── utils/
│ ├── __init__.py
│ └── ollama_utils.py
├── app.py
└── requirements.txt

安装所需的依赖项:

pip install -r requirements.txt

4.1 base_agent.py

# base_agent.py

from abc import ABC, abstractmethod
from typing import Any, Dict
import ollama

class BaseAgent(ABC):
    def __init__(self, model_name: str = "llama3.2:3b"):
        self.model_name = model_name
        
    async def get_completion(self, prompt: str) -> str:
        try:
            response = ollama.chat(model=self.model_name, messages=[
                {'role': 'user', 'content': prompt}
            ])
            return response['message']['content']
        except Exception as e:
            raise Exception(f"Error getting completion: {str(e)}")

class MainAgent(BaseAgent):
    @abstractmethod
    async def process(self, input_data: Any) -> Dict[str, Any]:
        pass

class ValidatorAgent(BaseAgent):
    @abstractmethod
    async def validate(self, input_data: Any, output_data: Any) -> Dict[str, bool]:
        pass 

4.2 main_agent.py

from typing import Any, Dict
from .base_agent import MainAgent

class SummarizeAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        prompt = f"Summarize the following medical text:\n\n{input_data}"
        summary = await self.get_completion(prompt)
        return {"summary": summary}

class WriteArticleAgent(MainAgent):
    async def process(self, input_data: Dict[str, str]) -> Dict[str, Any]:
        prompt = f"""Write a research article with the following:
        Topic: {input_data['topic']}
        Key points: {input_data['key_points']}"""
        article = await self.get_completion(prompt)
        return {"article": article}

class SanitizeDataAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        prompt = """Mask all Protected Health Information (PHI) in the following text. 
        Replace with appropriate masks:
        - Patient names with [PATIENT_NAME]
        - Doctor/Provider names with [PROVIDER_NAME]
        - Dates with [DATE]
        - Locations/Addresses with [LOCATION]
        - Phone numbers with [PHONE]
        - Email addresses with [EMAIL]
        - Medical record numbers with [MRN]
        - Social Security numbers with [SSN]
        - Device identifiers with [DEVICE_ID]
        - Any other identifying numbers with [ID]
        - Physical health conditions with [HEALTH_CONDITION]
        - Medications with [MEDICATION]
        - Lab results with [LAB_RESULT]
        - Vital signs with [VITAL_SIGN]
        - Procedures with [PROCEDURE]

        Text to mask:\n\n""" + input_data
        sanitized_data = await self.get_completion(prompt)
        return {"sanitized_data": sanitized_data} 

4.3 validator_agent.py

from typing import Any, Dict
from .base_agent import ValidatorAgent

class SummarizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Evaluate if this summary accurately represents the original text:
        Original: {input_data}
        Summary: {output_data['summary']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}

class RefinerAgent(ValidatorAgent):
    async def validate(self, input_data: Dict[str, str], output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Review this research article for quality and accuracy:
        Article: {output_data['article']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}

class SanitizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Verify if all Protected Health Information (PHI) has been properly masked in this text:
        Masked text: {output_data['sanitized_data']}
        
        Check for any unmasked:
        - Patient names
        - Doctor/Provider names
        - Dates
        - Locations/Addresses
        - Phone numbers
        - Email addresses
        - Medical record numbers
        - Social Security numbers
        - Device identifiers
        - Other identifying numbers
        - Physical health conditions
        - Medications
        - Lab results
        - Vital signs
        - Procedures
        
        Provide:
        1. A score out of 5 (where 5 means all PHI properly masked)
        2. 'valid' or 'invalid'
        3. List any found unmasked PHI
        
        Format: Score: X/5\nStatus: valid/invalid\nFindings: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result} 

4.4 agent_manager.py

from typing import Dict, Any
from agents.main_agents import SummarizeAgent, WriteArticleAgent, SanitizeDataAgent
from agents.validator_agents import SummarizeValidatorAgent, RefinerAgent, SanitizeValidatorAgent
from core.logger import Logger

class AgentManager:
    def __init__(self):
        self.logger = Logger()
        
        # Initialize main agents
        self.summarize_agent = SummarizeAgent()
        self.write_article_agent = WriteArticleAgent()
        self.sanitize_agent = SanitizeDataAgent()
        
        # Initialize validator agents
        self.summarize_validator = SummarizeValidatorAgent()
        self.refiner_agent = RefinerAgent()
        self.sanitize_validator = SanitizeValidatorAgent()

    async def process_task(self, task_type: str, input_data: Any) -> Dict[str, Any]:
        try:
            self.logger.log_input(task_type, input_data)
            
            if task_type == "summarize":
                result = await self.summarize_agent.process(input_data)
                validation = await self.summarize_validator.validate(input_data, result)
            
            elif task_type == "write_article":
                result = await self.write_article_agent.process(input_data)
                validation = await self.refiner_agent.validate(input_data, result)
            
            elif task_type == "sanitize":
                result = await self.sanitize_agent.process(input_data)
                validation = await self.sanitize_validator.validate(input_data, result)
            else:
                raise ValueError(f"Unknown task type: {task_type}")

            self.logger.log_output(task_type, result, validation)
            return {"result": result, "validation": validation}

        except Exception as e:
            self.logger.log_error(task_type, str(e))
            raise 

4.5 logger.py

import logging
from datetime import datetime
from typing import Any, Dict

class Logger:
    def __init__(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('medical_ai_agents.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_input(self, task_type: str, input_data: Any):
        self.logger.info(f"Task: {task_type} - Input received at {datetime.now()}")

    def log_output(self, task_type: str, result: Dict[str, Any], validation: Dict[str, bool]):
        self.logger.info(f"Task: {task_type} - Output generated at {datetime.now()}")
        self.logger.info(f"Validation result: {validation['is_valid']}")

    def log_error(self, task_type: str, error_message: str):
        self.logger.error(f"Task: {task_type} - Error: {error_message}") 

4.6 app.py

import streamlit as st
import asyncio
from core.agent_manager import AgentManager

# Set page configuration with custom theme
st.set_page_config(
    page_title="Medical AI Agents",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Custom CSS for styling
st.markdown("""
    <style>
    .main-header {
        font-size: 2.5rem;
        color: white;
        text-align: center;
        padding: 1.5rem;
        margin-bottom: 1rem;
        font-weight: bold;
        background: linear-gradient(120deg, #1E88E5 0%, #1565C0 100%);
        border-radius: 10px;
        box-shadow: 0 2px 10px rgba(30,136,229,0.2);
    }
    .sub-header {
        font-size: 1.8rem;
        color: #0D47A1;
        padding: 0.5rem 0;
        border-bottom: 2px solid #1E88E5;
        margin-bottom: 1rem;
    }
    .task-container {
        background-color: #F8F9FA;
        padding: 2rem;
        border-radius: 10px;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
    }
    .result-box {
        background-color: white;
        padding: 1.5rem;
        border-radius: 8px;
        border-left: 4px solid #1E88E5;
        margin: 1rem 0;
    }
    .validation-box {
        padding: 1rem;
        border-radius: 8px;
        margin-top: 1rem;
    }
    .stButton>button {
        background-color: #1E88E5;
        color: white;
        border-radius: 25px;
        padding: 0.5rem 2rem;
        border: none;
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        transition: all 0.3s ease;
    }
    .stButton>button:hover {
        background-color: #1565C0;
        box-shadow: 0 4px 8px rgba(0,0,0,0.2);
        transform: translateY(-2px);
    }
    .stTextArea>div>div {
        border-radius: 8px;
        border: 2px solid #E3F2FD;
    }
    .sidebar-content {
        padding: 1rem;
        background-color: #F8F9FA;
        border-radius: 8px;
    }
    </style>
    """, unsafe_allow_html=True)

@st.cache_resource
def get_agent_manager():
    return AgentManager()

def show_results_page(result_data):
    st.markdown("<h1 class='main-header'>Final Validated Content</h1>", unsafe_allow_html=True)
    
    # Add a subheader based on the content type
    if "summary" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Medical Text Summary</h2>", unsafe_allow_html=True)
    elif "article" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Research Article</h2>", unsafe_allow_html=True)
    elif "sanitized_data" in result_data["result"]:
        st.markdown("<h2 class='sub-header'>Redacted PHI Content</h2>", unsafe_allow_html=True)
    
    # Display content in a styled box
    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
    if "summary" in result_data["result"]:
        st.write(result_data["result"]["summary"])
    elif "article" in result_data["result"]:
        st.write(result_data["result"]["article"])
    elif "sanitized_data" in result_data["result"]:
        st.write(result_data["result"]["sanitized_data"])
    st.markdown("</div>", unsafe_allow_html=True)
    
    # Action buttons in columns
    col1, col2, col3 = st.columns([1, 1, 1])
    with col2:
        # Export button
        if st.button("📥 Export Results"):
            export_data = ""
            if "summary" in result_data["result"]:
                export_data = result_data["result"]["summary"]
            elif "article" in result_data["result"]:
                export_data = result_data["result"]["article"]
            elif "sanitized_data" in result_data["result"]:
                export_data = result_data["result"]["sanitized_data"]
                
            st.download_button(
                label="💾 Download Content",
                data=export_data,
                file_name="final_content.txt",
                mime="text/plain"
            )
    
    with col3:
        # Return button
        if st.button("🏠 Return to Main Page"):
            st.session_state.show_results = False
            st.rerun()

def main():
    # Sidebar styling
    with st.sidebar:
        st.markdown("<h2 style='text-align: center; color: #1E88E5;'>Tasks</h2>", unsafe_allow_html=True)
        st.markdown("<div class='sidebar-content'>", unsafe_allow_html=True)
        task_type = st.radio(
            "",  # Empty label as we're using custom header
            ["summarize", "write_article", "Redact PHI"],
            format_func=lambda x: {
                "summarize": "📝 Summarize Medical Text",
                "write_article": "📚 Write Research Article",
                "Redact PHI": "🔒 Redact PHI"
            }[x]
        )
        st.markdown("</div>", unsafe_allow_html=True)
    
    # Main content - Single header for the entire page
    st.markdown("<h1 class='main-header'>Medical Multi-Agent System</h1>", unsafe_allow_html=True)
    
    # Initialize session state
    if 'show_results' not in st.session_state:
        st.session_state.show_results = False
    if 'result_data' not in st.session_state:
        st.session_state.result_data = None
    
    if st.session_state.show_results:
        show_results_page(st.session_state.result_data)
        return
    
    agent_manager = get_agent_manager()
    
    # Task containers with consistent styling
    st.markdown("<div class='task-container'>", unsafe_allow_html=True)
    
    if task_type == "summarize":
        st.markdown("<h2 class='sub-header'>📝 Summarize Medical Text</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to summarize", height=200)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("🔄 Generate Summary"):
                with st.spinner("Processing..."):
                    result = asyncio.run(agent_manager.process_task("summarize", input_text))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Summary")
                    st.write(result["result"]["summary"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    elif task_type == "write_article":
        st.markdown("<h2 class='sub-header'>📚 Write Research Article</h2>", unsafe_allow_html=True)
        topic = st.text_input("Enter research topic")
        key_points = st.text_area("Enter key points (one per line)", height=150)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("📝 Generate Article"):
                with st.spinner("Processing..."):
                    input_data = {"topic": topic, "key_points": key_points}
                    result = asyncio.run(agent_manager.process_task("write_article", input_data))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Article")
                    st.write(result["result"]["article"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    elif task_type == "Redact PHI":
        st.markdown("<h2 class='sub-header'>🔒 Redact Protected Health Information (PHI)</h2>", unsafe_allow_html=True)
        input_text = st.text_area("Enter medical text to redact PHI", height=200)
        col1, col2 = st.columns(2)
        
        with col1:
            if st.button("🔐 Redact PHI"):
                with st.spinner("Processing..."):
                    result = asyncio.run(agent_manager.process_task("sanitize", input_text))
                    st.session_state.result_data = result
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)
                    st.subheader("Redacted Text")
                    st.write(result["result"]["sanitized_data"])
                    st.markdown("</div>", unsafe_allow_html=True)
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)
                    st.subheader("Validation")
                    
                    # Extract and display score
                    feedback = result["validation"]["feedback"]
                    if "Score:" in feedback:
                        score = feedback.split("Score:")[1].split("\n")[0].strip()
                        st.markdown(f"""
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>Validation Score: {score}</h3>
                            </div>
                        """, unsafe_allow_html=True)
                    
                    st.write(feedback)
                    st.markdown("</div>", unsafe_allow_html=True)
        
        with col2:
            if st.session_state.result_data and st.button("👁️ View Edited Content"):
                st.session_state.show_results = True
                st.rerun()
    
    st.markdown("</div>", unsafe_allow_html=True)

if __name__ == "__main__":
    main() 

5、运行

运行 Streamlit 应用程序:

streamlit run app.py

streamlit 屏幕已实例化

你现在可以在浏览器中查看 Streamlit 应用程序。本地 URL:http://localhost:8501

摘要生成页面:

摘要生成的验证结果页面:

查看已验证内容:

点击导出结果下载结果:

撰写研究文章任务:

生成的文章:

验证结果:

查看最终编辑内容:

编辑 PHI 任务

编辑文本

编辑文本的验证分数

查看最终编辑内容

6、未来改进空间

虽然当前系统非常强大,但未来仍有几个方面需要改进:

  • 增强验证指标:结合更详细的验证指标和反馈机制可以提高输出的可靠性。
  • 其他代理:扩展系统以包含更多专业代理,例如用于数据可视化或预测分析的代理,可以扩大其适用性。
  • 用户反馈循环:实施反馈循环,用户可以提供有关输出质量的输入,这有助于随着时间的推移完善模型。
  • 可扩展性:随着应用程序的增长,优化系统的可扩展性以处理更大的数据集和更多并发用户将是有益的。
  • 与其他工具集成:与其他医疗数据处理工具或数据库集成可以增强系统在实际应用中的实用性。

7、结束语

这个多代理 AI 系统展示了协作代理在处理复杂医疗文本方面的强大功能。通过利用 LLaMA 等先进模型并通过 Streamlit 提供用户友好的界面,该应用程序为摘要、文章撰写和 PHI 编辑等任务提供了实用的解决方案。随着人工智能的不断发展,这样的系统将在改变我们处理和解释医疗数据的方式方面发挥关键作用。


原文链接:Building a Multi-Agent AI System from scratch for Medical Text Processing

汇智网翻译整理,转载请标明出处