import os import sys import json import uuid import time import requests import websocket import psycopg2 import pandas as pd from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from datetime import datetime from dotenv import load_dotenv from pathlib import Path import logging # Configure logger logger = logging.getLogger(__name__) # Explicitly load .env ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent load_dotenv(ROOT_DIR / ".env") # --- Configurations --- CURRENCY_CONFIG = { "Revenue": "SALES_REV_TURN", "Net_Income": "EARN_FOR_COMMON", "Cash_From_Operating": "CF_CASH_FROM_OPER", "Capital_Expenditure": "CAPITAL_EXPEND", "Free_Cash_Flow": "CF_FREE_CASH_FLOW", "Dividends_Paid": "CF_DVD_PAID", "Total_Assets": "BS_TOT_ASSET", "Equity": "BS_TOT_EQY", "Goodwill": "BS_GOODWILL", "SG&A": "IS_SG_AND_A_EXPENSE", "Selling&Marketing": "IS_SELLING_EXPENSES", "General&Admin": "IS_GENERAL_AND_ADMIN_GAAP", "R&D": "IS_RD_EXPEND", "Depreciation": "IS_DEPR_EXP", "Cash": "BS_CASH_NEAR_CASH_ITEM", "Inventory": "BS_INVENTORIES", "Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV", "Prepaid": "BS_PREPAY", "Property_Plant&Equipment": "BS_NET_FIX_ASSET", "LT_Investment": "BS_LT_INVEST", "Accounts_Payable": "BS_ACCT_PAYABLE", "ST_Debt": "BS_ST_BORROW", "LT_Debt": "BS_LT_BORROW", "ST_Defer_Rev":"ST_DEFERRED_REVENUE", "repurchase":"cf_proceeds_repurchase_equity" } NON_CURRENCY_CONFIG = { "ROE": "RETURN_COM_EQY", "ROA": "RETURN_ON_ASSET", "ROCE": "RETURN_ON_CAPITAL_EMPLOYED", "Gross_Margin": "GROSS_MARGIN", "EBITDA_margin": "EBITDA_TO_REVENUE", "Net_Profit_Margin": "PROF_MARGIN", "Tax_Rate": "IS_STATUTORY_TAX_RATE", "Inventory_Days": "INVENT_DAYS", "Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG", "Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS", "Employee": "NUM_OF_EMPLOYEES", "PE": "PE_RATIO", "PB": "PX_TO_BOOK_RATIO", "Shareholders": "BS_NUM_OF_SHAREHOLDERS", "Dividend_Payout_Ratio":"DVD_PAYOUT_RATIO", "Total_Debt_Ratio":"TOT_DEBT_TO_TOT_ASSET", "Net_Fixed_Asset_Turnover":"NET_FIX_ASSET_TURN", "Asset_Turnover":"ASSET_TURNOVER", "NetIncome_Growth":"earn_for_com_growth", "Revenue_Growth":"sales_growth", } PRICE_CONFIG = { "Last_Price": "PX_LAST", "Market_Cap":"cur_mkt_cap", "Dividend_Yield": "DIVIDEND_12_MONTH_YIELD" } STOCKCARD_CONFIG = { "period": 15, "unit": 100000000 } class BloombergClient: """ Backend Bloomberg Client. Connects to remote Jupyter Kernel via WebSocket to execute 'blp' code. Persists data to 'stockcard' table (EAV pattern) using direct DB connection. """ def __init__(self, jupyter_url=None, password=None): self.jupyter_url = jupyter_url or os.getenv("JUPYTER_URL", "http://192.168.3.161:8888") self.jupyter_url = self.jupyter_url.rstrip("/") self.password = password or os.getenv("JUPYTER_PASSWORD", "Value609!") self.session = requests.Session() self.kernel_id = None self.ws = None # Authenticate and setup connection try: self._authenticate() self._setup_kernel() self._connect_websocket() except Exception as e: logger.error(f"Failed to initialize Bloomberg Client connection: {e}") # Don't raise here, allow instantiation but methods might fail # or caller handles initialization failure def _authenticate(self): """Authenticate with JupyterHub/Lab using password to get session cookies.""" logger.info(f"Connecting to Jupyter at {self.jupyter_url}...") try: # 1. Get Login Page to fetch _xsrf (if needed) r = self.session.get(f"{self.jupyter_url}/login") if r.status_code != 200: logger.warning(f"Failed to access login page: {r.status_code}") xsrf = self.session.cookies.get("_xsrf", "") # 2. Post Password data = {"password": self.password} if xsrf: data["_xsrf"] = xsrf login_resp = self.session.post(f"{self.jupyter_url}/login", data=data) # 3. Verify Auth api_resp = self.session.get(f"{self.jupyter_url}/api/status") if api_resp.status_code == 200: logger.info("✅ Authentication successful.") else: raise Exception(f"Authentication failed. Status: {api_resp.status_code}") except Exception as e: logger.error(f"❌ Error during authentication: {e}") raise def _setup_kernel(self): """Find an existing kernel or create a new one.""" try: # List kernels resp = self.session.get(f"{self.jupyter_url}/api/kernels") kernels = resp.json() # Try to find 'remote_env' or just pick the first available target_kernel = None for k in kernels: if k.get('name') == 'remote_env': # Ideal match target_kernel = k break if not target_kernel and kernels: target_kernel = kernels[0] # Fallback if target_kernel: self.kernel_id = target_kernel['id'] logger.info(f"✅ Found existing kernel: {self.kernel_id} ({target_kernel.get('name')})") else: logger.info("No active kernels found. Starting a new one...") # Start new kernel (assuming python3) start_resp = self.session.post(f"{self.jupyter_url}/api/kernels", json={"name": "python3"}) if start_resp.status_code == 201: self.kernel_id = start_resp.json()['id'] logger.info(f"✅ Started new kernel: {self.kernel_id}") else: raise Exception(f"Failed to start kernel: {start_resp.text}") except Exception as e: logger.error(f"❌ Error setting up kernel: {e}") raise def _connect_websocket(self): """Connect to the Kernel's WebSocket channel.""" base_ws_url = self.jupyter_url.replace("http", "ws") ws_url = f"{base_ws_url}/api/kernels/{self.kernel_id}/channels" # Extract cookies for WS connection cookies = self.session.cookies.get_dict() cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()]) try: self.ws = websocket.create_connection( ws_url, header={"Cookie": cookie_str}, timeout=60 # Connection timeout ) logger.info("✅ WebSocket connected.") except Exception as e: logger.error(f"❌ WebSocket connection failed: {e}") raise def execute_remote_code(self, code_str): """ Send code to remote kernel via WebSocket and wait for result. Returns list of output strings (stdout). """ if not self.ws: raise Exception("WebSocket not connected") msg_id = str(uuid.uuid4()) msg = { "header": { "msg_id": msg_id, "username": "client", "session": str(uuid.uuid4()), "msg_type": "execute_request", "version": "5.3" }, "parent_header": {}, "metadata": {}, "content": { "code": code_str, "silent": False, "store_history": True, "user_expressions": {}, "allow_stdin": False } } self.ws.send(json.dumps(msg)) outputs = [] error = None # Loop to receive messages until 'idle' while True: try: resp = json.loads(self.ws.recv()) msg_type = resp['msg_type'] # Check for idle status (command finished) if msg_type == 'status': if resp['content']['execution_state'] == 'idle': # Make sure it's the response to OUR request (simple check) if resp['parent_header'].get('msg_id') == msg_id: break elif msg_type == 'stream': # Stdout/Stderr outputs.append(resp['content']['text']) elif msg_type == 'error': error = resp['content']['evalue'] logger.error(f"❌ Remote Execution Error: {error}") for line in resp['content']['traceback']: logger.error(line) except Exception as e: logger.error(f"Error receiving WS message: {e}") break if error: raise Exception(f"Remote Code Execution Failed: {error}") return "".join(outputs) # --- Database Operations --- def _get_db_connection(self): """Create a database connection.""" db_host = os.getenv("DB_HOST", "192.168.3.195") db_user = os.getenv("DB_USER", "value") db_pass = os.getenv("DB_PASSWORD", "Value609!") db_name = os.getenv("DB_NAME", "fa3") # Default to fa3 now? fa legacy used 'FA' db_port = os.getenv("DB_PORT", "5432") try: conn = psycopg2.connect( host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port ) return conn except psycopg2.OperationalError as e: logger.error(f"DB Connection Error: {e}") return None def _ensure_schema(self, conn): """Ensure the necessary tables exist.""" with conn.cursor() as cur: # Create table if not exists (Inferred schema from legacy use) cur.execute(""" CREATE TABLE IF NOT EXISTS stockcard ( id SERIAL PRIMARY KEY, Company_code TEXT, update_date DATE, currency TEXT, indicator TEXT, value TEXT, value_date DATE, source TEXT ); """) # Ensure column exists if table was already created try: cur.execute("ALTER TABLE stockcard ADD COLUMN IF NOT EXISTS source TEXT;") except Exception: conn.rollback() # Should not happen with IF NOT EXISTS but good practice conn.commit() def save_data(self, data_list): """Insert a list of data dictionaries into the database.""" if not data_list: return conn = self._get_db_connection() if not conn: return self._ensure_schema(conn) try: with conn.cursor() as cur: args_list = [] for d in data_list: # Format: (Company_code, update_date, currency, indicator, value, value_date, source) args_list.append(( d['Company_code'], d['update_date'], d['currency'], d['indicator'], d['value'], d['value_date'], 'bloomberg' )) query = """ INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date, source) VALUES (%s, %s, %s, %s, %s, %s, %s) """ cur.executemany(query, args_list) conn.commit() logger.info(f"✅ Saved {len(data_list)} records to database.") except Exception as e: logger.error(f"Error saving to stockcard: {e}") conn.rollback() finally: conn.close() def run_cleanup(self): """Run deduplication and view refresh logic.""" conn = self._get_db_connection() if not conn: return try: with conn.cursor() as cur: # 1. Deduplication cur.execute(''' WITH DuplicateRows AS ( SELECT id, ROW_NUMBER() OVER( PARTITION BY company_code, currency, indicator, value_date ORDER BY update_date DESC ) as rn FROM stockcard ) DELETE FROM stockcard WHERE id IN ( SELECT id FROM DuplicateRows WHERE rn > 1 ); ''') # 2. Materialized View Refresh cur.execute(''' CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS SELECT s.Company_code, (SELECT cn_sub.value FROM public.stockcard AS cn_sub WHERE cn_sub.Company_code = s.Company_code AND cn_sub.indicator = 'company_name' ORDER BY cn_sub.value ASC LIMIT 1) AS company_name FROM (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s ORDER BY s.Company_code; ''') # Try refresh try: cur.execute("REFRESH MATERIALIZED VIEW public.unique_company_codes;") except: pass conn.commit() logger.info("✅ Cleanup and View Refresh completed.") except Exception as e: logger.error(f"❌ Cleanup failed: {e}") conn.rollback() finally: conn.close() # --- Core Fetching Logic --- def fetch_company(self, market, symbol, progress_callback=None, force_currency=None): """ Main entry point to fetch data for a single company. """ # Determine Bloomberg Ticker format # If symbol already has Equity, use it. Else append. if "Equity" in symbol: company_code = symbol else: # Special case for China market: use 'CH Equity' instead of 'CN Equity' mapped_market = "CH" if market == "CN" else market company_code = f"{symbol} {mapped_market} Equity" today_str = datetime.now().strftime('%Y-%m-%d') logger.info(f"🚀 Starting fetch for: {company_code}") if progress_callback: progress_callback("Starting Bloomberg session...", 0) # 0. Prep Remote Environment init_code = """ import json import pandas as pd from datetime import datetime try: from blp import blp except ImportError: print("Error: 'blp' module not found.") # Ensure bquery is started if 'bquery' not in globals(): try: bquery = blp.BlpQuery().start() print("BlpQuery started.") except Exception as e: print(f"Error starting BlpQuery: {e}") """ try: self.execute_remote_code(init_code) # 1. Fetch Basic Info logger.info("Fetching Basic Data...") if progress_callback: progress_callback("Fetching Company Basic Info...", 10) basic_data = self._fetch_basic_remote(company_code, "USD") self.save_data(basic_data) # Determine currency if force_currency and force_currency != "Auto": currency = force_currency logger.info(f"Using forced currency: {currency}") else: currency = "USD" if "JP" in market.upper(): currency = "JPY" elif "VN" in market.upper(): currency = "VND" elif "CN" in market.upper(): currency = "CNY" elif "HK" in market.upper(): currency = "HKD" logger.info(f"Using auto-detected currency: {currency}") # 2. Fetch Currency Data logger.info("Fetching Currency Data...") if progress_callback: progress_callback(f"正在获取货币指标 ({currency})...", 30) curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency") self.save_data(curr_data) # 3. Fetch Non-Currency Data logger.info("Fetching Non-Currency Data...") if progress_callback: progress_callback("正在获取非货币指标...", 50) non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency") self.save_data(non_curr_data) # 4. Fetch Price Data (Aligned with Revenue Dates) logger.info("Fetching Price Data (Aligned)...") if progress_callback: progress_callback("正在获取价格指标...", 70) # Extract Revenue dates revenue_dates = [] rev_key = CURRENCY_CONFIG.get("Revenue", "SALES_REV_TURN") # The saved data uses indicator name from config keys (e.g. "Revenue") # So looking for "Revenue" in saved entries for item in curr_data: # Check for "Revenue" (case insensitive match if needed) if item['indicator'].lower() == 'revenue': if item['value_date']: # Ensure YYYY-MM-DD revenue_dates.append(item['value_date']) # Remove specs, duplicates, sort revenue_dates = sorted(list(set(revenue_dates)), reverse=True) if revenue_dates: logger.info(f"Found {len(revenue_dates)} revenue reporting dates. Fetching aligned price data...") price_data = self._fetch_price_by_dates_remote(company_code, currency, revenue_dates) self.save_data(price_data) else: logger.warning("No revenue dates found. Falling back to yearly price fetch.") price_data = self._fetch_series_remote(company_code, currency, PRICE_CONFIG, "price") self.save_data(price_data) # 5. Cleanup if progress_callback: progress_callback("Finalizing data...", 90) self.run_cleanup() logger.info(f"✅ Completed processing for {company_code}") if progress_callback: progress_callback("Bloomberg data sync complete", 100) except Exception as e: logger.error(f"Fetch failed for {company_code}: {e}") if progress_callback: progress_callback(f"Error: {e}", 0) raise e def _fetch_basic_remote(self, company_code, currency): """Generates code to fetch basic data""" code = f""" def get_basic(): company = "{company_code}" curr = "{currency}" res_list = [] # 1. BQL query q = f"for(['{{company}}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={{curr}}),PCT_REVENUE_FROM_FOREIGN_SOURCES)" try: df = bquery.bql(q) if not df.empty: def get_val(df, field_name): if field_name == 'name': rows = df[df['field'] == 'name'] elif 'cur_mkt_cap' in field_name: rows = df[df['field'].str.contains('cur_mkt_cap')] elif 'FOREIGN' in field_name: rows = df[df['field'].str.contains('FOREIGN')] else: rows = df[df['field'] == field_name] if not rows.empty: val = rows['value'].iloc[0] # Handle Timestamp object from remote pandas if hasattr(val, 'strftime'): return val.strftime('%Y-%m-%d') return str(val) if val is not None else None return None res_list.append({{"indicator": "company_name", "value": get_val(df, 'name')}}) res_list.append({{"indicator": "pe_ratio", "value": get_val(df, 'pe_ratio')}}) res_list.append({{"indicator": "pb_ratio", "value": get_val(df, 'pb_ratio')}}) res_list.append({{"indicator": "market_cap", "value": get_val(df, 'cur_mkt_cap')}}) res_list.append({{"indicator": "Rev_Abroad", "value": get_val(df, 'FOREIGN')}}) except Exception as e: print(f"Basic BQL Error: {{e}}") # 2. BDP for IPO and Dividend try: did = bquery.bdp([company], ["DIVIDEND_12_MONTH_YIELD"]) if not did.empty and 'DIVIDEND_12_MONTH_YIELD' in did.columns: res_list.append({{"indicator": "dividend_yield", "value": str(did['DIVIDEND_12_MONTH_YIELD'][0])}}) ipo = bquery.bdp([company], ["EQY_INIT_PO_DT"]) if not ipo.empty and 'EQY_INIT_PO_DT' in ipo.columns: val = ipo['EQY_INIT_PO_DT'][0] val_str = str(val) if hasattr(val, 'strftime'): val_str = val.strftime('%Y-%m-%d') res_list.append({{"indicator": "IPO_date", "value": val_str}}) except Exception as e: print(f"Basic BDP Error: {{e}}") # Format result final_res = [] today = datetime.now().strftime('%Y-%m-%d') for item in res_list: if item['value']: final_res.append({{ "Company_code": company, "update_date": today, "currency": curr, "indicator": item['indicator'], "value": item['value'], "value_date": today }}) print("JSON_START") print(json.dumps(final_res)) print("JSON_END") get_basic() """ return self._execute_and_parse(code) def _fetch_series_remote(self, company_code, currency, config_dict, result_type): """Generates code to fetch series data using BDH""" config_json = json.dumps(config_dict) period_years = STOCKCARD_CONFIG['period'] start_year = datetime.now().year - period_years start_date = f"{start_year}0101" end_date = datetime.now().strftime('%Y%m%d') bdh_options = { 'periodicitySelection': 'YEARLY', 'currency': currency, 'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS', 'nonTradingDayFillMethod': 'PREVIOUS_VALUE' } bdh_opts_json = json.dumps(bdh_options) code = f""" def get_series(): company = "{company_code}" curr = "{currency}" config = {config_json} mnemonic_map = {{v.upper(): k for k, v in config.items()}} fields = list(mnemonic_map.keys()) res_list = [] try: df = bquery.bdh( [company], fields, start_date='{start_date}', end_date='{end_date}', options={bdh_opts_json} ) if not df.empty: for _, row in df.iterrows(): date_val = None if 'date' in df.columns: date_val = row['date'] elif 'DATE' in df.columns: date_val = row['DATE'] if not date_val: continue date_str = str(date_val)[:10] for mnemonic, indicator_name in mnemonic_map.items(): col_name = mnemonic # Handle multi-column result from bdh if any # Usually bdh returns column names as requested fields val = None # Flexible column matching if col_name in row: val = row[col_name] elif col_name in df.columns: val = row[col_name] else: # Try case insensitive for c in df.columns: if c.upper() == col_name: val = row[c] break if pd.isna(val) or val is None: continue val_str = str(val) res_list.append({{ "Company_code": company, "update_date": "{datetime.now().strftime('%Y-%m-%d')}", "currency": curr, "indicator": indicator_name, "value": val_str, "value_date": date_str }}) except Exception as e: print(f"BDH Error: {{e}}") print("JSON_START") print(json.dumps(res_list)) print("JSON_END") get_series() """ return self._execute_and_parse(code) def _fetch_price_by_dates_remote(self, company_code, currency, dates): """Generates code to fetch price/mkt_cap for specific dates""" if not dates: return [] dates_json = json.dumps(dates) config_json = json.dumps(PRICE_CONFIG) bdh_options = { 'currency': currency, 'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS', 'nonTradingDayFillMethod': 'PREVIOUS_VALUE' } bdh_opts_json = json.dumps(bdh_options) code = f""" def get_price_by_dates(): company = "{company_code}" curr = "{currency}" dates = {dates_json} config = {config_json} mnemonic_map = {{v.upper(): k for k, v in config.items()}} fields = list(mnemonic_map.keys()) res_list = [] for d_str in dates: # d_str is 'YYYY-MM-DD', bdh needs 'YYYYMMDD' d_param = d_str.replace('-', '') try: df = bquery.bdh( [company], fields, start_date=d_param, end_date=d_param, options={bdh_opts_json} ) if not df.empty: for _, row in df.iterrows(): # value_date is d_str for mnemonic, indicator_name in mnemonic_map.items(): col_name = mnemonic # bdh columns might be tuple or just string depending on request # usually 'PX_LAST' val = None if col_name in df.columns: val = row[col_name] if pd.isna(val) or val is None: continue val_str = str(val) res_list.append({{ "Company_code": company, "update_date": "{datetime.now().strftime('%Y-%m-%d')}", "currency": curr, "indicator": indicator_name, "value": val_str, "value_date": d_str }}) except Exception as e: print(f"Error fetching price for {{d_str}}: {{e}}") print("JSON_START") print(json.dumps(res_list)) print("JSON_END") get_price_by_dates() """ return self._execute_and_parse(code) def _execute_and_parse(self, code): """Execute code and parse [JSON_START]...[JSON_END]""" raw_output_list = self.execute_remote_code(code) raw_output = "".join(raw_output_list) if isinstance(raw_output_list, list) else str(raw_output_list) # logger.debug(f"Remote execution returned {len(raw_output)} chars.") # Optional debug # Simple parser try: start_idx = raw_output.find("JSON_START") end_idx = raw_output.find("JSON_END") if start_idx != -1 and end_idx != -1: json_str = raw_output[start_idx + len("JSON_START"):end_idx].strip() data = json.loads(json_str) logger.info(f"✅ Parsed {len(data) if isinstance(data, list) else 1} items from remote.") return data else: logger.warning(f"⚠️ No JSON output found in remote response.") return [] except Exception as e: logger.error(f"❌ Error parsing JSON from remote: {e}") return []