159 lines
6.0 KiB
Python
159 lines
6.0 KiB
Python
|
|
import pandas as pd
|
|
import os
|
|
import psycopg2
|
|
from .base import DataFetcher
|
|
from app.clients.bloomberg_client import BloombergClient
|
|
|
|
class BloombergFetcher(DataFetcher):
    """
    Bloomberg Data Fetcher.

    Fetches data via BloombergClient (Remote Jupyter) into the 'stockcard' table
    and reads the 'stockcard' table back to provide pivoted DataFrames.
    """

    def __init__(self, api_key: str = None, market: str = "US"):
        """
        Create a fetcher bound to one market.

        Args:
            api_key: Forwarded to the base class; usually unused for Bloomberg
                (password based).
            market: Market code inserted into company codes, e.g. "US".
        """
        super().__init__(api_key)
        self.market = market
        self.client = BloombergClient()  # Uses env vars
        # Symbols already fetched by _ensure_data_fetched on this instance.
        self._fetched_symbols = set()

    def _get_db_connection(self):
        """
        Create a database connection from the DB_* environment variables.

        Returns:
            A psycopg2 connection, or None when connecting fails (the error is
            printed rather than raised).
        """
        # SECURITY NOTE(review): the fallback host and credentials below are
        # hard-coded in source. They are kept so that environments relying on
        # them keep working, but they should be supplied via the environment
        # (or a secret store) and the exposed password should be rotated.
        db_host = os.getenv("DB_HOST", "192.168.3.195")
        db_user = os.getenv("DB_USER", "value")
        db_pass = os.getenv("DB_PASSWORD", "Value609!")
        db_name = os.getenv("DB_NAME", "fa3")
        db_port = os.getenv("DB_PORT", "5432")

        try:
            return psycopg2.connect(
                host=db_host,
                user=db_user,
                password=db_pass,
                dbname=db_name,
                port=db_port,
            )
        except Exception as e:
            print(f"DB Connection Error: {e}")
            return None

    def _get_company_code(self, symbol: str) -> str:
        """
        Build the company code used as the 'stockcard' lookup key.

        A symbol that already contains "Equity" is returned unchanged;
        otherwise "<symbol> <market> Equity" is returned.
        """
        # Match logic in Client
        if "Equity" in symbol:
            return symbol
        return f"{symbol} {self.market} Equity"

    def _client_supports(self, parameter: str) -> bool:
        """
        Return True if ``self.client.fetch_company`` declares *parameter*.

        Uses ``inspect.signature`` so that only real parameters are considered
        (``__code__.co_varnames`` would also match local variables). If the
        signature cannot be determined, the parameter is treated as
        unsupported.
        """
        import inspect

        try:
            signature = inspect.signature(self.client.fetch_company)
        except (TypeError, ValueError):
            return False
        return parameter in signature.parameters

    def _get_data_from_db(self, symbol: str, indicators: list) -> pd.DataFrame:
        """
        Query 'stockcard' for specific indicators and return pivoted DataFrame.

        Args:
            symbol: Ticker or full company code (see _get_company_code).
            indicators: Indicator names to select.

        Returns:
            A DataFrame with one row per (end_date, currency) and one column
            per indicator. An empty DataFrame is returned when no indicators
            are requested, the connection fails, no rows match, or the query /
            pivot raises (the error is printed).
        """
        # An empty list would render "IN ()", which PostgreSQL rejects; there
        # is nothing to select, so skip the connection entirely.
        if indicators is None or len(indicators) == 0:
            return pd.DataFrame()

        company_code = self._get_company_code(symbol)
        conn = self._get_db_connection()
        if conn is None:
            return pd.DataFrame()

        try:
            # placeholders for 'IN' clause
            placeholders = ','.join(['%s'] * len(indicators))
            query = f"""
                SELECT indicator, value, value_date, currency
                FROM stockcard
                WHERE Company_code = %s AND indicator IN ({placeholders})
                ORDER BY value_date DESC
            """

            with conn.cursor() as cur:
                cur.execute(query, (company_code, *indicators))
                rows = cur.fetchall()

            if not rows:
                return pd.DataFrame()

            df = pd.DataFrame(rows, columns=['indicator', 'value', 'value_date', 'currency'])

            # Byte-identical duplicate rows carry no information but would make
            # pivot() raise and the whole result be discarded. Rows that
            # disagree on 'value' are left alone and still fail as before.
            df = df.drop_duplicates()

            # Pivot
            # Index: [value_date, currency] -> ensures we keep both JPY and USD rows for the same date
            # Columns: indicator
            # Values: value
            df_pivot = df.pivot(
                index=['value_date', 'currency'],
                columns='indicator',
                values='value',
            ).reset_index()
            df_pivot.rename(columns={'value_date': 'end_date'}, inplace=True)

            # Currency is part of the pivot index, so it is already a column
            # after reset_index(); no manual merge is needed.
            return df_pivot

        except Exception as e:
            print(f"Error querying stockcard: {e}")
            return pd.DataFrame()
        finally:
            conn.close()

    def _ensure_data_fetched(self, symbol: str, progress_callback=None):
        """
        Trigger fetch if needed. Only fetch once per symbol per instance.

        Failures are printed and ignored; the symbol is only marked as fetched
        when the client call returns without raising.
        """
        if symbol in self._fetched_symbols:
            return  # Already fetched for this symbol

        try:
            # Only pass progress_callback to clients that declare it.
            if self._client_supports('progress_callback'):
                self.client.fetch_company(self.market, symbol, progress_callback=progress_callback)
            else:
                self.client.fetch_company(self.market, symbol)

            self._fetched_symbols.add(symbol)
        except Exception as e:
            print(f"Bloomberg fetch failed (ignoring, checking DB): {e}")

    def sync_all_data(self, symbol: str, progress_callback=None, force_currency=None, frequency="Annual"):
        """
        Sync all data for the company.

        Delegates to the universal client.

        Args:
            symbol: Symbol passed to the client.
            progress_callback: Forwarded to the client.
            force_currency: Forwarded to the client.
            frequency: 'Quarterly' / 'Quarter' select QUARTERLY,
                'Semiannual' / 'Semiannually' select SEMI_ANNUALLY; any other
                value (including the default 'Annual') selects YEARLY.
        """
        if frequency in ("Quarterly", "Quarter"):
            period = "QUARTERLY"
        elif frequency in ("Semiannual", "Semiannually"):
            period = "SEMI_ANNUALLY"
        else:
            period = "YEARLY"

        call_kwargs = {
            'progress_callback': progress_callback,
            'force_currency': force_currency,
        }
        # Pass period only to clients that declare it; older client code
        # without the parameter is called without it.
        if self._client_supports('period'):
            call_kwargs['period'] = period

        self.client.fetch_company(self.market, symbol, **call_kwargs)

    def get_income_statement(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_balance_sheet(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_cash_flow(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_market_metrics(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_dividends(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_repurchases(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_employee_count(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()

    def get_financial_ratios(self, symbol: str) -> pd.DataFrame:
        """Compatibility stub; always returns an empty DataFrame."""
        return pd.DataFrame()
|
|
|