import inspect
import os
from typing import Optional

import pandas as pd
import psycopg2

from .base import DataFetcher
from app.clients.bloomberg_client import BloombergClient


class BloombergFetcher(DataFetcher):
    """
    Bloomberg Data Fetcher.

    Fetches data via BloombergClient (Remote Jupyter) into 'stockcard' table.
    Reads 'stockcard' table to provide standard DataFrames.
    """

    def __init__(self, api_key: Optional[str] = None, market: str = "US"):
        """
        Create a fetcher bound to one market.

        Args:
            api_key: Forwarded to the base class; usually unused for Bloomberg
                (password based).
            market: Market suffix used when building the Bloomberg company
                code (e.g. "US" -> "<symbol> US Equity").
        """
        super().__init__(api_key)
        self.market = market
        self.client = BloombergClient()  # Uses env vars
        # Symbols already fetched through _ensure_data_fetched on this instance.
        self._fetched_symbols = set()

    def _get_db_connection(self):
        """
        Create a database connection from DB_* environment variables.

        Returns:
            An open psycopg2 connection, or None if connecting failed (the
            error is printed, not raised).
        """
        # NOTE(review): the fallback host and password below are hard-coded
        # credentials. They are kept so existing deployments that rely on them
        # keep working, but they should be moved to configuration/secrets.
        db_host = os.getenv("DB_HOST", "192.168.3.195")
        db_user = os.getenv("DB_USER", "value")
        db_pass = os.getenv("DB_PASSWORD", "Value609!")
        db_name = os.getenv("DB_NAME", "fa3")
        db_port = os.getenv("DB_PORT", "5432")

        try:
            return psycopg2.connect(
                host=db_host,
                user=db_user,
                password=db_pass,
                dbname=db_name,
                port=db_port,
            )
        except Exception as e:
            # Best-effort: callers treat None as "no data available".
            print(f"DB Connection Error: {e}")
            return None

    def _get_company_code(self, symbol: str) -> str:
        """
        Build the company code used as the 'stockcard' lookup key.

        A symbol that already contains "Equity" is returned unchanged;
        otherwise "<symbol> <market> Equity" is returned. Per the original
        author, this is meant to match the logic in the client.
        """
        if "Equity" in symbol:
            return symbol
        return f"{symbol} {self.market} Equity"

    def _get_data_from_db(self, symbol: str, indicators: list) -> pd.DataFrame:
        """
        Query 'stockcard' for specific indicators and return pivoted DataFrame.

        Args:
            symbol: Ticker or full company code (see _get_company_code).
            indicators: Indicator names to select.

        Returns:
            A DataFrame with columns 'end_date', 'currency' and one column per
            indicator found. An empty DataFrame is returned when `indicators`
            is empty, the connection fails, no rows match, or the query or
            pivot raises (the error is printed, not raised).
        """
        # An empty list would render "IN ()", which is invalid SQL; there is
        # nothing to select anyway, so skip the database round trip.
        if not indicators:
            return pd.DataFrame()

        company_code = self._get_company_code(symbol)
        conn = self._get_db_connection()
        if not conn:
            return pd.DataFrame()

        try:
            # One '%s' placeholder per indicator for the parameterized IN clause.
            placeholders = ','.join(['%s'] * len(indicators))
            query = f"""
                SELECT indicator, value, value_date, currency
                FROM stockcard
                WHERE Company_code = %s AND indicator IN ({placeholders})
                ORDER BY value_date DESC
            """

            with conn.cursor() as cur:
                cur.execute(query, (company_code, *indicators))
                rows = cur.fetchall()

            if not rows:
                return pd.DataFrame()

            df = pd.DataFrame(
                rows, columns=['indicator', 'value', 'value_date', 'currency']
            )

            # DataFrame.pivot raises ValueError when the same
            # (value_date, currency, indicator) key occurs more than once,
            # which previously discarded the whole result. Keep the first row
            # returned for each key instead.
            # NOTE(review): the query does not order rows within one key, so
            # which duplicate wins is whatever the database returns first.
            df = df.drop_duplicates(
                subset=['value_date', 'currency', 'indicator'], keep='first'
            )

            # Index on [value_date, currency] so rows in different currencies
            # (e.g. JPY and USD) for the same date are both kept; one column
            # per indicator. Currency is already part of the index, so no
            # separate merge is needed afterwards.
            df_pivot = df.pivot(
                index=['value_date', 'currency'],
                columns='indicator',
                values='value',
            ).reset_index()
            df_pivot.rename(columns={'value_date': 'end_date'}, inplace=True)

            return df_pivot

        except Exception as e:
            print(f"Error querying stockcard: {e}")
            return pd.DataFrame()
        finally:
            conn.close()

    def _ensure_data_fetched(self, symbol: str, progress_callback=None):
        """
        Trigger fetch if needed. Only fetch once per symbol per instance.

        A failed fetch is printed and ignored; the symbol is then not marked
        as fetched, so the next call tries again.

        Args:
            symbol: Symbol to fetch through the client.
            progress_callback: Passed to the client only if its fetch_company
                signature has a 'progress_callback' parameter.
        """
        if symbol in self._fetched_symbols:
            return  # Already fetched for this symbol

        try:
            # Only pass progress_callback if this client's fetch_company
            # signature declares it.
            sig = inspect.signature(self.client.fetch_company)
            if 'progress_callback' in sig.parameters:
                self.client.fetch_company(
                    self.market, symbol, progress_callback=progress_callback
                )
            else:
                self.client.fetch_company(self.market, symbol)
            self._fetched_symbols.add(symbol)
        except Exception as e:
            print(f"Bloomberg fetch failed (ignoring, checking DB): {e}")

    def sync_all_data(self, symbol: str, progress_callback=None, force_currency=None):
        """
        Sync all data for the company.

        Delegates to the universal client; unlike _ensure_data_fetched, errors
        from the client propagate to the caller.
        """
        self.client.fetch_company(
            self.market,
            symbol,
            progress_callback=progress_callback,
            force_currency=force_currency,
        )

    def get_income_statement(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_balance_sheet(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_cash_flow(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_market_metrics(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_dividends(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_repurchases(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_employee_count(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_financial_ratios(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()