用GenAI构建数据可视化系统
本文演示了如何构建一个数据可视化系统,该系统使用 LangChain 和 LLM 将查询转换为交互式可视化见解。该系统集成了 OpenAI 的 GPT-4o-mini 用于查询解释和数据转换、BigQuery 作为数据存储解决方案以及 Streamlit 以提供实时可视化。
我们将创建一个全栈数据分析系统,该系统:
- 将文本问题转换为优化的 SQL 查询
- 针对 BigQuery 执行查询并处理结果
- 提取相关数据特征(如地理数据的坐标)
- 根据查询结果自动检测适当的可视化类型
- 将数据转换为特定于格式的可视化结构
- 使用 Streamlit 提供实时交互式可视化
- 将见解解释为自然语言
- 使用交互式地图处理地理数据可视化,并使用动态图表处理基于图表的数据可视化。
1、先决条件
在开始之前,请确保你已安装以下内容:
- Python 3.7 或更高版本
- OpenAI API 密钥
- 访问 BigQuery 数据集
- 必要的 Python 库:使用以下 pip 命令安装它们:
pip install streamlit langchain openai google-cloud-bigquery pandas "langchain-google-community[bigquery]" python-dotenv langchain_openai plotly
2、架构概述
在深入实施之前,让我们先了解一下数据可视化代理的架构。
组件:
- 用户界面 (Streamlit):允许用户输入查询并显示结果。
- 文本到 SQL:使用 gpt-4o-mini 将查询转换为 SQL。
- 查询执行:针对数据库执行 SQL 查询。
- 坐标提取器:从查询结果中提取地理坐标以进行地图可视化。
- 可视化推荐器:使用 GPT-4o-mini 根据查询和结果建议最合适的可视化。
- 数据转换器:使用 GPT-4o-mini 将 SQL 查询结果转换为适合推荐可视化的格式。
- 可视化渲染器(Streamlit 和 Plotly):使用转换后的数据渲染推荐的可视化。
工作流:
- 输入:用户输入问题。
- 文本到 SQL 的转换:将问题转换为 SQL 查询。
- 查询执行:针对数据库执行 SQL 查询。
- 坐标提取器:从查询结果中提取地理坐标以进行地图可视化。
- 可视化推荐:根据问题、查询和结果,AI 推荐一种可视化类型。
- 数据转换:查询结果被转换为适合推荐的可视化格式。
- 答案生成:生成对用户问题的文本答案。
- 结果格式:答案、可视化和地理数据被编译成最终结果。
- 输出:用户界面显示答案、可视化和地图(如果适用)。
3、设置数据库
我们将使用由客户信息组成的示例数据集。以下是在 BigQuery 数据集中创建必要表的 DDL 语句:
CREATE TABLE `demo_data_set.customer` (
customer_key STRING NOT NULL,
first_name STRING,
last_name STRING,
source_system_name STRING,
dob DATE,
gender STRING,
create_timestamp TIMESTAMP
);
CREATE TABLE `demo_data_set.customer_address` (
customer_key STRING NOT NULL,
address_key STRING NOT NULL
);
CREATE TABLE `demo_data_set.address` (
address_key STRING NOT NULL,
full_address STRING,
state STRING,
country STRING,
latitude STRING,
longitude STRING
);
4、构建数据可视化代理
代理的工作流程涉及多个步骤,从将用户问题转换为 SQL 查询到推荐适当的可视化并呈现最终答案。
4.1 配置数据库连接和 LLM
service_account_file = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]
project = os.environ["GOOGLE_PROJECT"]
dataset = os.environ["BIGQUERY_DATASET"]
sql_url = (
f"bigquery://{project}/{dataset}?credentials_path={service_account_file}"
)
db = SQLDatabase.from_uri(sql_url)
model = ChatOpenAI(model="gpt-4o-mini", temperature=0,
max_tokens="10000", timeout=30000, verbose=True)
4.2 文本到 SQL
我们首先使用 LLM 模型将自然语言问题转换为 SQL 查询:
from langchain.chains import create_sql_query_chain
sql_prompt = PromptTemplate.from_template("""
You are a SQL expert with access to a BigQuery dataset containing customers and customer addresses.
Given an input question, generate a syntactically correct SQL query to answer it. Unless explicitly requested otherwise, limit the results to {top_k} rows.
Relevant Table Information:
{table_info}
Question: {input}
Guidelines:
1. Ensure that all attribute searches are case-insensitive.
2. ALWAYS add 'LIMIT {top_k}' at the end of the query unless:
- The question explicitly asks for all records
- The query uses GROUP BY and needs to show all groups
- The query is counting records (using COUNT)
- The query calculates aggregates that need all data
Address and Location Queries:
1. For questions about addresses, locations, or properties, always include latitude and longitude columns in the SELECT clause.
Double check the user's sql query for common mistakes, including:
- Using NOT IN with NULL values
- Using UNION when UNION ALL should have been used
- Using BETWEEN for exclusive ranges
- Data type mismatch in predicates
- Properly quoting identifiers
- Using the correct number of arguments for functions
- Casting to the correct data type
- Using the proper columns for joins
- Missing LIMIT clause when returning raw records
If there are any of the above mistakes, rewrite the query.
If there are no mistakes, just reproduce the original query with no further commentary.
Provide only the final SQL query as plain text without any formatting.
If the question is not about customers or addresses, respond with "I don't know"
""")
text_to_sql = create_sql_query_chain(
model, db, sql_prompt)
4.3 执行SQL查询
一旦我们有了SQL查询,就对数据库执行它:
@RunnableLambda
def execute_query(result, config):
dispatch_custom_event(
"process.execute_query",
{
"status": ""
},
config=config
)
return {
**result,
'result': db.run_no_throw(
command=result["query"], include_columns=True)
}
4.4 提取坐标
如果查询结果包含地理数据,我们将提取纬度和经度以进行地图可视化:
def extract_coordinates(self, result: dict):
try:
if isinstance(result, dict):
if "result" in result:
result = result["result"]
if isinstance(result, dict) and "result" in result:
result_str = result["result"]
else:
result_str = str(result)
else:
return None
else:
return None
try:
if isinstance(result_str, str):
result_data = eval(result_str)
else:
result_data = result_str
except Exception as e:
print(f"Error evaluating result string: {e}")
return None
if not isinstance(result_data, list):
return None
unique_lat_values = set()
unique_long_values = set()
for row in result_data:
if isinstance(row, dict):
if 'latitude' in row and row['latitude'] is not None:
try:
unique_lat_values.add(float(row['latitude']))
except (ValueError, TypeError):
pass
if 'longitude' in row and row['longitude'] is not None:
try:
unique_long_values.add(float(row['longitude']))
except (ValueError, TypeError):
pass
if unique_lat_values and unique_long_values:
return {
"latitude": list(unique_lat_values),
"longitude": list(unique_long_values)
}
return None
except Exception as e:
print(f"Error extracting coordinates: {e}")
return None
4.5 推荐可视化
我们使用 LLM 根据用户的问题、SQL 查询和结果推荐最合适的可视化:
viz_prompt = PromptTemplate.from_template("""
You are an AI assistant that recommends appropriate data visualizations for customer and address analytics. Based on the user's question, SQL query, and query results, suggest the most suitable type of graph or chart to visualize the data.
Available chart types and their best use cases:
- Bar Graphs (for 3+ categories):
* Comparing distributions across multiple categories
* Customer counts by source system
* Customer demographics across regions/states
* Age group distributions
* Monthly/yearly registration counts
- Horizontal Bar Graphs (for 2-3 categories or large value disparities):
* Binary comparisons (e.g., gender distribution)
* Limited category comparisons (2-3 items)
* Cases with large value differences between categories
- Line Graphs (for time series only):
* Customer registration trends over time
* Growth patterns by source system
* Any metric tracked over time periods
Note: X-axis MUST represent time (create_timestamp or similar)
- Pie Charts (for proportions, 3-7 categories max):
* Distribution percentages
* Market share analysis
* Proportional comparisons
Note: Total should sum to 100%
- Scatter Plots (for numeric relationships):
* Age vs other numeric metrics
* Timestamp patterns
* Distribution analysis
Note: Both axes must be numeric, non-categorical
Special Cases:
1. Geographic Data:
* If result contains latitude and longitude → No chart (will display map)
* For address/location questions → No chart (will display map)
2. Raw Data:
* Individual customer records → No chart (tabular display)
* Non-aggregated data → No chart (tabular display)
Tables in scope:
- customer: customer_key, first_name, last_name, source_system_name, dob, gender, create_timestamp
- customer_address: customer_key, address_key
- address: address_key, full_address, state, country, latitude, longitude
Question: {question}
SQL Query: {query}
SQL Result: {result}
Provide your response in the following format:
Recommended Visualization: [Chart type or "none"]. ONLY use the following names: bar, horizontal_bar, line, pie, scatter, none
Reason: [Brief explanation for your recommendation]
""")
@RunnableLambda
def transform_data_for_visualization_chain(args, config):
try:
dispatch_custom_event(
"process.transform_data_for_visualization_chain",
{
"status": ""
},
config=config
)
chart_type = args.get("visualization").get("type")
result = args.get("result")
if not chart_type or not result:
return {"chart_data": None}
if chart_type == 'bar':
transform_prompt = bar_prompt
elif chart_type == 'horizontal_bar':
transform_prompt = horizontal_bar_prompt
elif chart_type == 'pie':
transform_prompt = pie_prompt
elif chart_type == 'scatter':
transform_prompt = scatter_prompt
elif chart_type == 'line':
transform_prompt = line_prompt
else:
transform_prompt = None
assign_chart_type_and_result = RunnableLambda(
lambda args: {**args, "chart_type": args.get("visualization", {}).get(
"type"), "result": args.get("result")}
)
if transform_prompt:
transform_chain = (
assign_chart_type_and_result
| transform_prompt
| model
)
return transform_chain
return {"chart_data": None}
except Exception as e:
print(e)
print(f"Error in transform_data_for_visualization: {e}")
return {"chart_data": None}
viz_prompt
| model
| parse_visualization_response
4.6 转换数据以进行可视化
根据推荐的图表类型,我们会相应地转换数据:
def create_data_transform_prompt(self):
base_prompt = """You are a data transformation expert. Transform the SQL query result into the exact format needed for a {chart_type} chart.
SQL Query Result: {result}
Your response must be a valid JSON object containing ONLY the chart_data field with the exact structure shown in the example.
"""
chart_prompts = {
"bar": """For a bar chart, return JSON in this EXACT format:
{{
"chart_data": {{
"labels": ["Category1", "Category2", ...],
"values": [
{{
"data": [number1, number2, ...],
"label": "Metric Name"
}}
]
}}
}}
Example with SQL: "SELECT source_system_name, COUNT(*) as count FROM customer GROUP BY source_system_name"
{{
"chart_data": {{
"labels": ["System A", "System B", "System C"],
"values": [
{{
"data": [45, 32, 28],
"label": "Customer Count"
}}
]
}}
}}
Example with multiple series:
{{
"chart_data": {{
"labels": ["NSW", "VIC", "QLD"],
"values": [
{{
"data": [500000, 750000, 450000],
"label": "Total Customers"
}},
{{
"data": [35, 30, 28],
"label": "Average Age"
}}
]
}}
}}""",
"horizontal_bar": """For a horizontal bar chart, return JSON in this EXACT format:
{{
"chart_data": {{
"labels": ["Category1", "Category2", ...],
"values": [
{{
"data": [number1, number2, ...],
"label": "Metric Name"
}}
]
}}
}}
Example:
{{
"chart_data": {{
"labels": ["Male", "Female"],
"values": [
{{
"data": [75000, 78000],
"label": "Customer Count"
}}
]
}}
}}""",
"line": """For a line chart, return JSON in this EXACT format:
{{
"chart_data": {{
"xValues": ["2023-01", "2023-02", ...],
"yValues": [
{{
"data": [number1, number2, ...],
"label": "Metric Name"
}}
]
}}
}}
Example:
{{
"chart_data": {{
"xValues": ["2023-01", "2023-02", "2023-03", "2023-04"],
"yValues": [
{{
"data": [12500, 13600, 14800, 15200],
"label": "Monthly Registrations"
}}
]
}}
}}
Example with multiple series:
{{
"chart_data": {{
"xValues": ["2023-01", "2023-02", "2023-03"],
"yValues": [
{{
"data": [5000, 5500, 6000],
"label": "System A Customers"
}},
{{
"data": [4000, 4200, 4500],
"label": "System B Customers"
}}
]
}}
}}""",
"pie": """For a pie chart, return JSON in this EXACT format:
{{
"chart_data": [
{{
"value": number,
"label": "Category Name"
}}
]
}}
Example:
{{
"chart_data": [
{{
"value": 150,
"label": "System A"
}},
{{
"value": 45,
"label": "System B"
}},
{{
"value": 25,
"label": "System C"
}}
]
}}""",
"scatter": """For a scatter plot, return JSON in this EXACT format:
{{
"chart_data": {{
"series": [
{{
"data": [
{{
"x": number,
"y": number,
"id": number
}}
],
"label": "Series Name"
}}
]
}}
}}
Example:
{{
"chart_data": {{
"series": [
{{
"data": [
{{
"x": -33.865,
"y": 151.209,
"id": 1
}},
{{
"x": -37.813,
"y": 144.963,
"id": 2
}},
{{
"x": -27.470,
"y": 153.021,
"id": 3
}}
],
"label": "Customer Locations"
}}
]
}}
}}
Example with multiple series:
{{
"chart_data": {{
"series": [
{{
"data": [
{{
"x": -33.865,
"y": 151.209,
"id": 1
}},
{{
"x": -37.813,
"y": 144.963,
"id": 2
}}
],
"label": "Male Customers"
}},
{{
"data": [
{{
"x": -27.470,
"y": 153.021,
"id": 3
}},
{{
"x": -31.950,
"y": 115.860,
"id": 4
}}
],
"label": "Female Customers"
}}
]
}}
}}"""
}
bar_prompt = base_prompt + chart_prompts.get("bar")
horizontal_bar_prompt = base_prompt + \
chart_prompts.get("horizontal_bar")
pie_prompt = base_prompt + chart_prompts.get("pie")
scatter_prompt = base_prompt + chart_prompts.get("scatter")
line_prompt = base_prompt + chart_prompts.get("line")
return (
PromptTemplate.from_template(bar_prompt),
PromptTemplate.from_template(horizontal_bar_prompt),
PromptTemplate.from_template(pie_prompt),
PromptTemplate.from_template(scatter_prompt),
PromptTemplate.from_template(line_prompt)
)
bar_prompt, horizontal_bar_prompt, pie_prompt, scatter_prompt, line_prompt = create_data_transform_prompt()
@RunnableLambda
def transform_data_for_visualization_chain(args, config):
try:
dispatch_custom_event(
"process.transform_data_for_visualization_chain",
{
"status": ""
},
config=config
)
chart_type = args.get("visualization").get("type")
result = args.get("result")
if not chart_type or not result:
return {"chart_data": None}
if chart_type == 'bar':
transform_prompt = bar_prompt
elif chart_type == 'horizontal_bar':
transform_prompt = horizontal_bar_prompt
elif chart_type == 'pie':
transform_prompt = pie_prompt
elif chart_type == 'scatter':
transform_prompt = scatter_prompt
elif chart_type == 'line':
transform_prompt = line_prompt
else:
transform_prompt = None
assign_chart_type_and_result = RunnableLambda(
lambda args: {**args, "chart_type": args.get("visualization", {}).get(
"type"), "result": args.get("result")}
)
if transform_prompt:
transform_chain = (
assign_chart_type_and_result
| transform_prompt
| model
)
return transform_chain
return {"chart_data": None}
except Exception as e:
print(e)
print(f"Error in transform_data_for_visualization: {e}")
return {"chart_data": None}
4.7 生成答案
我们使用 LLM 模型生成用户问题的文本答案:
create_answer_prommpt = PromptTemplate.from_template(
"""Given the following user question, corresponding SQL query, and SQL result, answer the user question.
Question: {question}
SQL Query: {query}
SQL Result: {result}
Answer: """
)
create_answer_prommpt | model
4.8 格式化最终结果
接下来,我们将答案、可视化和任何坐标编译到最终结果中:
@RunnableLambda
def format_final_result(result, config):
try:
dispatch_custom_event(
"process.format_final_result",
{
"status": ""
},
config=config
)
if isinstance(result, str):
try:
result = json.loads(result)
except json.JSONDecodeError:
result = {"answer": result}
answer = ""
chart_data = None
chart_type = None
coordinates = None
# Extract chart data from AIMessage
if isinstance(result, dict):
coordinates = result.get("coordinates")
# Get chart type from visualization
visualization = result.get('visualization', {})
if isinstance(visualization, dict):
chart_type = visualization.get('type')
chart_data_msg = result.get('chart_data')
if hasattr(chart_data_msg, 'content'):
try:
content = chart_data_msg.content
content = content.replace(
'```json', '').replace('```', '').strip()
parsed_data = json.loads(content)
if isinstance(parsed_data, dict) and 'chart_data' in parsed_data:
chart_data = parsed_data['chart_data']
except json.JSONDecodeError:
print("Failed to parse chart data JSON")
chart_data = None
answer_msg = result.get('answer')
if hasattr(answer_msg, 'content'):
answer = answer_msg.content
elif isinstance(answer_msg, str):
answer = answer_msg
elif isinstance(answer_msg, dict) and 'content' in answer_msg:
answer = answer_msg['content']
else:
result_data = result.get("result", {})
if isinstance(result_data, dict) and "result" in result_data:
answer = str(result_data["result"])
else:
answer = str(result_data)
response_dict = {
"answer": answer,
"coordinates": coordinates,
"chart_data": chart_data,
"chart_type": chart_type
}
return json.dumps(response_dict)
except Exception as e:
print(f"Error in format_final_result: {e}")
return json.dumps({
"answer": "Error formatting result",
"coordinates": None,
"chart_data": None,
"chart_type": None
})
4.9 完整链组装
我们将所有步骤组装成一个有凝聚力的链:
chain = (
RunnablePassthrough().assign(query=text_to_sql)
| RunnablePassthrough().assign(
result=execute_query
)
| RunnablePassthrough().assign(
coordinates=lambda x: extract_coordinates(x)
)
| RunnablePassthrough.assign(
visualization=RunnableLambda(
lambda x: {
"question": x.get("question", ""),
"query": x["query"],
"result": x.get("result", {}).get("result")
}
)
| viz_prompt
| model
| parse_visualization_response
)
| RunnablePassthrough().assign(
chart_data=transform_data_for_visualization_chain
)
| RunnablePassthrough.assign(
answer=create_answer_prommpt | model
)
| format_final_result
| StrOutputParser()
)
5、与 Streamlit 集成
Streamlit 允许我们创建一个交互式 Web 应用程序来与我们的代理交互。它使用户能够与可视化代理交互、执行数据库查询和查看动态可视化——所有这些都通过自然语言对话完成。
虽然我不会在本文中提供 Streamlit 应用程序的完整源代码,但你可以在此处找到示例代码。在本节中,我们将特别关注如何使用 Lanchain Streaming API 和自定义事件处理程序扩展 Streamlit,以增强用户交互并提供实时反馈。
5.1 使用自定义事件处理程序扩展 Streamlit
使用 LangChain 的流式 API 时,defaultStreamlitCallbackHandler 只能显示第一个链的 LLM 状态(例如,文本到 SQL 步骤)。要在 Streamlit 界面中显示整个链中的 LLM 状态,我们需要创建一个可以接收和处理自定义事件的自定义事件处理程序。
5.2 导入正确的 Streamlit 回调处理程序
首先,从适当的模块导入 StreamlitCallbackHandler:
from langchain_community.callbacks.streamlit.streamlit_callback_handler import StreamlitCallbackHandler
接下来,创建一个从 StreamlitCallbackHandler 继承的自定义类来处理自定义事件:
class CustomStreamlitCallbackHandler(StreamlitCallbackHandler):
def on_custom_event(self, name: str, data: dict, **kwargs):
"""Handle custom events, update labels, and mark as complete if specified."""
if self._current_thought is not None:
custom_event_label = f"💡{name}"
self._current_thought.container.update(
new_label=custom_event_label)
content = f"**{name}:** {data}"
self._current_thought.container.markdown(content)
is_complete = data.get("is_complete", False)
if is_complete or name == "process.completed":
complete_label = f"✅ Complete, awaiting response"
self._current_thought.complete(final_label=complete_label)
else:
st.write(f"Custom Event Triggered Outside Thought Context: {data}")
def on_llm_end(self, response, **kwargs):
"""Override to ensure the label updates on LLM completion."""
super().on_llm_end(response, **kwargs)
if self._current_thought:
self._current_thought.complete(
final_label="✅ Complete, awaiting response")
def on_tool_end(self, output, **kwargs):
"""Override to ensure the label updates on tool completion."""
super().on_tool_end(output, **kwargs)
if self._current_thought:
self._current_thought.complete(final_label="✅ Tool Complete")
5.3 在 LangChain 代码中调度自定义事件
要使用自定义事件处理程序,请在适当的位置在 LangChain 代码中调度自定义事件。例如,在 execute_query 步骤中:
from langchain.callbacks import dispatch_custom_event
@RunnableLambda
def execute_query(query, config):
# Send custom event
dispatch_custom_event(
"process.execute_query",
{"status": "Executing SQL query..."},
config=config
)
try:
result = db.run_query(query)
dispatch_custom_event(
"process.execute_query",
{"status": "SQL query executed successfully.", "is_complete": True},
config=config
)
return result
except Exception as e:
dispatch_custom_event(
"process.execute_query",
{"status": f"Error executing SQL query: {e}", "is_complete": True},
config=config
)
return None
5.4 更新 Streamlit 应用以使用自定义处理程序
在你的 Streamlit 中app,初始化自定义回调处理程序并将其传递给你的 LangChain 链:
def process_query(question: str, assistant: DatabaseAssistant) -> Dict[str, Any]:
try:
chain = assistant.process_query_chain()
return chain.stream(
{"question": question, "top_k": 10},
{"callbacks": [get_streamlit_cb(st.container())]}
)
except Exception as e:
st.error(f"Error processing query: {str(e)}")
return {"answer": "Sorry, I encountered an error processing your query.", "chart_data": None, "chart_type": None}
with st.chat_message("assistant"):
response = process_query(prompt, assistant)
if response is not None:
for chunk in response:
handle_stream_response(
chunk, st.session_state.messages)
通过实现自定义事件处理程序,我们可以在整个处理链中提供实时状态更新,通过向用户告知每个步骤来增强用户体验。
6、运行应用程序
现在让我们运行应用程序🥳。你可以在我的 GitHub 存储库中找到完整的工作示例。要启动应用程序,请导航到包含 main.py 文件的目录并执行以下命令:
streamlit run main.py
查看如何根据 BigQuery 数据集中的数据生成数据和图表。该应用程序连接到 BigQuery 数据集,执行生成的 SQL 查询,并相应地显示动态可视化和地图。自定义事件处理程序在 Streamlit 界面中提供实时反馈,增强了交互性和用户体验。
查看下面的记录:
7、结束语
此可视化解决方案简化了数据分析工作流程,即使没有丰富的技术专业知识的用户也可以访问它。你可以通过合并其他可视化类型、集成更多数据源或改进其自然语言理解能力来进一步增强此代理。尽情探索它的可能性吧!
原文链接:Building a Real-Time Data Visualization Solution with Generative AI
汇智网翻译整理,转载请标明出处