Fundamental_Analysis/backend/app/services/config_manager.py
2025-10-28 23:31:28 +08:00

88 lines
3.7 KiB
Python

"""
Configuration Management Service
"""
import json
import os
from typing import Any, Dict
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.models.system_config import SystemConfig
from app.schemas.config import ConfigResponse, ConfigUpdateRequest, DatabaseConfig, GeminiConfig, DataSourceConfig
class ConfigManager:
"""Manages system configuration by merging a static JSON file with dynamic settings from the database."""
def __init__(self, db_session: AsyncSession, config_path: str = None):
self.db = db_session
if config_path is None:
# Default path: backend/ -> project_root/ -> config/config.json
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
self.config_path = os.path.join(project_root, "config", "config.json")
else:
self.config_path = config_path
def _load_base_config_from_file(self) -> Dict[str, Any]:
"""Loads the base configuration from the JSON file."""
if not os.path.exists(self.config_path):
return {}
try:
with open(self.config_path, "r", encoding="utf-8") as f:
return json.load(f)
except (IOError, json.JSONDecodeError):
return {}
async def _load_dynamic_config_from_db(self) -> Dict[str, Any]:
"""Loads dynamic configuration overrides from the database."""
db_configs = {}
result = await self.db.execute(select(SystemConfig))
for record in result.scalars().all():
db_configs[record.config_key] = record.config_value
return db_configs
def _merge_configs(self, base: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
"""Deeply merges the override config into the base config."""
for key, value in overrides.items():
if isinstance(value, dict) and isinstance(base.get(key), dict):
base[key] = self._merge_configs(base[key], value)
else:
base[key] = value
return base
async def get_config(self) -> ConfigResponse:
"""Gets the final, merged configuration."""
base_config = self._load_base_config_from_file()
db_config = await self._load_dynamic_config_from_db()
merged_config = self._merge_configs(base_config, db_config)
return ConfigResponse(
database=DatabaseConfig(**merged_config.get("database", {})),
gemini_api=GeminiConfig(**merged_config.get("llm", {}).get("gemini", {})),
data_sources={
k: DataSourceConfig(**v)
for k, v in merged_config.get("data_sources", {}).items()
}
)
async def update_config(self, config_update: ConfigUpdateRequest) -> ConfigResponse:
"""Updates configuration in the database and returns the new merged config."""
update_dict = config_update.dict(exclude_unset=True)
for key, value in update_dict.items():
existing_config = await self.db.get(SystemConfig, key)
if existing_config:
# Merge with existing DB value before updating
if isinstance(existing_config.config_value, dict) and isinstance(value, dict):
merged_value = self._merge_configs(existing_config.config_value, value)
existing_config.config_value = merged_value
else:
existing_config.config_value = value
else:
new_config = SystemConfig(config_key=key, config_value=value)
self.db.add(new_config)
await self.db.commit()
return await self.get_config()