Fundamental_Analysis/backend/app/services/config_manager.py
xucheng b5a4d2212c feat: 实现动态分析配置并优化前端UI
本次提交引入了一系列重要功能,核心是实现了财务分析模块的动态配置,并对配置和报告页面的用户界面进行了改进。

主要变更:

- **动态配置:**
  - 后端实现了 `ConfigManager` 服务,用于动态管理 `analysis-config.json` 和 `config.json`。
  - 添加了用于读取和更新配置的 API 端点。
  - 开发了前端 `/config` 页面,允许用户实时查看和修改分析配置。

- **后端增强:**
  - 更新了 `AnalysisClient` 和 `CompanyProfileClient` 以使用新的配置系统。
  - 重构了财务数据相关的路由。

- **前端改进:**
  - 新增了可复用的 `Checkbox` UI 组件。
  - 使用更直观和用户友好的界面重新设计了配置页面。
  - 改进了财务报告页面的布局和数据展示。

- **文档与杂务:**
  - 更新了设计和需求文档以反映新功能。
  - 更新了前后端依赖。
  - 修改了开发脚本 `dev.sh`。
2025-10-30 14:50:36 +08:00

305 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Configuration Management Service
"""
import json
import os
import asyncio
from typing import Any, Dict
import asyncpg
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.models.system_config import SystemConfig
from app.schemas.config import ConfigResponse, ConfigUpdateRequest, DatabaseConfig, NewApiConfig, DataSourceConfig, ConfigTestResponse
class ConfigManager:
"""Manages system configuration by merging a static JSON file with dynamic settings from the database."""
def __init__(self, db_session: AsyncSession, config_path: str = None):
self.db = db_session
if config_path is None:
# Default path: backend/app/services -> project_root/config/config.json
# __file__ = backend/app/services/config_manager.py
# go up three levels to project root
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
self.config_path = os.path.join(project_root, "config", "config.json")
else:
self.config_path = config_path
def _load_base_config_from_file(self) -> Dict[str, Any]:
"""Loads the base configuration from the JSON file."""
if not os.path.exists(self.config_path):
return {}
try:
with open(self.config_path, "r", encoding="utf-8") as f:
return json.load(f)
except (IOError, json.JSONDecodeError):
return {}
async def _load_dynamic_config_from_db(self) -> Dict[str, Any]:
"""Loads dynamic configuration overrides from the database.
当数据库表尚未创建(如开发环境未运行迁移)时,优雅降级为返回空覆盖配置,避免接口 500。
"""
try:
db_configs: Dict[str, Any] = {}
result = await self.db.execute(select(SystemConfig))
for record in result.scalars().all():
db_configs[record.config_key] = record.config_value
return db_configs
except Exception:
# 表不存在或其他数据库错误时,忽略动态配置覆盖
return {}
def _merge_configs(self, base: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
"""Deeply merges the override config into the base config."""
for key, value in overrides.items():
if isinstance(value, dict) and isinstance(base.get(key), dict):
base[key] = self._merge_configs(base[key], value)
else:
base[key] = value
return base
async def get_config(self) -> ConfigResponse:
"""Gets the final, merged configuration."""
base_config = self._load_base_config_from_file()
db_config = await self._load_dynamic_config_from_db()
merged_config = self._merge_configs(base_config, db_config)
# 兼容两种位置:优先使用 new_api其次回退到 llm.new_api
new_api_src = merged_config.get("new_api") or merged_config.get("llm", {}).get("new_api", {})
return ConfigResponse(
database=DatabaseConfig(**merged_config.get("database", {})),
new_api=NewApiConfig(**(new_api_src or {})),
data_sources={
k: DataSourceConfig(**v)
for k, v in merged_config.get("data_sources", {}).items()
}
)
async def update_config(self, config_update: ConfigUpdateRequest) -> ConfigResponse:
"""Updates configuration in the database and returns the new merged config."""
try:
update_dict = config_update.dict(exclude_unset=True)
# 验证配置数据
self._validate_config_data(update_dict)
for key, value in update_dict.items():
existing_config = await self.db.get(SystemConfig, key)
if existing_config:
# Merge with existing DB value before updating
if isinstance(existing_config.config_value, dict) and isinstance(value, dict):
merged_value = self._merge_configs(existing_config.config_value, value)
existing_config.config_value = merged_value
else:
existing_config.config_value = value
else:
new_config = SystemConfig(config_key=key, config_value=value)
self.db.add(new_config)
await self.db.commit()
return await self.get_config()
except Exception as e:
await self.db.rollback()
raise e
def _validate_config_data(self, config_data: Dict[str, Any]) -> None:
"""Validate configuration data before saving."""
if "database" in config_data:
db_config = config_data["database"]
if "url" in db_config:
url = db_config["url"]
if not url.startswith(("postgresql://", "postgresql+asyncpg://")):
raise ValueError("数据库URL必须以 postgresql:// 或 postgresql+asyncpg:// 开头")
if "new_api" in config_data:
new_api_config = config_data["new_api"]
if "api_key" in new_api_config and len(new_api_config["api_key"]) < 10:
raise ValueError("New API Key长度不能少于10个字符")
if "base_url" in new_api_config and new_api_config["base_url"]:
base_url = new_api_config["base_url"]
if not base_url.startswith(("http://", "https://")):
raise ValueError("New API Base URL必须以 http:// 或 https:// 开头")
if "data_sources" in config_data:
for source_name, source_config in config_data["data_sources"].items():
if "api_key" in source_config and len(source_config["api_key"]) < 10:
raise ValueError(f"{source_name} API Key长度不能少于10个字符")
async def test_config(self, config_type: str, config_data: Dict[str, Any]) -> ConfigTestResponse:
"""Test a specific configuration."""
try:
if config_type == "database":
return await self._test_database(config_data)
elif config_type == "new_api":
return await self._test_new_api(config_data)
elif config_type == "tushare":
return await self._test_tushare(config_data)
elif config_type == "finnhub":
return await self._test_finnhub(config_data)
else:
return ConfigTestResponse(
success=False,
message=f"不支持的配置类型: {config_type}"
)
except Exception as e:
return ConfigTestResponse(
success=False,
message=f"测试失败: {str(e)}"
)
async def _test_database(self, config_data: Dict[str, Any]) -> ConfigTestResponse:
"""Test database connection."""
db_url = config_data.get("url")
if not db_url:
return ConfigTestResponse(
success=False,
message="数据库URL不能为空"
)
try:
# 解析数据库URL
if db_url.startswith("postgresql+asyncpg://"):
db_url = db_url.replace("postgresql+asyncpg://", "postgresql://")
# 测试连接
conn = await asyncpg.connect(db_url)
await conn.close()
return ConfigTestResponse(
success=True,
message="数据库连接成功"
)
except Exception as e:
return ConfigTestResponse(
success=False,
message=f"数据库连接失败: {str(e)}"
)
async def _test_new_api(self, config_data: Dict[str, Any]) -> ConfigTestResponse:
"""Test New API (OpenAI-compatible) connection."""
api_key = config_data.get("api_key")
base_url = config_data.get("base_url")
if not api_key or not base_url:
return ConfigTestResponse(
success=False,
message="New API Key和Base URL均不能为空"
)
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# Test API availability by listing models
response = await client.get(
f"{base_url.rstrip('/')}/models",
headers={"Authorization": f"Bearer {api_key}"}
)
if response.status_code == 200:
return ConfigTestResponse(
success=True,
message="New API连接成功"
)
else:
return ConfigTestResponse(
success=False,
message=f"New API测试失败: HTTP {response.status_code} - {response.text}"
)
except Exception as e:
return ConfigTestResponse(
success=False,
message=f"New API连接失败: {str(e)}"
)
async def _test_tushare(self, config_data: Dict[str, Any]) -> ConfigTestResponse:
"""Test Tushare API connection."""
api_key = config_data.get("api_key")
if not api_key:
return ConfigTestResponse(
success=False,
message="Tushare API Key不能为空"
)
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# 测试API可用性
response = await client.post(
"http://api.tushare.pro",
json={
"api_name": "stock_basic",
"token": api_key,
"params": {"list_status": "L"},
"fields": "ts_code"
}
)
if response.status_code == 200:
data = response.json()
if data.get("code") == 0:
return ConfigTestResponse(
success=True,
message="Tushare API连接成功"
)
else:
return ConfigTestResponse(
success=False,
message=f"Tushare API错误: {data.get('msg', '未知错误')}"
)
else:
return ConfigTestResponse(
success=False,
message=f"Tushare API测试失败: HTTP {response.status_code}"
)
except Exception as e:
return ConfigTestResponse(
success=False,
message=f"Tushare API连接失败: {str(e)}"
)
async def _test_finnhub(self, config_data: Dict[str, Any]) -> ConfigTestResponse:
"""Test Finnhub API connection."""
api_key = config_data.get("api_key")
if not api_key:
return ConfigTestResponse(
success=False,
message="Finnhub API Key不能为空"
)
try:
async with httpx.AsyncClient(timeout=10.0) as client:
# 测试API可用性
response = await client.get(
f"https://finnhub.io/api/v1/quote",
params={"symbol": "AAPL", "token": api_key}
)
if response.status_code == 200:
data = response.json()
if "c" in data: # 检查是否有价格数据
return ConfigTestResponse(
success=True,
message="Finnhub API连接成功"
)
else:
return ConfigTestResponse(
success=False,
message="Finnhub API响应格式错误"
)
else:
return ConfigTestResponse(
success=False,
message=f"Finnhub API测试失败: HTTP {response.status_code}"
)
except Exception as e:
return ConfigTestResponse(
success=False,
message=f"Finnhub API连接失败: {str(e)}"
)