FA3-Datafetch/backend/app/fetchers/bloomberg_fetcher.py
2026-01-12 09:33:52 +08:00

146 lines
5.3 KiB
Python

import pandas as pd
import os
import psycopg2
from .base import DataFetcher
from app.clients.bloomberg_client import BloombergClient
class BloombergFetcher(DataFetcher):
    """
    Bloomberg Data Fetcher.

    Fetches data via BloombergClient (Remote Jupyter) into the 'stockcard'
    table, and reads the 'stockcard' table to provide standard DataFrames.
    """

    def __init__(self, api_key: str = None, market: str = "US"):
        """
        Initialise the fetcher.

        Args:
            api_key: Forwarded to the base class. Usually unused for
                Bloomberg (password based).
            market: Market code used to build company codes and passed to
                the client on every fetch.
        """
        super().__init__(api_key)
        self.market = market
        self.client = BloombergClient()  # Uses env vars
        # Symbols for which _ensure_data_fetched has already succeeded.
        self._fetched_symbols = set()

    def _get_db_connection(self):
        """
        Create a database connection.

        Settings are read from the DB_HOST, DB_USER, DB_PASSWORD, DB_NAME
        and DB_PORT environment variables, each with a built-in fallback.

        Returns:
            A psycopg2 connection, or None if connecting failed (the error
            is printed rather than raised).
        """
        # SECURITY(review): the fallbacks below embed a concrete host, user
        # and password in source. They are kept unchanged so existing
        # deployments keep working, but they should be moved to
        # configuration and the password rotated.
        db_host = os.getenv("DB_HOST", "192.168.3.195")
        db_user = os.getenv("DB_USER", "value")
        db_pass = os.getenv("DB_PASSWORD", "Value609!")
        db_name = os.getenv("DB_NAME", "fa3")
        db_port = os.getenv("DB_PORT", "5432")
        try:
            return psycopg2.connect(
                host=db_host,
                user=db_user,
                password=db_pass,
                dbname=db_name,
                port=db_port,
            )
        except Exception as e:
            # Best effort: callers treat None as "no data available".
            print(f"DB Connection Error: {e}")
            return None

    def _get_company_code(self, symbol: str) -> str:
        """
        Build the company code used as the 'stockcard' lookup key.

        A symbol that already contains "Equity" is returned unchanged;
        otherwise "<symbol> <market> Equity" is returned. The original
        comment states this matches the logic in the client.
        """
        if "Equity" in symbol:
            return symbol
        return f"{symbol} {self.market} Equity"

    def _get_data_from_db(self, symbol: str, indicators: list) -> pd.DataFrame:
        """
        Query 'stockcard' for specific indicators and return a pivoted DataFrame.

        Args:
            symbol: Ticker or full company code (see _get_company_code).
            indicators: Indicator names to select.

        Returns:
            A DataFrame with 'end_date' and 'currency' columns plus one
            column per indicator found. An empty DataFrame is returned when
            no indicators are requested, the connection cannot be opened,
            no rows match, or any error occurs (errors are printed).
        """
        company_code = self._get_company_code(symbol)
        conn = None
        try:
            # An empty list would render "indicator IN ()", which PostgreSQL
            # rejects as a syntax error. There is nothing to query, so skip
            # the connection entirely. The check lives inside the try so a
            # malformed `indicators` (e.g. None) still yields an empty frame.
            if len(indicators) == 0:
                return pd.DataFrame()

            conn = self._get_db_connection()
            if conn is None:
                return pd.DataFrame()

            # One %s placeholder per indicator for the IN clause; values are
            # bound by the driver, never interpolated into the SQL text.
            placeholders = ','.join(['%s'] * len(indicators))
            query = f"""
                SELECT indicator, value, value_date, currency
                FROM stockcard
                WHERE Company_code = %s AND indicator IN ({placeholders})
                ORDER BY value_date DESC
            """
            with conn.cursor() as cur:
                cur.execute(query, (company_code, *indicators))
                rows = cur.fetchall()
            if not rows:
                return pd.DataFrame()

            df = pd.DataFrame(rows, columns=['indicator', 'value', 'value_date', 'currency'])
            # Pivot on [value_date, currency] so rows for different
            # currencies on the same date are kept apart; one column per
            # indicator. NOTE: DataFrame.pivot raises ValueError when the
            # same (value_date, currency, indicator) occurs more than once;
            # that is caught below and produces an empty frame.
            df_pivot = df.pivot(
                index=['value_date', 'currency'],
                columns='indicator',
                values='value',
            ).reset_index()
            df_pivot.rename(columns={'value_date': 'end_date'}, inplace=True)
            return df_pivot
        except Exception as e:
            print(f"Error querying stockcard: {e}")
            return pd.DataFrame()
        finally:
            if conn is not None:
                conn.close()

    def _ensure_data_fetched(self, symbol: str, progress_callback=None):
        """
        Trigger a client fetch if this instance has not fetched `symbol` yet.

        The symbol is recorded only after the client call returns without
        raising, so a failed fetch is attempted again on the next call.
        Failures are printed and swallowed so callers can fall back to
        whatever is already in the database.

        Args:
            symbol: Symbol passed to the client.
            progress_callback: Forwarded to the client only when its
                fetch_company accepts a 'progress_callback' parameter.
        """
        if symbol in self._fetched_symbols:
            return  # Already fetched for this symbol
        try:
            # Inspect the client's signature so clients without the
            # progress_callback parameter keep working.
            import inspect
            sig = inspect.signature(self.client.fetch_company)
            if 'progress_callback' in sig.parameters:
                self.client.fetch_company(self.market, symbol, progress_callback=progress_callback)
            else:
                self.client.fetch_company(self.market, symbol)
            self._fetched_symbols.add(symbol)
        except Exception as e:
            print(f"Bloomberg fetch failed (ignoring, checking DB): {e}")

    def sync_all_data(self, symbol: str, progress_callback=None, force_currency=None):
        """
        Sync all data for the company by delegating to the client.

        Unlike _ensure_data_fetched, this always calls the client, passes
        both keyword arguments unconditionally, and lets any exception
        propagate to the caller.

        Args:
            symbol: Symbol passed to the client.
            progress_callback: Forwarded to the client as-is.
            force_currency: Forwarded to the client as-is.
        """
        # NOTE(review): the symbol is not added to _fetched_symbols here, so
        # a later _ensure_data_fetched for the same symbol fetches again.
        # Confirm whether that is intended.
        self.client.fetch_company(
            self.market,
            symbol,
            progress_callback=progress_callback,
            force_currency=force_currency,
        )

    def get_income_statement(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_balance_sheet(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_cash_flow(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_market_metrics(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_dividends(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_repurchases(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_employee_count(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_financial_ratios(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub: always returns an empty DataFrame."""
        return pd.DataFrame()