import os import sys import json import uuid import time import requests import websocket import psycopg2 import pandas as pd from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT from datetime import datetime, timedelta from dotenv import load_dotenv from pathlib import Path import logging # Configure logger logger = logging.getLogger(__name__) # Explicitly load .env ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent load_dotenv(ROOT_DIR / ".env") # --- Configurations --- CURRENCY_CONFIG = { "Revenue": "SALES_REV_TURN", "Net_Income": "EARN_FOR_COMMON", "Cash_From_Operating": "CF_CASH_FROM_OPER", "Capital_Expenditure": "CAPITAL_EXPEND", "Free_Cash_Flow": "CF_FREE_CASH_FLOW", "Dividends_Paid": "CF_DVD_PAID", "Total_Assets": "BS_TOT_ASSET", "Equity": "BS_TOT_EQY", "Goodwill": "BS_GOODWILL", "SG&A": "IS_SG_AND_A_EXPENSE", "Selling&Marketing": "IS_SELLING_EXPENSES", "General&Admin": "IS_GENERAL_AND_ADMIN_GAAP", "R&D": "IS_RD_EXPEND", "Depreciation": "IS_DEPR_EXP", "Cash": "BS_CASH_NEAR_CASH_ITEM", "Inventory": "BS_INVENTORIES", "Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV", "Prepaid": "BS_PREPAY", "Property_Plant&Equipment": "BS_NET_FIX_ASSET", "LT_Investment": "BS_LT_INVEST", "Accounts_Payable": "BS_ACCT_PAYABLE", "ST_Debt": "BS_ST_BORROW", "LT_Debt": "BS_LT_BORROW", "ST_Defer_Rev":"ST_DEFERRED_REVENUE", "repurchase":"cf_proceeds_repurchase_equity" } NON_CURRENCY_CONFIG = { "ROE": "RETURN_COM_EQY", "ROA": "RETURN_ON_ASSET", "ROCE": "RETURN_ON_CAPITAL_EMPLOYED", "Gross_Margin": "GROSS_MARGIN", "EBITDA_margin": "EBITDA_TO_REVENUE", "Net_Profit_Margin": "PROF_MARGIN", "Tax_Rate": "IS_STATUTORY_TAX_RATE", "Inventory_Days": "INVENT_DAYS", "Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG", "Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS", "Employee": "NUM_OF_EMPLOYEES", "PE": "PE_RATIO", "PB": "PX_TO_BOOK_RATIO", "Shareholders": "BS_NUM_OF_SHAREHOLDERS", "Dividend_Payout_Ratio":"DVD_PAYOUT_RATIO", "Total_Debt_Ratio":"TOT_DEBT_TO_TOT_ASSET", "Net_Fixed_Asset_Turnover":"NET_FIX_ASSET_TURN", "Asset_Turnover":"ASSET_TURNOVER", "NetIncome_Growth":"earn_for_com_growth", "Revenue_Growth":"sales_growth", } PRICE_CONFIG = { "Last_Price": "PX_LAST", "Market_Cap":"cur_mkt_cap", "Dividend_Yield": "DIVIDEND_12_MONTH_YIELD" } STOCKCARD_CONFIG = { "period": 10, "unit": 100000000 } class BloombergClient: """ Backend Bloomberg Client. Connects to remote Jupyter Kernel via WebSocket to execute 'blp' code. Persists data to 'stockcard' table (EAV pattern) using direct DB connection. """ def __init__(self, jupyter_url=None, password=None): self.jupyter_url = jupyter_url or os.getenv("JUPYTER_URL", "http://192.168.3.161:8888") self.jupyter_url = self.jupyter_url.rstrip("/") self.password = password or os.getenv("JUPYTER_PASSWORD", "Value609!") self.session = requests.Session() self.kernel_id = None self.ws = None # Authenticate and setup connection try: self._authenticate() self._setup_kernel() self._connect_websocket() except Exception as e: logger.error(f"Failed to initialize Bloomberg Client connection: {e}") # Don't raise here, allow instantiation but methods might fail # or caller handles initialization failure def _authenticate(self): """Authenticate with JupyterHub/Lab using password to get session cookies.""" logger.info(f"Connecting to Jupyter at {self.jupyter_url}...") try: # 1. Get Login Page to fetch _xsrf (if needed) r = self.session.get(f"{self.jupyter_url}/login") if r.status_code != 200: logger.warning(f"Failed to access login page: {r.status_code}") xsrf = self.session.cookies.get("_xsrf", "") # 2. Post Password data = {"password": self.password} if xsrf: data["_xsrf"] = xsrf login_resp = self.session.post(f"{self.jupyter_url}/login", data=data) # 3. Verify Auth api_resp = self.session.get(f"{self.jupyter_url}/api/status") if api_resp.status_code == 200: logger.info("✅ Authentication successful.") else: raise Exception(f"Authentication failed. Status: {api_resp.status_code}") except Exception as e: logger.error(f"❌ Error during authentication: {e}") raise def _setup_kernel(self): """Find an existing kernel or create a new one.""" try: # List kernels resp = self.session.get(f"{self.jupyter_url}/api/kernels") kernels = resp.json() # Try to find 'remote_env' or just pick the first available target_kernel = None for k in kernels: if k.get('name') == 'remote_env': # Ideal match target_kernel = k break if not target_kernel and kernels: target_kernel = kernels[0] # Fallback if target_kernel: self.kernel_id = target_kernel['id'] logger.info(f"✅ Found existing kernel: {self.kernel_id} ({target_kernel.get('name')})") else: logger.info("No active kernels found. Starting a new one...") # Start new kernel (assuming python3) start_resp = self.session.post(f"{self.jupyter_url}/api/kernels", json={"name": "python3"}) if start_resp.status_code == 201: self.kernel_id = start_resp.json()['id'] logger.info(f"✅ Started new kernel: {self.kernel_id}") else: raise Exception(f"Failed to start kernel: {start_resp.text}") except Exception as e: logger.error(f"❌ Error setting up kernel: {e}") raise def _connect_websocket(self): """Connect to the Kernel's WebSocket channel.""" base_ws_url = self.jupyter_url.replace("http", "ws") ws_url = f"{base_ws_url}/api/kernels/{self.kernel_id}/channels" # Extract cookies for WS connection cookies = self.session.cookies.get_dict() cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()]) try: self.ws = websocket.create_connection( ws_url, header={"Cookie": cookie_str}, timeout=60 # Connection timeout ) logger.info("✅ WebSocket connected.") except Exception as e: logger.error(f"❌ WebSocket connection failed: {e}") raise def execute_remote_code(self, code_str): """ Send code to remote kernel via WebSocket and wait for result. Returns list of output strings (stdout). """ if not self.ws: raise Exception("WebSocket not connected") msg_id = str(uuid.uuid4()) msg = { "header": { "msg_id": msg_id, "username": "client", "session": str(uuid.uuid4()), "msg_type": "execute_request", "version": "5.3" }, "parent_header": {}, "metadata": {}, "content": { "code": code_str, "silent": False, "store_history": True, "user_expressions": {}, "allow_stdin": False } } self.ws.send(json.dumps(msg)) outputs = [] error = None # Loop to receive messages until 'idle' while True: try: resp = json.loads(self.ws.recv()) msg_type = resp['msg_type'] # Check for idle status (command finished) if msg_type == 'status': if resp['content']['execution_state'] == 'idle': # Make sure it's the response to OUR request (simple check) if resp['parent_header'].get('msg_id') == msg_id: break elif msg_type == 'stream': # Stdout/Stderr outputs.append(resp['content']['text']) elif msg_type == 'error': error = resp['content']['evalue'] logger.error(f"❌ Remote Execution Error: {error}") for line in resp['content']['traceback']: logger.error(line) except Exception as e: logger.error(f"Error receiving WS message: {e}") break if error: raise Exception(f"Remote Code Execution Failed: {error}") return "".join(outputs) # --- Database Operations --- def _get_db_connection(self): """Create a database connection.""" db_host = os.getenv("DB_HOST", "192.168.3.195") db_user = os.getenv("DB_USER", "value") db_pass = os.getenv("DB_PASSWORD", "Value609!") db_name = os.getenv("DB_NAME", "fa3") # Default to fa3 now? fa legacy used 'FA' db_port = os.getenv("DB_PORT", "5432") try: conn = psycopg2.connect( host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port ) return conn except psycopg2.OperationalError as e: logger.error(f"DB Connection Error: {e}") return None def _ensure_schema(self, conn, table_name="stockcard"): """Ensure the necessary tables exist.""" with conn.cursor() as cur: # Create table if not exists (Inferred schema from legacy use) # Use format string for table name as it cannot be parameterized directly in DDL # Validate table_name to prevent SQL injection (simple whitelist or strict formatting) if table_name not in ["stockcard", "stockcard_quarter", "stockcard_semiannual"]: raise ValueError(f"Invalid table name: {table_name}") cur.execute(f""" CREATE TABLE IF NOT EXISTS {table_name} ( id SERIAL PRIMARY KEY, Company_code TEXT, update_date DATE, currency TEXT, indicator TEXT, value TEXT, value_date DATE, source TEXT ); """) # Ensure column exists if table was already created try: cur.execute(f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS source TEXT;") except Exception: conn.rollback() # Should not happen with IF NOT EXISTS but good practice # Migrate update_date to TIMESTAMP try: cur.execute(f"ALTER TABLE {table_name} ALTER COLUMN update_date TYPE TIMESTAMP USING update_date::timestamp;") except Exception as e: # Likely already converted or failed, log but don't crash logger.info(f"Schema migration note (update_date) for {table_name}: {e}") conn.rollback() conn.commit() def save_data(self, data_list, table_name="stockcard"): """Insert a list of data dictionaries into the database.""" if not data_list: return conn = self._get_db_connection() if not conn: return self._ensure_schema(conn, table_name) try: with conn.cursor() as cur: args_list = [] for d in data_list: # Format: (Company_code, update_date, currency, indicator, value, value_date, source) args_list.append(( d['Company_code'], d['update_date'], d['currency'], d['indicator'], d['value'], d['value_date'], 'bloomberg' )) query = f""" INSERT INTO {table_name} (Company_code, update_date, currency, indicator, value, value_date, source) VALUES (%s, %s, %s, %s, %s, %s, %s) """ cur.executemany(query, args_list) conn.commit() logger.info(f"✅ Saved {len(data_list)} records to database table '{table_name}'.") except Exception as e: logger.error(f"Error saving to {table_name}: {e}") conn.rollback() finally: conn.close() def run_cleanup(self, table_name="stockcard"): """Run deduplication and view refresh logic.""" conn = self._get_db_connection() if not conn: return if table_name not in ["stockcard", "stockcard_quarter", "stockcard_semiannual"]: return try: with conn.cursor() as cur: # 1. Deduplication cur.execute(f''' WITH DuplicateRows AS ( SELECT id, ROW_NUMBER() OVER( PARTITION BY company_code, currency, indicator, value_date ORDER BY update_date DESC, id DESC ) as rn FROM {table_name} ) DELETE FROM {table_name} WHERE id IN ( SELECT id FROM DuplicateRows WHERE rn > 1 ); ''') # 2. Materialized View Refresh (Only for main table for now, or unified?) # For now, unique_company_codes likely relies on the main table for list of companies. # We can skip updating the view if it's just meant for company discovery, # assuming stockcard (annual) covers all companies. if table_name == "stockcard": cur.execute(''' CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS SELECT s.Company_code, (SELECT cn_sub.value FROM public.stockcard AS cn_sub WHERE cn_sub.Company_code = s.Company_code AND cn_sub.indicator = 'company_name' ORDER BY cn_sub.value ASC LIMIT 1) AS company_name FROM (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s ORDER BY s.Company_code; ''') # Try refresh try: cur.execute("REFRESH MATERIALIZED VIEW public.unique_company_codes;") except: pass conn.commit() logger.info(f"✅ Cleanup completed for {table_name}.") except Exception as e: logger.error(f"❌ Cleanup failed for {table_name}: {e}") conn.rollback() finally: conn.close() # --- Core Fetching Logic --- def fetch_company(self, market, symbol, progress_callback=None, force_currency=None, period="YEARLY"): """ Main entry point to fetch data for a single company. period: 'YEARLY' or 'QUARTERLY' """ # Determine Bloomberg Ticker format # If symbol already has Equity, use it. Else append. if "Equity" in symbol: company_code = symbol bucket_code = symbol # Usage for bucket? # If symbol comes in as '631 HK Equity', we might want to standardize? # Assuming symbol is correct input. else: # Special case for China market: use 'CH Equity' instead of 'CN Equity' mapped_market = "CH" if market == "CN" else market # Canonical Code (Store in DB): Keep explicit zeros if provided in symbol company_code = f"{symbol} {mapped_market} Equity" # Query Ticker (Send to Bloomberg): Handle HK special case query_ticker = company_code if market == "HK" or (len(company_code.split()) > 1 and company_code.split()[1] == "HK"): # Extract number part parts = company_code.split() ticker_part = parts[0] if ticker_part.isdigit(): short_ticker = str(int(ticker_part)) # Reconstruct: 631 HK Equity query_ticker = f"{short_ticker} {' '.join(parts[1:])}" today_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # Determine Table Name table_name = "stockcard" if period == "QUARTERLY": table_name = "stockcard_quarter" elif period == "SEMI_ANNUALLY": table_name = "stockcard_semiannual" logger.info(f"🚀 Starting fetch for: {company_code} (Period: {period}, Table: {table_name})") if progress_callback: progress_callback(f"Starting Bloomberg session ({period})...", 0) # 0. Prep Remote Environment init_code = """ import json import pandas as pd from datetime import datetime try: from blp import blp except ImportError: print("Error: 'blp' module not found.") # Ensure bquery is started if 'bquery' not in globals(): try: bquery = blp.BlpQuery().start() print("BlpQuery started.") except Exception as e: print(f"Error starting BlpQuery: {e}") """ try: self.execute_remote_code(init_code) # 1. Fetch Basic Info # Determine currency if force_currency and force_currency != "Auto": currency = force_currency logger.info(f"Using forced currency: {currency}") else: currency = "USD" if "JP" in market.upper(): currency = "JPY" elif "VN" in market.upper(): currency = "VND" elif "CN" in market.upper() or "CH" in market.upper(): currency = "CNY" elif "HK" in market.upper(): currency = "HKD" logger.info(f"Using auto-detected currency: {currency}") # 2. Fetch Basic Info logger.info("Fetching Basic Data...") if progress_callback: progress_callback("Fetching Company Basic Info...", 10) try: # Basic data is always same regardless of period (it's metadata) # We save it to the target table to ensure FK/Consistency if needed, # or just save to main table? # For simplicity and self-containment, save to target table too. basic_data = self._fetch_basic_remote(company_code, currency, query_ticker=query_ticker) logger.info(f"DEBUG: basic_data before save: {basic_data}") self.save_data(basic_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching basic data: {e}") # 3. Fetch Currency Data logger.info("Fetching Currency Data...") if progress_callback: progress_callback(f"正在获取货币指标 ({period})...", 30) curr_data = [] try: curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency", query_ticker=query_ticker, period=period) self.save_data(curr_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching currency series: {e}") # 4. Fetch Non-Currency Data logger.info("Fetching Non-Currency Data...") if progress_callback: progress_callback(f"正在获取非货币指标 ({period})...", 50) try: non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency", query_ticker=query_ticker, period=period) self.save_data(non_curr_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching non-currency series: {e}") # 5. Fetch Price Data (Aligned with Revenue Dates) logger.info("Fetching Price Data (Aligned)...") if progress_callback: progress_callback("正在获取价格指标...", 70) # Extract Revenue dates revenue_dates = [] if curr_data: for item in curr_data: # Check for "Revenue" if item['indicator'].lower() == "revenue": # Robust case-insensitive check if item['value_date']: revenue_dates.append(item['value_date']) # Remove duplicates, sort revenue_dates = sorted(list(set(revenue_dates)), reverse=True) if revenue_dates: logger.info(f"Found {len(revenue_dates)} revenue reporting dates. Fetching aligned price data...") try: price_data = self._fetch_price_by_dates_remote(company_code, currency, revenue_dates, query_ticker=query_ticker) self.save_data(price_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching aligned price data: {e}") else: logger.warning("No revenue dates found. Falling back to yearly price fetch (Dec 31).") # Generate last 10 years Dec 31 fallback_dates = [] current_year = datetime.now().year for i in range(1, 11): fallback_dates.append(f"{current_year - i}-12-31") try: price_data = self._fetch_price_by_dates_remote(company_code, currency, fallback_dates, query_ticker=query_ticker) self.save_data(price_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching fallback price data: {e}") # 5. Cleanup if progress_callback: progress_callback("Finalizing data...", 90) self.run_cleanup(table_name=table_name) logger.info(f"✅ Completed processing for {company_code}") if progress_callback: progress_callback("Bloomberg data sync complete", 100) except Exception as e: logger.error(f"Fetch failed for {company_code}: {e}") if progress_callback: progress_callback(f"Error: {e}", 0) raise e def _fetch_basic_remote(self, company_code, currency, query_ticker=None): """Generates code to fetch basic data""" target_ticker = query_ticker if query_ticker else company_code code = f""" def get_basic(): company = "{company_code}" query_ticker = "{target_ticker}" curr = "{currency}" res_list = [] # 1. BDP Query for most fields (except Market Cap) fields = ["NAME", "PE_RATIO", "PX_TO_BOOK_RATIO", "PCT_REVENUE_FROM_FOREIGN_SOURCES", "DIVIDEND_12_MONTH_YIELD", "EQY_INIT_PO_DT"] try: # Use overrides for currency. Expects list of tuples. # Try both CURRENCY (for price) and EQY_FUND_CRNCY (for fundamentals) df = bquery.bdp([query_ticker], fields, overrides=[('CURRENCY', curr), ('EQY_FUND_CRNCY', curr)]) if not df.empty: def safe_get(df, col): if col in df.columns: val = df[col].iloc[0] if val is None: return None if hasattr(val, 'strftime'): return val.strftime('%Y-%m-%d') return str(val) return None res_list.append({{"indicator": "company_name", "value": safe_get(df, 'NAME')}}) res_list.append({{"indicator": "pe_ratio", "value": safe_get(df, 'PE_RATIO')}}) res_list.append({{"indicator": "pb_ratio", "value": safe_get(df, 'PX_TO_BOOK_RATIO')}}) # res_list.append({{"indicator": "market_cap", "value": safe_get(df, 'CUR_MKT_CAP')}}) # Moved to BDH res_list.append({{"indicator": "Rev_Abroad", "value": safe_get(df, 'PCT_REVENUE_FROM_FOREIGN_SOURCES')}}) res_list.append({{"indicator": "dividend_yield", "value": safe_get(df, 'DIVIDEND_12_MONTH_YIELD')}}) res_list.append({{"indicator": "IPO_date", "value": safe_get(df, 'EQY_INIT_PO_DT')}}) except Exception as e: print(f"Basic BDP Error: {{e}}") # 2. BDH Query for Market Cap (To enforce Currency) try: import datetime end_date = datetime.datetime.now().strftime('%Y%m%d') start_date = (datetime.datetime.now() - datetime.timedelta(days=10)).strftime('%Y%m%d') # Fetch CUR_MKT_CAP via BDH to respect currency option df_cap = bquery.bdh([query_ticker], ['CUR_MKT_CAP'], start_date=start_date, end_date=end_date, options={{'currency': curr}}) if not df_cap.empty: # BDH usually returns Market Cap in Millions. # User simplified requirement: Keep it in Millions. # Frontend will divide by 100 to get "Yi". val_millions = df_cap.iloc[-1]['CUR_MKT_CAP'] res_list.append({{"indicator": "market_cap", "value": str(val_millions)}}) except Exception as e: print(f"Basic BDH Market Cap Error: {{e}}") # Format result final_res = [] import datetime today_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') for item in res_list: # Check against None string too if needed if item['value'] and str(item['value']) != 'None': final_res.append({{ "Company_code": company, "update_date": today_dt, "currency": curr, "indicator": item['indicator'], "value": item['value'], "value_date": today_dt }}) print("JSON_START") print(json.dumps(final_res)) print("JSON_END") get_basic() """ return self._execute_and_parse(code) def _fetch_series_remote(self, company_code, currency, config_dict, result_type, query_ticker=None, period="YEARLY"): """Generates code to fetch series data using BDH""" target_ticker = query_ticker if query_ticker else company_code config_json = json.dumps(config_dict) period_years = STOCKCARD_CONFIG['period'] # Calculate start date end_date = datetime.now() if period == "QUARTERLY": # Recent 12 quarters = 3 years = ~1095 days. # Adding small buffer to 1100 days to be safe. start_date = end_date - timedelta(days=1100) elif period == "SEMI_ANNUALLY": # Recent 12 periods = 6 years = ~2200 days. start_date = end_date - timedelta(days=2200) else: start_date = end_date - timedelta(days=period_years*365) # Format dates for BDH (YYYYMMDD) start_date_str = start_date.strftime('%Y%m%d') end_date_str = end_date.strftime('%Y%m%d') # BDH Options bdh_options = { "periodicityAdjustment": "FISCAL", "periodicitySelection": period, # YEARLY or QUARTERLY "currency": currency, # "nonTradingDayFillOption": "NON_TRADING_WEEKDAYS", # "nonTradingDayFillMethod": "PREVIOUS_VALUE" } bdh_opts_json = json.dumps(bdh_options) code = f""" def get_series(): company = "{company_code}" query_ticker = "{target_ticker}" curr = "{currency}" config = {config_json} mnemonic_map = {{v.upper(): k for k, v in config.items()}} fields = list(mnemonic_map.keys()) res_list = [] try: df = bquery.bdh( [query_ticker], fields, start_date='{start_date_str}', end_date='{end_date_str}', options={bdh_opts_json} ) if not df.empty: for _, row in df.iterrows(): date_val = None if 'date' in df.columns: date_val = row['date'] elif 'DATE' in df.columns: date_val = row['DATE'] if not date_val: continue date_str = str(date_val)[:10] for mnemonic, indicator_name in mnemonic_map.items(): col_name = mnemonic # Handle multi-column result from bdh if any # Usually bdh returns column names as requested fields val = None # Flexible column matching if col_name in row: val = row[col_name] elif col_name in df.columns: val = row[col_name] else: # Try case insensitive for c in df.columns: if c.upper() == col_name: val = row[c] break if pd.isna(val) or val is None: continue val_str = str(val) res_list.append({{ "Company_code": company, "update_date": "{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "currency": curr, "indicator": indicator_name, "value": val_str, "value_date": date_str }}) except Exception as e: print(f"BDH Error: {{e}}") print("JSON_START") print(json.dumps(res_list)) print("JSON_END") get_series() """ return self._execute_and_parse(code) def _fetch_price_by_dates_remote(self, company_code, currency, dates, query_ticker=None): """Generates code to fetch price/mkt_cap for specific dates""" if not dates: return [] target_ticker = query_ticker if query_ticker else company_code dates_json = json.dumps(dates) config_json = json.dumps(PRICE_CONFIG) bdh_options = { 'currency': currency, 'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS', 'nonTradingDayFillMethod': 'PREVIOUS_VALUE' } bdh_opts_json = json.dumps(bdh_options) code = f""" def get_price_by_dates(): company = "{company_code}" query_ticker = "{target_ticker}" curr = "{currency}" dates = {dates_json} config = {config_json} mnemonic_map = {{v.upper(): k for k, v in config.items()}} fields = list(mnemonic_map.keys()) res_list = [] for d_str in dates: # d_str is 'YYYY-MM-DD', bdh needs 'YYYYMMDD' d_param = d_str.replace('-', '') for mnemonic, indicator_name in mnemonic_map.items(): field = mnemonic try: df = bquery.bdh( [query_ticker], [field], start_date=d_param, end_date=d_param, options={bdh_opts_json} ) if not df.empty: for _, row in df.iterrows(): val = None if field in df.columns: val = row[field] elif field in row: val = row[field] else: # Try case insensitive for c in df.columns: if c.upper() == field: val = row[c] break if pd.isna(val) or val is None: continue val_str = str(val) res_list.append({{ "Company_code": company, "update_date": "{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "currency": curr, "indicator": indicator_name, "value": val_str, "value_date": d_str }}) except Exception as e: # print(f"BDH Error for {{field}}: {{e}}") pass print("JSON_START") print(json.dumps(res_list)) print("JSON_END") get_price_by_dates() """ return self._execute_and_parse(code) def _execute_and_parse(self, code): """Execute code and parse [JSON_START]...[JSON_END]""" raw_output_list = self.execute_remote_code(code) raw_output = "".join(raw_output_list) if isinstance(raw_output_list, list) else str(raw_output_list) # Always log raw output first few chars for debug logger.info(f"REMOTE RAW OUTPUT: {raw_output[:1000]}") # Actually let's log everything if it fails to find JSON # Simple parser try: start_idx = raw_output.find("JSON_START") end_idx = raw_output.find("JSON_END") if start_idx != -1 and end_idx != -1: json_str = raw_output[start_idx + len("JSON_START"):end_idx].strip() data = json.loads(json_str) logger.info(f"✅ Parsed {len(data) if isinstance(data, list) else 1} items from remote.") return data else: logger.warning(f"⚠️ No JSON output found in remote response. Raw: {raw_output}") return [] except Exception as e: logger.error(f"❌ Error parsing JSON from remote: {e}") return []