Data Science Automation with CrewAI
In this article, I will demonstrate how to use AI agents to build an end-to-end data science pipeline that automates data retrieval, preprocessing, training, and evaluation.
1. Prerequisites
Create and activate a virtual environment:
pip install virtualenv
virtualenv venv
source venv/bin/activate   # for macOS/Linux
venv\Scripts\activate      # for Windows
Create a .env file:
SERPER_API_KEY="enter your Serper API key"
KAGGLE_USERNAME="enter your Kaggle username"
KAGGLE_KEY="enter your Kaggle token"
OPENAI_API_KEY="enter your OpenAI API key"
Install the packages:
pip install apify_client apify_shared ipykernel crewai crewai-tools langchain langchain-anthropic langchain-cohere langchain-community langchain-core langchain-experimental langchain-google-community langchain-google-community[places] langchain-groq langchain-openai langchain-text-splitters langchainhub langdetect langsmith unstructured gradio scikit-learn pandas xgboost
2. Data Collection Agent
First, we create a custom tool to simplify downloading datasets from Kaggle. We define a class named KaggleDatasetDownloader built on CrewAI's BaseTool. The tool authenticates with the Kaggle API, extracts the dataset owner and name from the provided URL, performs the download, and unzips the dataset into the specified directory (./raw_data). We store all custom tools in a tools folder for later use.
from crewai_tools import BaseTool
from kaggle.api.kaggle_api_extended import KaggleApi

class KaggleDatasetDownloader(BaseTool):
    name: str = "Kaggle Dataset Downloader"
    description: str = "Downloads a dataset from Kaggle using the provided URL."

    def _run(self, url: str) -> str:
        try:
            # Authenticate with the Kaggle API
            api = KaggleApi()
            api.authenticate()
            # Extract the dataset owner and name from the URL
            parts = url.split('/')
            owner = parts[-2]
            dataset_name = parts[-1]
            # Download and unzip the dataset
            api.dataset_download_files(f"{owner}/{dataset_name}", path='./raw_data', unzip=True)
            return f"Successfully downloaded dataset {owner}/{dataset_name} to ./raw_data"
        except Exception as e:
            if '403' in str(e):
                return "Error 403: Forbidden. Check your Kaggle API credentials and dataset permissions."
            else:
                return f"Error downloading dataset: {str(e)}"
Next, we create an agent named data_collection_agent.
from crewai import Agent
import os
from langchain_openai import ChatOpenAI
from tools.kaggle_dataset_tool import KaggleDatasetDownloader
from crewai_tools import SerperDevTool
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())  # important if the API keys are not being picked up

# Read the API keys from the .env file
os.environ["SERPER_API_KEY"] = os.getenv('SERPER_API_KEY')
os.environ['KAGGLE_USERNAME'] = os.getenv('KAGGLE_USERNAME')
os.environ['KAGGLE_KEY'] = os.getenv('KAGGLE_KEY')
os.environ["OPENAI_API_KEY"] = os.getenv('OPENAI_API_KEY')

# LLM model
llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Built-in CrewAI tool
serper_search_tool = SerperDevTool()

# Custom tool
kaggle_tool = KaggleDatasetDownloader()

data_collection_agent = Agent(
    role='Data Acquisition Specialist',
    goal='Find and download appropriate datasets on a given topic',
    backstory='Expert in acquiring datasets from various sources, specializing in climate data',
    tools=[serper_search_tool, kaggle_tool],
    llm=llm,
    verbose=True
)
3. Data Preprocessing Agent
Besides the data collection agent, we need another agent to carry out the data preprocessing steps in sequence. We start by building a custom tool named DataPreprocessor. The DataPreprocessor tool handles missing values, removes duplicates, identifies categorical columns, and converts them to numerical values with LabelEncoder. The processed file is stored in the specified directory, and a detailed report of the preprocessing steps is generated.
import os
import pandas as pd
from crewai_tools import BaseTool
from sklearn.preprocessing import LabelEncoder

class DataPreprocessor(BaseTool):
    name: str = "Data Preprocessor"
    description: str = "Preprocesses data by handling missing values, removing duplicates, and encoding categorical variables."

    def _run(self, file_path: str) -> str:
        # Load the data
        df = pd.read_csv(file_path)

        # Capture initial statistics
        initial_shape = df.shape
        initial_missing = df.isnull().sum().sum()

        # Handle missing values
        df = df.dropna()  # or use df.fillna() with an appropriate strategy

        # Remove duplicate rows
        df = df.drop_duplicates()

        # Identify categorical columns
        categorical_columns = df.select_dtypes(include=['object']).columns.tolist()

        # Convert categorical variables to numerical values
        label_encoder = LabelEncoder()
        for col in categorical_columns:
            df[col] = label_encoder.fit_transform(df[col])

        # Capture final statistics
        final_shape = df.shape
        final_missing = df.isnull().sum().sum()

        # Save the processed data (create the output directory if it does not exist)
        os.makedirs('processed_data', exist_ok=True)
        processed_file_path = os.path.join('processed_data', 'processed_data.csv')
        df.to_csv(processed_file_path, index=False)

        return f"""
        Data preprocessing completed:
        - Initial shape: {initial_shape}
        - Initial missing values: {initial_missing}
        - Final shape: {final_shape}
        - Final missing values: {final_missing}
        - Categorical variables encoded: {categorical_columns}
        - Duplicates removed
        - Processed data saved to: {processed_file_path}
        """
The data_preprocessing_agent needs access to the directory containing the downloaded dataset. We use the built-in CrewAI tool DirectoryReadTool to read the raw_data folder.
from crewai_tools import DirectoryReadTool

docs_tool_a = DirectoryReadTool(directory='raw_data')

# Instantiate the custom preprocessing tool defined above
data_processing_tool = DataPreprocessor()

data_preprocessing_agent = Agent(
    role="Data Preprocessing Specialist",
    goal="Load, clean, and perform initial transformations on datasets",
    backstory="Expert in data cleaning and preprocessing using pandas, numpy, and sklearn libraries",
    llm=llm,
    tools=[docs_tool_a, data_processing_tool],
    # allow_code_execution=True
)
We define a task for each agent describing what it should do. data_collection_task searches for at least three datasets and downloads one with kaggle_tool. data_preprocessing_task loads the dataset and performs the basic preprocessing steps.
from crewai import Crew, Task
from textwrap import dedent
from data_agents import DataAgents

agents = DataAgents()

# Agents
data_collection_agent = agents.data_collection_agent()       # data collection agent
data_preprocessing_agent = agents.data_processing_agent()    # data preprocessing agent

# Tasks
data_collection_task = Task(
    description="""
    Search for three appropriate datasets on the topic of {topic} and download one using the Kaggle Dataset Downloader.
    You can search for datasets using refined queries. Note that the Kaggle Dataset Downloader only requires one input, i.e., the URL.
    """,
    expected_output='Provide the full description of the downloaded dataset.',
    agent=data_collection_agent,
)

data_preprocessing_task = Task(
    description="""
    Load the file, handle missing values, remove duplicates, and convert categorical variables to numerical values to make the dataset model-ready.
    """,
    expected_output='Processed dataset saved successfully',
    agent=data_preprocessing_agent,
)
Let's combine these two agents into a crew with CrewAI. We will search for datasets on the topic of housing.
crew = Crew(
    agents=[data_collection_agent, data_preprocessing_agent],
    tasks=[data_collection_task, data_preprocessing_task],
    verbose=2
)

result = crew.kickoff(inputs={'topic': 'housing'})
4. Model Training Agent
Once we have a model-ready dataset, we can move on to the model training agent. We create a custom tool class named TrainingModelTool. Here we use a random forest model, but feel free to swap in any other machine learning algorithm.
from crewai import Agent, Task, Crew
from crewai_tools import BaseTool
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.ensemble import RandomForestRegressor
import pickle
import os

class TrainingModelTool(BaseTool):
    name: str = "Random Forest Model Trainer"
    description: str = "Trains a Random Forest model for house price prediction and saves it as a pickle file"

    def _run(self, file_path, target_variable):
        # Load and prepare the data
        data = pd.read_csv(file_path)
        X = data.drop(target_variable, axis=1)
        y = data[target_variable]

        # Split the data
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        # Scale the features
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)

        # Train the Random Forest model
        model = RandomForestRegressor(random_state=42)
        model.fit(X_train_scaled, y_train)

        # Make predictions
        y_pred = model.predict(X_test_scaled)

        # Evaluate the model
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        r2 = r2_score(y_test, y_pred)

        # Feature importance
        feature_importance = pd.DataFrame({
            'feature': X.columns,
            'importance': model.feature_importances_
        }).sort_values('importance', ascending=False)

        # Create the 'saved_model' folder if it does not exist
        os.makedirs('saved_model', exist_ok=True)
        # Create the 'train_test_data' folder if it does not exist
        os.makedirs('train_test_data', exist_ok=True)

        # Save the model as a pickle file in the 'saved_model' folder
        model_filename = os.path.join('saved_model', 'random_forest_model.pkl')
        with open(model_filename, 'wb') as file:
            pickle.dump(model, file)

        # Save the scaler as a pickle file in the 'saved_model' folder
        scaler_filename = os.path.join('saved_model', 'scaler.pkl')
        with open(scaler_filename, 'wb') as file:
            pickle.dump(scaler, file)

        # Save the train and test splits in the 'train_test_data' folder
        train_data = pd.concat([X_train, y_train], axis=1)
        test_data = pd.concat([X_test, y_test], axis=1)
        train_filename = os.path.join('train_test_data', 'train_data.csv')
        test_filename = os.path.join('train_test_data', 'test_data.csv')
        train_data.to_csv(train_filename, index=False)
        test_data.to_csv(test_filename, index=False)

        # Build the training report
        report = f"Random Forest Model Training Report:\n\n"
        report += f"Root Mean Squared Error: {rmse:.2f}\n"
        report += f"R-squared score: {r2:.4f}\n\n"
        report += "Top 5 most important features:\n"
        for _, row in feature_importance.head().iterrows():
            report += f"- {row['feature']}: {row['importance']:.4f}\n"
        report += f"\nModel saved to: {os.path.abspath(model_filename)}\n"
        report += f"Scaler saved to: {os.path.abspath(scaler_filename)}\n"
        report += f"Training data saved to: {os.path.abspath(train_filename)}\n"
        report += f"Test data saved to: {os.path.abspath(test_filename)}\n"

        return report
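The tool can also be invoked outside the agent loop, which is handy when debugging the training logic itself. A minimal sketch, assuming the preprocessed file produced in section 3 and a price target column:

from tools.model_training_tool import TrainingModelTool

# Direct call, bypassing the agent; the two arguments are file_path and target_variable
report = TrainingModelTool()._run('processed_data/processed_data.csv', 'price')
print(report)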
Now let's create the model training agent. Note that this function is also added to the DataAgents class, just like the two previous agents (a sketch of what that class might look like follows the code below).
from tools.model_training_tool import TrainingModelTool
from crewai_tools import DirectoryReadTool

# Tool to access the data in the processed_data folder
docs_tool_b = DirectoryReadTool(directory='processed_data')

# Initialize the custom tool
model_training_tool = TrainingModelTool()

def model_training_agent(self):
    return Agent(
        role="Random Forest Model Trainer",
        goal="Train a Random Forest model for the dataset",
        backstory="You are an expert in machine learning, specializing in Random Forest for regression/classification tasks.",
        tools=[docs_tool_b, model_training_tool],
        llm=llm
    )
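The snippets above import a DataAgents class from data_agents, but its definition is never shown. A minimal sketch of such a wrapper, assuming the tools (serper_search_tool, kaggle_tool, docs_tool_a, data_processing_tool, docs_tool_b, model_training_tool) and the llm created earlier are available in the same module:

# data_agents.py -- hypothetical structure; it simply packages the three
# agent definitions from the previous sections as methods
from crewai import Agent

class DataAgents:
    def data_collection_agent(self):
        return Agent(
            role='Data Acquisition Specialist',
            goal='Find and download appropriate datasets on a given topic',
            backstory='Expert in acquiring datasets from various sources, specializing in climate data',
            tools=[serper_search_tool, kaggle_tool],
            llm=llm,
            verbose=True
        )

    def data_processing_agent(self):
        return Agent(
            role="Data Preprocessing Specialist",
            goal="Load, clean, and perform initial transformations on datasets",
            backstory="Expert in data cleaning and preprocessing using pandas, numpy, and sklearn libraries",
            tools=[docs_tool_a, data_processing_tool],
            llm=llm
        )

    def model_training_agent(self):
        return Agent(
            role="Random Forest Model Trainer",
            goal="Train a Random Forest model for the dataset",
            backstory="You are an expert in machine learning, specializing in Random Forest for regression/classification tasks.",
            tools=[docs_tool_b, model_training_tool],
            llm=llm
        )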
5. Model Evaluation
Let's create the task and run this single agent. The target variable is price, which we provide as input to the task for our dataset. If training succeeds, we should see the trained model and scaler saved as pickle files in the saved_model folder, along with training_report.txt in the reports folder.
from crewai import Crew, Task
from textwrap import dedent
from data_agents import DataAgents

agents = DataAgents()

## Model training
# Agent
model_training_agent = agents.model_training_agent()  # model training agent

# Task
target_variable = input(dedent('price'))  # prompts for the target column name (here: price)

model_training_task = Task(
    description=dedent(f"""
        Load the processed data from the directory. Train a Random Forest model and save the trained model.
        Note that TrainingModelTool._run() has two positional arguments which are file_path and the {target_variable}
    """),
    expected_output="Model trained successfully",
    output_file='reports/training_report.txt',
    agent=model_training_agent,
)

crew = Crew(
    agents=[model_training_agent],
    tasks=[model_training_task],
    verbose=2
)

result = crew.kickoff()
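Since this section is about evaluation, note that the artifacts written by TrainingModelTool can be loaded back for a standalone evaluation pass. A minimal sketch using the file paths and the price target from this walkthrough:

import pickle
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, r2_score

# Load the model and scaler saved by TrainingModelTool
with open('saved_model/random_forest_model.pkl', 'rb') as f:
    model = pickle.load(f)
with open('saved_model/scaler.pkl', 'rb') as f:
    scaler = pickle.load(f)

# Load the held-out test split and separate the target column
test_data = pd.read_csv('train_test_data/test_data.csv')
X_test = test_data.drop('price', axis=1)
y_test = test_data['price']

# Scale, predict, and report the same metrics as the training report
y_pred = model.predict(scaler.transform(X_test))
print(f"RMSE: {np.sqrt(mean_squared_error(y_test, y_pred)):.2f}")
print(f"R2:   {r2_score(y_test, y_pred):.4f}")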
6. Closing Thoughts
Using CrewAI and custom tools, we have automated the data collection, preprocessing, and model training steps of a data science pipeline. This approach not only saves time but also brings consistency and reproducibility to data science workflows.
Original article: Data Science Automation: A Step-by-Step Guide Using CrewAI
Translated and compiled by 汇智网; please credit the source when reprinting.