从零构建医学多智能体AI

本文介绍使用Streamlit作为前端,并利用Ollama平台上的Llama-3.2:3b模型,实现用于总结医学文本、撰写研究文章以及屏蔽受保护健康信息(PHI)的多智能体系统。

从零构建医学多智能体AI

在快速发展的AI领域,多智能体系统因其通过协作处理复杂任务的能力而越来越受欢迎。本文探讨了为处理医学文本而设计的多智能体AI应用的架构、工作流程和未来前景。该系统使用Streamlit作为前端,并利用Ollama平台上的Llama-3.2:3b模型,包括用于总结医学文本、撰写研究文章以及屏蔽受保护健康信息(PHI)的智能体。

1、从零构建 vs. 使用框架

在决定从零开始构建多智能体系统还是使用现有的智能体框架(如CrewAI、Autogen或OpenAI Swarm)时,重要的是权衡每种方法的优点和权衡。以下是自定义构建系统与使用框架的优缺点对比:

从零开始构建多智能体系统的优点:

定制化和灵活性:

  • 量身定制:从零开始构建允许创建高度定制化的解决方案,以满足特定需求和要求。您对架构、设计和实现细节拥有完全控制权。
  • 灵活性:您可以实现现有框架可能不支持的独特功能和功能。

深入理解:

  • 深入了解:从头开发系统可以深入了解底层机制和架构,这有助于故障排除和优化。
  • 学习机会:这对开发人员来说是一个很好的学习经验,可以提高他们在系统设计、架构和AI方面的技能。

不依赖外部框架:

  • 独立性:您不会受到第三方框架限制或更新的影响,这些框架有时可能会引入破坏性更改或废弃功能。
  • 安全性和隐私:您对数据处理和安全措施拥有完全控制权,这对于像医学数据处理这样的敏感应用至关重要。

性能优化:

  • 性能调优:您可以根据特定的性能指标(如速度或资源使用情况)优化系统,而无需承受通用框架可能带来的额外开销。

成本效益:

  • 无许可费用:避免使用商业框架可能产生的潜在成本,这些框架可能会对高级功能或企业支持收费。
使用智能体框架(CrewAI、Autogen、OpenAI Swarm)的优点:

快速开发:

  • 节省时间:框架提供了预先构建的组件和功能,大大减少了开发时间。
  • 经过验证的解决方案:利用经过测试和优化的社区或公司提供的解决方案。

可扩展性:

  • 内置可扩展性:许多框架旨在解决可扩展性问题,使系统可以根据需要轻松扩展。
  • 云集成:通常带有内置的云服务支持,便于部署和扩展。

社区和支持:

  • 活跃的社区:访问开发人员和用户社区,他们可以提供支持、分享最佳实践并贡献框架的发展。
  • 文档和教程:全面的文档和教程可以帮助加速学习曲线。

高级功能:

  • 最先进的功能:框架通常包括开箱即用的自然语言处理、机器学习集成和多智能体协调等高级功能。
  • 持续更新:从框架维护者那里受益于持续改进和更新。

互操作性:

  • 与其他工具集成:框架通常提供与其他工具和服务的简单集成,增强系统的功能。

2、工作流架构

该应用程序的架构是模块化的,包含几个关键组件:

  • 前端(Streamlit):提供一个直观的Web界面供用户交互。用户可以选择任务、输入数据并查看结果。
  • 代理管理器:充当中央协调器,将任务委派给适当的主代理及其相应的验证代理。

主代理:

  • 摘要代理:生成医学文本的摘要。
  • 写文章代理:创建研究文章草稿。
  • 脱敏数据代理:屏蔽医学数据中的PHI。

验证代理:

  • 摘要验证代理:评估摘要的质量。
  • 精炼代理:提高草稿质量。
  • 脱敏PHI验证代理:确保所有PHI都已正确屏蔽。
  • 记录器:记录所有交互、输入、输出和错误以进行监控和调试。

3、技术栈

该应用程序使用了一组现代技术,以确保效率、可扩展性和易用性:

  • Streamlit: 一种强大的、易于使用的Python框架,用于构建Web应用程序。它为用户提供了一个直观的界面来与AI系统交互,即使是没有技术背景的人也能轻松使用。
  • Ollama: 一个平台,便于使用大型语言模型(如Llama、Mistral、Gemma等)。它允许无缝地将AI模型集成到应用程序中,实现复杂的自然语言处理任务。
  • Python: 开发应用程序的主要编程语言。Python丰富的库和框架生态系统使其成为AI和Web开发的理想选择。
  • Asyncio: Python库,用于使用async/await语法编写并发代码。它有助于高效地管理异步任务,这对于同时处理多个AI代理至关重要。

4、工作流程

应用程序的工作流程如下:

  • 用户交互:用户通过Streamlit界面与系统交互,选择任务并提供输入数据。
  • 任务委派:代理管理器接收任务请求并将任务委派给适当的主代理。
  • 处理:主代理使用LLaMA模型处理输入数据以生成所需输出。
  • 验证:输出然后传递给相应的验证代理,该代理评估结果的质量和准确性。提供一个五分制的验证分数以便快速评估。
  • 结果显示:最终验证的内容显示给用户,用户可以选择导出结果或返回主页。

5、代码实现

文件夹结构

medical_ai_agents/
├── agents/
│ ├── __init__.py
│ ├── base_agent.py
│ ├── main_agents.py
│ └── validator_agents.py
├── core/
│ ├── __init__.py
│ ├── agent_manager.py
│ └── logger.py
├── utils/
│ ├── __init__.py
│ └── ollama_utils.py
├── app.py
└── requirements.txt

安装所需依赖

pip install -r requirements.txt

创建代理。base_agent.py

from abc import ABC, abstractmethod  
from typing import Any, Dict  
import ollama  
  
class BaseAgent(ABC):  
    def __init__(self, model_name: str = "llama3.2:3b"):  
        self.model_name = model_name  
          
    async def get_completion(self, prompt: str) -> str:  
        try:  
            response = ollama.chat(model=self.model_name, messages=[  
                {'role': 'user', 'content': prompt}  
            ])  
            return response['message']['content']  
        except Exception as e:  
            raise Exception(f"Error getting completion: {str(e)}")  
  
class MainAgent(BaseAgent):  
    @abstractmethod  
    async def process(self, input_data: Any) -> Dict[str, Any]:  
        pass  
  
class ValidatorAgent(BaseAgent):  
    @abstractmethod  
    async def validate(self, input_data: Any, output_data: Any) -> Dict[str, bool]:  
        pass 

main_agent.py

from typing import Any, Dict  
from .base_agent import MainAgent  
  
class SummarizeAgent(MainAgent):  
    async def process(self, input_data: str) -> Dict[str, Any]:  
        prompt = f"Summarize the following medical text:\n\n{input_data}"  
        summary = await self.get_completion(prompt)  
        return {"summary": summary}  
  
class WriteArticleAgent(MainAgent):
    async def process(self, input_data: Dict[str, str]) -> Dict[str, Any]:
        prompt = f"""Write a research article with the following:
        Topic: {input_data['topic']}
        Key points: {input_data['key_points']}"""
        article = await self.get_completion(prompt)
        return {"article": article}

class SanitizeDataAgent(MainAgent):
    async def process(self, input_data: str) -> Dict[str, Any]:
        prompt = """Mask all Protected Health Information (PHI) in the following text. 
        Replace with appropriate masks:
        - Patient names with [PATIENT_NAME]
        - Doctor/Provider names with [PROVIDER_NAME]
        - Dates with [DATE]
        - Locations/Addresses with [LOCATION]
        - Phone numbers with [PHONE]
        - Email addresses with [EMAIL]
        - Medical record numbers with [MRN]
        - Social Security numbers with [SSN]
        - Device identifiers with [DEVICE_ID]
        - Any other identifying numbers with [ID]
        - Physical health conditions with [HEALTH_CONDITION]
        - Medications with [MEDICATION]
        - Lab results with [LAB_RESULT]
        - Vital signs with [VITAL_SIGN]
        - Procedures with [PROCEDURE]

        Text to mask:\n\n""" + input_data
        sanitized_data = await self.get_completion(prompt)
        return {"sanitized_data": sanitized_data} 

validator_agent.py

from typing import Any, Dict
from .base_agent import ValidatorAgent

class SummarizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Evaluate if this summary accurately represents the original text:
        Original: {input_data}
        Summary: {output_data['summary']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}

class RefinerAgent(ValidatorAgent):
    async def validate(self, input_data: Dict[str, str], output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Review this research article for quality and accuracy:
        Article: {output_data['article']}
        
        Provide:
        1. A score out of 5 (where 5 is perfect)
        2. 'valid' or 'invalid'
        3. Brief explanation
        
        Format: Score: X/5\nStatus: valid/invalid\nExplanation: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result}

class SanitizeValidatorAgent(ValidatorAgent):
    async def validate(self, input_data: str, output_data: Dict[str, Any]) -> Dict[str, bool]:
        prompt = f"""Verify if all Protected Health Information (PHI) has been properly masked in this text:
        Masked text: {output_data['sanitized_data']}
        
        Check for any unmasked:
        - Patient names
        - Doctor/Provider names
        - Dates
        - Locations/Addresses
        - Phone numbers
        - Email addresses
        - Medical record numbers
        - Social Security numbers
        - Device identifiers
        - Other identifying numbers
        - Physical health conditions
        - Medications
        - Lab results
        - Vital signs
        - Procedures
        
        Provide:
        1. A score out of 5 (where 5 means all PHI properly masked)
        2. 'valid' or 'invalid'
        3. List any found unmasked PHI
        
        Format: Score: X/5\nStatus: valid/invalid\nFindings: ..."""
        
        result = await self.get_completion(prompt)
        is_valid = "valid" in result.lower()
        return {"is_valid": is_valid, "feedback": result} 

核心功能

agent_manager.py

from typing import Dict, Any  
from agents.main_agents import SummarizeAgent, WriteArticleAgent, SanitizeDataAgent  
from agents.validator_agents import SummarizeValidatorAgent, RefinerAgent, SanitizeValidatorAgent  
from core.logger import Logger  
  
class AgentManager:  
    def __init__(self):  
        self.logger = Logger()  
          
        # 初始化主要代理  
        self.summarize_agent = SummarizeAgent()  
        self.write_article_agent = WriteArticleAgent()  
        self.sanitize_agent = SanitizeDataAgent()  
          
        # 初始化验证代理  
        self.summarize_validator = SummarizeValidatorAgent()  
        self.refiner_agent = RefinerAgent()  
        self.sanitize_validator = SanitizeValidatorAgent()  
  
    async def process_task(self, task_type: str, input_data: Any) -> Dict[str, Any]:  
        try:  
            self.logger.log_input(task_type, input_data)  
              
            if task_type == "summarize":  
                result = await self.summarize_agent.process(input_data)  
                validation = await self.summarize_validator.validate(input_data, result)  
              
            elif task_type == "write_article":  
                result = await self.write_article_agent.process(input_data)  
                validation = await self.refiner_agent.validate(input_data, result)  
              
            elif task_type == "sanitize":  
                result = await self.sanitize_agent.process(input_data)  
                validation = await self.sanitize_validator.validate(input_data, result)  
            else:  
                raise ValueError(f"未知任务类型: {task_type}")  
  
            self.logger.log_output(task_type, result, validation)  
            return {"result": result, "validation": validation}  
  
        except Exception as e:  
            self.logger.log_error(task_type, str(e))  
            raise 

日志记录器

logger.py

import logging
from datetime import datetime
from typing import Any, Dict

class Logger:
    def __init__(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('medical_ai_agents.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_input(self, task_type: str, input_data: Any):
        self.logger.info(f"Task: {task_type} - Input received at {datetime.now()}")

    def log_output(self, task_type: str, result: Dict[str, Any], validation: Dict[str, bool]):
        self.logger.info(f"Task: {task_type} - Output generated at {datetime.now()}")
        self.logger.info(f"Validation result: {validation['is_valid']}")

    def log_error(self, task_type: str, error_message: str):
        self.logger.error(f"Task: {task_type} - Error: {error_message}") 

Streamlit 应用程序

app.py

import streamlit as st  
import asyncio  
from core.agent_manager import AgentManager  
  
# 设置页面配置并使用自定义主题  
st.set_page_config(  
    page_title="医疗AI代理",  
    layout="wide",  
    initial_sidebar_state="expanded"  
)  
  
# 自定义CSS样式  
st.markdown("""  
    <style>  
    .main-header {  
        font-size: 2.5rem;  
        color: white;  
        text-align: center;  
        padding: 1.5rem;  
        margin-bottom: 1rem;  
        font-weight: bold;  
        background: linear-gradient(120deg, #1E88E5 0%, #1565C0 100%);  
        border-radius: 10px;  
        box-shadow: 0 2px 10px rgba(30,136,229,0.2);  
    }  
    .sub-header {  
        font-size: 1.8rem;  
        color: #0D47A1;  
        padding: 0.5rem 0;  
        border-bottom: 2px solid #1E88E5;  
        margin-bottom: 1rem;  
    }  
    .task-container {  
        background-color: #F8F9FA;  
        padding: 2rem;  
        border-radius: 10px;  
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);  
    }  
    .result-box {  
        background-color: white;  
        padding: 1.5rem;  
        border-radius: 8px;  
        border-left: 4px solid #1E88E5;  
        margin: 1rem 0;  
    }  
    .validation-box {  
        padding: 1rem;  
        border-radius: 8px;  
        margin-top: 1rem;  
    }  
    .stButton>button {  
        background-color: #1E88E5;  
        color: white;  
        border-radius: 25px;  
        padding: 0.5rem 2rem;  
        border: none;  
        box-shadow: 0 2px 4px rgba(0,0,0,0.1);  
        transition: all 0.3s ease;  
    }  
    .stButton>button:hover {  
        background-color: #1565C0;  
        box-shadow: 0 4px 8px rgba(0,0,0,0.2);  
        transform: translateY(-2px);  
    }  
    .stTextArea>div>div {  
        border-radius: 8px;  
        border: 2px solid #E3F2FD;  
    }  
    .sidebar-content {  
        padding: 1rem;  
        background-color: #F8F9FA;  
        border-radius: 8px;  
    }  
    </style>  
    """, unsafe_allow_html=True)  
  
@st.cache_resource  
def get_agent_manager():  
    return AgentManager()  
  
def show_results_page(result_data):  
    st.markdown("<h1 class='main-header'>最终验证内容</h1>", unsafe_allow_html=True)  
      
    # 根据内容类型添加子标题  
    if "summary" in result_data["result"]:  
        st.markdown("<h2 class='sub-header'>医学文本摘要</h2>", unsafe_allow_html=True)  
    elif "article" in result_data["result"]:  
        st.markdown("<h2 class='sub-header'>研究文章</h2>", unsafe_allow_html=True)  
    elif "sanitized_data" in result_data["result"]:  
        st.markdown("<h2 class='sub-header'>红化PHI内容</h2>", unsafe_allow_html=True)  
      
    # 在样式框中显示内容  
    st.markdown("<div class='result-box'>", unsafe_allow_html=True)  
    if "summary" in result_data["result"]:  
        st.write(result_data["result"]["summary"])  
    elif "article" in result_data["result"]:  
        st.write(result_data["result"]["article"])  
    elif "sanitized_data" in result_data["result"]:  
        st.write(result_data["result"]["sanitized_data"])  
    st.markdown("</div>", unsafe_allow_html=True)  
      
    # 动作按钮在列中  
    col1, col2, col3 = st.columns([1, 1, 1])  
    with col2:  
        # 导出按钮  
        if st.button("📥 导出结果"):  
            export_data = ""  
            if "summary" in result_data["result"]:  
                export_data += result_data["result"]["summary"] + "\n"  
            elif "article" in result_data["result"]:  
```markdown
[sult]:  
                export_data = result_data["result"]["summary"]  
            elif "article" in result_data["result"]:  
                export_data = result_data["result"]["article"]  
            elif "sanitized_data" in result_data["result"]:  
                export_data = result_data["result"]["sanitized_data"]  
                  
            st.download_button(  
                label="💾 下载内容",  
                data=export_data,  
                file_name="final_content.txt",  
                mime="text/plain"  
            )  
      
    with col3:  
        # 返回按钮  
        if st.button("🏠 返回主页"):  
            st.session_state.show_results = False  
            st.rerun()  
  
def main():  
    # 侧边栏样式  
    with st.sidebar:  
        st.markdown("<h2 style='text-align: center; color: #1E88E5;'>任务</h2>", unsafe_allow_html=True)  
        st.markdown("<div class='sidebar-content'>", unsafe_allow_html=True)  
        task_type = st.radio(  
            "",  # 空标签,因为我们使用自定义标题  
            ["summarize", "write_article", "Redact PHI"],  
            format_func=lambda x: {  
                "summarize": "📝 医学文本摘要",  
                "write_article": "📚 撰写研究文章",  
                "Redact PHI": "🔒 隐去受保护的健康信息 (PHI)"  
            }[x]  
        )  
        st.markdown("</div>", unsafe_allow_html=True)  
      
    # 主要内容 - 整页单个标题  
    st.markdown("<h1 class='main-header'>医学多代理系统</h1>", unsafe_allow_html=True)  
      
    # 初始化会话状态  
    if 'show_results' not in st.session_state:  
        st.session_state.show_results = False  
    if 'result_data' not in st.session_state:  
        st.session_state.result_data = None  
      
    if st.session_state.show_results:  
        show_results_page(st.session_state.result_data)  
        return  
      
    agent_manager = get_agent_manager()  
      
    # 任务容器,具有一致的样式  
    st.markdown("<div class='task-container'>", unsafe_allow_html=True)  
      
    if task_type == "summarize":  
        st.markdown("<h2 class='sub-header'>📝 医学文本摘要</h2>", unsafe_allow_html=True)  
        input_text = st.text_area("输入要摘要的医学文本", height=200)  
        col1, col2 = st.columns(2)  
          
        with col1:  
            if st.button("🔄 生成摘要"):  
                with st.spinner("处理中..."):  
                    result = asyncio.run(agent_manager.process_task("summarize", input_text))  
                    st.session_state.result_data = result  
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)  
                    st.subheader("摘要")  
                    st.write(result["result"]["summary"])  
                    st.markdown("</div>", unsafe_allow_html=True)  
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)  
                    st.subheader("验证")  
                      
                    # 提取并显示分数  
                    feedback = result["validation"]["feedback"]  
                    if "Score:" in feedback:  
                        score = feedback.split("Score:")[1].split("\n")[0].strip()  
                        st.markdown(f"""  
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>  
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>验证分数: {score}</h3>  
                            </div>  
                        """, unsafe_allow_html=True)  
                      
                    st.write(feedback)  
                    st.markdown("</div>", unsafe_allow_html=True)  
          
        with col2:  
            if st.session_state.result_data and st.button("👁️ 查看编辑后的内容"):  
                st.session_state.show_results = True  
                st.rerun()  
      
    elif task_type == "write_article":  
        st.markdown("<h2 class='sub-header'>📚 撰写研究文章</h2>", unsafe_allow_html=True)  
        topic = st.text_input("输入研究主题")  
        key_points = st.text_area("输入关键点(每行一个)", height=150)  
        col1, col2 = st.columns(2)  
          
        with col1:  
            if st.button("📝 生成文章"):  
                with st.spinner("处理中..."):  
                    input_data = {"topic": topic, "key_points": key_points}  
                    result = asyncio.run(agent_manager.process_task("write_article", input_data))  
                    st.session_state.result_data = result  
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)  
                    st.subheader("文章")  
                    st.write(result["result"]["article"])  
                    st.markdown("</div>", unsafe_allow_html=True)  
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)  
                    st.subheader("验证")  
                      
                    # 提取并显示分数  
                    feedback = result["validation"]["feedback"]  
                    if "Score:" in feedback:  
                        score = feedback.split("Score:")[1].split("\n")[0].strip()  
                        st.markdown(f"""  
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>  
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>验证分数: {score}</h3>  
                            </div>  
                        """, unsafe_allow_html=True)  
                      
                    st.write(feedback)  
                    st.markdown("</div>", unsafe_allow_html=True)  
          
        with col2:  
            if st.session_state.result_data and st.button("👁️ 查看编辑后的内容"):  
                st.session_state.show_results = True  
                st.rerun()  
      
    elif task_type == "Redact PHI":  
        st.markdown("<h2 class='sub-header'>🔒 隐去受保护的健康信息 (PHI)</h2>", unsafe_allow_html=True)  
        input_text = st.text_area("输入要隐去 PHI 的医学文本", height=200)  
        col1, col2 = st.columns(2)  
          
        with col1:  
            if st.button("🔐 隐去 PHI"):  
                with st.spinner("处理中..."):  
                    result = asyncio.run(agent_manager.process_task("sanitize", input_text))  
                    st.session_state.result_data = result  
                    st.markdown("<div class='result-box'>", unsafe_allow_html=True)  
                    st.subheader("隐去后的文本")  
                    st.write(result["result"]["sanitized_data"])  
                    st.markdown("</div>", unsafe_allow_html=True)  
                    st.markdown("<div class='validation-box'>", unsafe_allow_html=True)  
                    st.subheader("验证")  
                      
                    # 提取并显示分数  
                    feedback = result["validation"]["feedback"]  
                    if "Score:" in feedback:  
                        score = feedback.split("Score:")[1].split("\n")[0].strip()  
                        st.markdown(f"""  
                            <div style='background-color: #E3F2FD; padding: 1rem; border-radius: 8px; margin-bottom: 1rem;'>  
                                <h3 style='margin: 0; color: #1565C0; text-align: center;'>验证分数: {score}</h3>  
                            </div>  
                        """, unsafe_allow_html=True)  
                      
                    st.write(feedback)  
                    st.markdown("</div>", unsafe_allow_html=True)  
          
        with col2:  
            if st.session_state.result_data and st.button("👁️ 查看编辑后的内容"):  
                st.session_state.show_results = True  
                st.rerun()  
      
    st.markdown("</div>", unsafe_allow_html=True)  
  
if __name__ == "__main__":  
    main()

运行程序:

streamlit run app.py

7、结束语

这个多代理人工智能系统展示了协作代理在处理复杂医疗文本方面的强大功能。通过利用先进的模型如LLaMA并通过Streamlit提供用户友好的界面,该应用程序为总结、文章撰写和PHI脱敏等任务提供了实用的解决方案。随着人工智能的不断发展,像这样的系统将在转变我们处理和解释医疗数据的方式方面发挥关键作用。


原文链接:Building a Multi-Agent AI System from scratch for Medical Text Processing

汇智网翻译整理,转载请标明出处