Fundamental_Analysis/backend/app/data_manager.py
xucheng ca60410966 feat: 通用市场财务/快照/分析接口;增强数据源与配置读取
Backend
- router(financial): 新增通用路径 /{market}/{stock_code}、/snapshot、/analysis/stream
  - 用 MarketEnum 统一市场(cn/us/hk/jp)
  - 将 /china/{ts_code} 改为通用 get_financials,并规范 period,按年限裁剪
  - 新增通用昨日快照接口(CN 复用原逻辑,其他市场兜底近交易日收盘)
- data_manager: 仅从 config/config.json 读取各 provider API key,不再读取环境变量
  - series 构建更健壮:None/空结构判定;接受 numpy/pandas 数值类型并安全转 float
- provider(finnhub):
  - SDK 失败时使用 httpx 直连兜底(profile2、financials-reported)
  - 规范化年度报表,映射 revenue/net income/gross profit/assets/equity/goodwill/OCF/CapEx
  - 计算 gross/net margin、ROA、ROE;直接产出 series 结构
  - 增加关键步骤日志与异常保护
- provider(yfinance): 修正同步阻塞的获取逻辑,使用 run_in_executor 包装

Frontend
- hooks(useApi):
  - 将中国财务接口路径改为 /api/financials/cn
  - 新增 useFinancials 与 useSnapshot,统一多市场数据访问
- report/[symbol]/page.tsx:
  - 支持多市场(映射 usa→us、china→cn 等),统一 symbol 与分析流路径
  - 去除仅限中国市场的 UI 限制,财务/分析/图表对多市场可用
  - 使用新的分析与快照 API 路径
- lib/prisma.ts: 去除无关内容(微小空行调整)

Docs
- 重组文档目录:
  - docs/已完成任务/tasks.md(重命名自 docs/tasks.md)
  - docs/未完成任务/us_market_integration_tasks.md 新增

BREAKING CHANGE
- API 路径变更:
  - 财务数据:/api/financials/china/{ts_code} → /api/financials/{market}/{stock_code}
  - 快照:/api/financials/china/{ts_code}/snapshot → /api/financials/{market}/{stock_code}/snapshot
  - 分析流:/api/financials/china/{ts_code}/analysis/{type}/stream → /api/financials/{market}/{stock_code}/analysis/{type}/stream
- 前端需使用 useFinancials/useSnapshot 或更新为 /cn 路径以兼容中国市场
2025-11-06 20:01:08 +08:00

195 lines
8.4 KiB
Python

import yaml
import os
import json
from typing import Any, Dict, List, Optional
from numbers import Number
from app.data_providers.base import BaseDataProvider
from app.data_providers.tushare import TushareProvider
# from app.data_providers.ifind import TonghsProvider
from app.data_providers.yfinance import YfinanceProvider
from app.data_providers.finnhub import FinnhubProvider
import logging
logger = logging.getLogger(__name__)
class DataManager:
_instance = None
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(DataManager, cls).__new__(cls)
return cls._instance
def __init__(self, config_path: str = None):
if hasattr(self, '_initialized') and self._initialized:
return
if config_path is None:
# Assume the config file is in the 'config' directory at the root of the repo
# Find the project root by looking for the config directory
current_dir = os.path.dirname(__file__)
while current_dir != os.path.dirname(current_dir): # Not at filesystem root
if os.path.exists(os.path.join(current_dir, "config", "data_sources.yaml")):
REPO_ROOT = current_dir
break
current_dir = os.path.dirname(current_dir)
else:
# Fallback to the original calculation
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
config_path = os.path.join(REPO_ROOT, "config", "data_sources.yaml")
with open(config_path, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
self.providers = {}
# Build provider base config ONLY from config/config.json (do not read env vars)
base_cfg: Dict[str, Any] = {"data_sources": {}}
try:
# Use the same REPO_ROOT calculation as data_sources.yaml
current_dir = os.path.dirname(__file__)
while current_dir != os.path.dirname(current_dir): # Not at filesystem root
if os.path.exists(os.path.join(current_dir, "config", "data_sources.yaml")):
REPO_ROOT = current_dir
break
current_dir = os.path.dirname(current_dir)
else:
# Fallback to the original calculation
REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
cfg_json_path = os.path.join(REPO_ROOT, "config", "config.json")
if os.path.exists(cfg_json_path):
with open(cfg_json_path, "r", encoding="utf-8") as jf:
cfg_json = json.load(jf)
ds_from_json = (cfg_json.get("data_sources") or {})
for name, node in ds_from_json.items():
if node.get("api_key"):
base_cfg["data_sources"][name] = {"api_key": node.get("api_key")}
logger.info(f"Loaded API key for provider '{name}' from config.json")
else:
logger.debug("config/config.json not found; skipping JSON token load.")
except Exception as e:
logger.warning(f"Failed to read tokens from config/config.json: {e}")
import traceback
traceback.print_exc()
try:
self._init_providers(base_cfg)
except Exception as e:
logger.error(f"Failed to initialize data providers: {e}")
self._initialized = True
def _init_providers(self, base_cfg: Dict[str, Any]) -> None:
"""
Initializes providers with the given base configuration.
This method should be called after the base config is loaded.
"""
provider_map = {
"tushare": TushareProvider,
# "ifind": TonghsProvider,
"yfinance": YfinanceProvider,
"finnhub": FinnhubProvider,
}
for name, provider_class in provider_map.items():
token = base_cfg.get("data_sources", {}).get(name, {}).get("api_key")
source_config = self.config['data_sources'].get(name, {})
# Initialize the provider if a token is found or not required
if token or not source_config.get('api_key_env'):
try:
self.providers[name] = provider_class(token=token)
except Exception as e:
logger.error(f"Failed to initialize provider '{name}': {e}")
else:
logger.warning(f"Provider '{name}' requires API key but none provided in config.json. Skipping.")
def _detect_market(self, stock_code: str) -> str:
if stock_code.endswith(('.SH', '.SZ')):
return 'CN'
elif stock_code.endswith('.HK'):
return 'HK'
elif stock_code.endswith('.T'): # Assuming .T for Tokyo
return 'JP'
else: # Default to US
return 'US'
async def get_data(self, method_name: str, stock_code: str, **kwargs):
market = self._detect_market(stock_code)
priority_list = self.config.get('markets', {}).get(market, {}).get('priority', [])
for provider_name in priority_list:
provider = self.providers.get(provider_name)
if not provider:
logger.warning(f"Provider '{provider_name}' not initialized.")
continue
try:
method = getattr(provider, method_name)
data = await method(stock_code=stock_code, **kwargs)
is_success = False
if data is None:
is_success = False
elif isinstance(data, list):
is_success = len(data) > 0
elif isinstance(data, dict):
is_success = len(data) > 0
else:
is_success = True
if is_success:
logger.info(f"Data successfully fetched from '{provider_name}' for '{stock_code}'.")
return data
except Exception as e:
logger.warning(f"Provider '{provider_name}' failed for '{stock_code}': {e}. Trying next provider.")
logger.error(f"All data providers failed for '{stock_code}' on method '{method_name}'.")
return None
async def get_financial_statements(self, stock_code: str, report_dates: List[str]) -> Dict[str, List[Dict[str, Any]]]:
data = await self.get_data('get_financial_statements', stock_code, report_dates=report_dates)
if data is None:
return {}
# Normalize to series format
if isinstance(data, dict):
# Already in series format (e.g., tushare)
return data
elif isinstance(data, list):
# Convert from flat format to series format
series: Dict[str, List[Dict[str, Any]]] = {}
for report in data:
year = str(report.get('year', report.get('end_date', '')[:4]))
if not year:
continue
for key, value in report.items():
if key in ['ts_code', 'stock_code', 'year', 'end_date', 'period', 'ann_date', 'f_ann_date', 'report_type']:
continue
# Accept numpy/pandas numeric types as well as builtin numbers
if value is not None and isinstance(value, Number):
if key not in series:
series[key] = []
if not any(d['year'] == year for d in series[key]):
# Store as builtin float to avoid JSON serialization issues
try:
numeric_value = float(value)
except Exception:
# Fallback: skip if cannot coerce to float
continue
series[key].append({"year": year, "value": numeric_value})
return series
else:
return {}
async def get_daily_price(self, stock_code: str, start_date: str, end_date: str) -> List[Dict[str, Any]]:
return await self.get_data('get_daily_price', stock_code, start_date=start_date, end_date=end_date)
async def get_stock_basic(self, stock_code: str) -> Optional[Dict[str, Any]]:
return await self.get_data('get_stock_basic', stock_code)
data_manager = DataManager()