"""Finnhub-backed implementation of the data-provider interface."""

import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import finnhub
import pandas as pd

from .base import BaseDataProvider

logger = logging.getLogger(__name__)

# Normalized output field -> concept name looked up in the flattened filing.
# NOTE(review): confirm these concept names against real Finnhub
# `financials_reported` payloads; unmatched concepts simply yield None.
_CONCEPT_FIELDS = {
    # Balance sheet
    "total_assets": "AssetsTotal",
    "total_liabilities": "LiabilitiesTotal",
    "equity": "StockholdersEquityTotal",
    # Income statement
    "revenue": "RevenuesTotal",
    "net_income": "NetIncomeLoss",
    "gross_profit": "GrossProfit",
    # Cash flow
    "net_cash_flow_operating": "NetCashFlowOperating",
}

# Sections of a filing's ``report`` mapping, in the order they are merged
# (a later section overrides an earlier one on a duplicate concept).
_REPORT_SECTIONS = ("bs", "ic", "cf")


def _is_number(value: Any) -> bool:
    """Return True for real int/float values (bool is deliberately excluded)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def _safe_ratio(numerator: Any, denominator: Any) -> Optional[float]:
    """Return ``numerator / denominator``, or None when it is not meaningful.

    The ratio is only computed when both operands are numbers and the
    denominator is strictly positive. A numerator of 0 is a valid input and
    produces 0.0 rather than None.
    """
    if not _is_number(numerator) or not _is_number(denominator):
        return None
    if not denominator > 0:
        return None
    return numerator / denominator


def _flatten_filing(stock_code: str, filing: Dict[str, Any]) -> Dict[str, Any]:
    """Collapse one filing's nested ``report`` sections into a flat dict.

    The result maps each reported concept to its value, plus ``ts_code`` and
    ``end_date`` (taken from the filing's ``endDate``).
    """
    flat: Dict[str, Any] = {
        "ts_code": stock_code,
        "end_date": filing.get("endDate"),
    }
    report = filing.get("report") or {}
    for section in _REPORT_SECTIONS:
        for item in report.get(section) or []:
            flat[item["concept"]] = item["value"]
    return flat


def _normalize_report(flat: Dict[str, Any]) -> Dict[str, Any]:
    """Map a flattened filing onto the common schema and add derived ratios.

    ``gross_margin``, ``net_margin``, ``roa`` and ``roe`` are always present
    in the result; each is None when it cannot be computed.
    """
    normalized: Dict[str, Any] = {
        "ts_code": flat.get("ts_code"),
        "end_date": flat.get("end_date"),
    }
    for field, concept in _CONCEPT_FIELDS.items():
        normalized[field] = flat.get(concept)

    revenue = normalized["revenue"]
    net_income = normalized["net_income"]
    normalized["gross_margin"] = _safe_ratio(normalized["gross_profit"], revenue)
    normalized["net_margin"] = _safe_ratio(net_income, revenue)
    normalized["roa"] = _safe_ratio(net_income, normalized["total_assets"])
    normalized["roe"] = _safe_ratio(net_income, normalized["equity"])
    return normalized


class FinnhubProvider(BaseDataProvider):
    """Data provider that fetches market data through the ``finnhub`` client.

    The client is synchronous, so every public coroutine runs its blocking
    call in the event loop's default executor. Failures are logged and
    reported to the caller as an empty result (``None`` or ``[]``) instead of
    being raised.
    """

    def _initialize(self):
        """Create the Finnhub client from ``self.token``.

        ``self.token`` is read but not set here; presumably the base class
        assigns it before calling this hook (confirm in BaseDataProvider).

        Raises:
            ValueError: If no API key is configured.
        """
        if not self.token:
            raise ValueError("Finnhub API key not provided.")
        self.client = finnhub.Client(api_key=self.token)

    async def get_stock_basic(self, stock_code: str) -> Optional[Dict[str, Any]]:
        """Return basic company information for ``stock_code``.

        Returns:
            A dict with ``ts_code``, ``name``, ``area``, ``industry``,
            ``exchange`` and ``ipo_date``, or None when the profile is empty
            or the request fails.
        """

        # Must be a plain function: run_in_executor calls it in a worker
        # thread and returns whatever it returns. An ``async def`` here would
        # hand back an un-awaited coroutine object instead of the data.
        def _fetch() -> Optional[Dict[str, Any]]:
            try:
                profile = self.client.company_profile2(symbol=stock_code)
                if not profile:
                    return None
                return {
                    "ts_code": stock_code,
                    "name": profile.get("name"),
                    "area": profile.get("country"),
                    "industry": profile.get("finnhubIndustry"),
                    "exchange": profile.get("exchange"),
                    "ipo_date": profile.get("ipo"),
                }
            except Exception as e:
                logger.error(f"Finnhub get_stock_basic failed for {stock_code}: {e}")
                return None

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _fetch)

    async def get_daily_price(
        self, stock_code: str, start_date: str, end_date: str
    ) -> List[Dict[str, Any]]:
        """Return daily candles for ``stock_code`` between two dates.

        Args:
            stock_code: Symbol passed to Finnhub.
            start_date: First day of the window, formatted ``YYYYMMDD``.
            end_date: Last day of the window (inclusive), ``YYYYMMDD``.

        Returns:
            One dict per candle with ``trade_date`` (``YYYYMMDD``), ``open``,
            ``high``, ``low``, ``close`` and ``vol``. Empty when Finnhub
            reports no data or the request fails.
        """

        def _fetch() -> List[Dict[str, Any]]:
            try:
                # Interpret the bounds as UTC calendar days. ``trade_date``
                # below is formatted from the candle epoch in UTC, so using
                # local time for the bounds would shift the window by the
                # host's UTC offset and could drop the first or last day.
                first_day = datetime.strptime(start_date, "%Y%m%d").replace(
                    tzinfo=timezone.utc
                )
                last_day = datetime.strptime(end_date, "%Y%m%d").replace(
                    tzinfo=timezone.utc
                )
                start_ts = int(first_day.timestamp())
                # Last second of end_date, so the whole final day is covered.
                end_ts = int((last_day + timedelta(days=1)).timestamp()) - 1

                res = self.client.stock_candles(stock_code, "D", start_ts, end_ts)
                if not res or res.get("s") != "ok":
                    return []

                df = pd.DataFrame(res)
                if df.empty:
                    return []

                df["trade_date"] = pd.to_datetime(df["t"], unit="s").dt.strftime(
                    "%Y%m%d"
                )
                df = df.rename(
                    columns={
                        "o": "open",
                        "h": "high",
                        "l": "low",
                        "c": "close",
                        "v": "vol",
                    }
                )
                columns = ["trade_date", "open", "high", "low", "close", "vol"]
                return df[columns].to_dict("records")
            except Exception as e:
                logger.error(f"Finnhub get_daily_price failed for {stock_code}: {e}")
                return []

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _fetch)

    async def get_financial_statements(
        self, stock_code: str, report_dates: List[str]
    ) -> List[Dict[str, Any]]:
        """Return normalized annual financial reports for ``stock_code``.

        All annual filings are fetched in one call and then filtered to the
        years named in ``report_dates``; only the first four characters of
        each date (the year) are used for matching.

        Returns:
            One dict per matching filing containing ``ts_code``, ``end_date``,
            the raw figures (``total_assets``, ``total_liabilities``,
            ``equity``, ``revenue``, ``net_income``, ``gross_profit``,
            ``net_cash_flow_operating``) and the derived ``gross_margin``,
            ``net_margin``, ``roa`` and ``roe``. A figure or ratio that is
            unavailable is None. Empty when nothing matches or the request
            fails.
        """

        def _fetch() -> List[Dict[str, Any]]:
            try:
                res = self.client.financials_reported(symbol=stock_code, freq="annual")
                if not res or not res.get("data"):
                    return []

                wanted_years = {date[:4] for date in report_dates}
                return [
                    _normalize_report(_flatten_filing(stock_code, filing))
                    for filing in res["data"]
                    if str(filing.get("year")) in wanted_years
                ]
            except Exception as e:
                logger.error(
                    f"Finnhub get_financial_statements failed for {stock_code}: {e}"
                )
                return []

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, _fetch)