# FA3-Datafetch/backend/app/clients/bloomberg_client.py
import os
import sys
import json
import uuid
import time
import requests
import websocket
import psycopg2
import pandas as pd
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from datetime import datetime, timedelta
from dotenv import load_dotenv
from pathlib import Path
import logging
# Configure logger
logger = logging.getLogger(__name__)
# Explicitly load .env
ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent
load_dotenv(ROOT_DIR / ".env")
# --- Configurations ---
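# The maps below pair human-readable indicator names (stored in stockcard.indicator)
# with Bloomberg field mnemonics:
#   - CURRENCY_CONFIG: monetary fields, requested in the target currency
#   - NON_CURRENCY_CONFIG: ratios, counts and growth rates (currency-agnostic)
#   - PRICE_CONFIG: price / market-cap fields, fetched per reporting date
# STOCKCARD_CONFIG sets the lookback window in years; 'unit' (1e8) appears to be a
# display unit for downstream consumers and is not referenced in this module.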
CURRENCY_CONFIG = {
"Revenue": "SALES_REV_TURN",
"Net_Income": "EARN_FOR_COMMON",
"Cash_From_Operating": "CF_CASH_FROM_OPER",
"Capital_Expenditure": "CAPITAL_EXPEND",
"Free_Cash_Flow": "CF_FREE_CASH_FLOW",
"Dividends_Paid": "CF_DVD_PAID",
"Total_Assets": "BS_TOT_ASSET",
"Equity": "BS_TOT_EQY",
"Goodwill": "BS_GOODWILL",
"SG&A": "IS_SG_AND_A_EXPENSE",
"Selling&Marketing": "IS_SELLING_EXPENSES",
"General&Admin": "IS_GENERAL_AND_ADMIN_GAAP",
"R&D": "IS_RD_EXPEND",
"Depreciation": "IS_DEPR_EXP",
"Cash": "BS_CASH_NEAR_CASH_ITEM",
"Inventory": "BS_INVENTORIES",
"Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV",
"Prepaid": "BS_PREPAY",
"Property_Plant&Equipment": "BS_NET_FIX_ASSET",
"LT_Investment": "BS_LT_INVEST",
"Accounts_Payable": "BS_ACCT_PAYABLE",
"ST_Debt": "BS_ST_BORROW",
"LT_Debt": "BS_LT_BORROW",
"ST_Defer_Rev":"ST_DEFERRED_REVENUE",
"repurchase":"cf_proceeds_repurchase_equity"
}
NON_CURRENCY_CONFIG = {
"ROE": "RETURN_COM_EQY",
"ROA": "RETURN_ON_ASSET",
"ROCE": "RETURN_ON_CAPITAL_EMPLOYED",
"Gross_Margin": "GROSS_MARGIN",
"EBITDA_margin": "EBITDA_TO_REVENUE",
"Net_Profit_Margin": "PROF_MARGIN",
"Tax_Rate": "IS_STATUTORY_TAX_RATE",
"Inventory_Days": "INVENT_DAYS",
"Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG",
"Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS",
"Employee": "NUM_OF_EMPLOYEES",
"PE": "PE_RATIO",
"PB": "PX_TO_BOOK_RATIO",
"Shareholders": "BS_NUM_OF_SHAREHOLDERS",
"Dividend_Payout_Ratio":"DVD_PAYOUT_RATIO",
"Total_Debt_Ratio":"TOT_DEBT_TO_TOT_ASSET",
"Net_Fixed_Asset_Turnover":"NET_FIX_ASSET_TURN",
"Asset_Turnover":"ASSET_TURNOVER",
"NetIncome_Growth":"earn_for_com_growth",
"Revenue_Growth":"sales_growth",
}
PRICE_CONFIG = {
"Last_Price": "PX_LAST",
"Market_Cap":"cur_mkt_cap",
"Dividend_Yield": "DIVIDEND_12_MONTH_YIELD"
}
STOCKCARD_CONFIG = {
"period": 10,
"unit": 100000000
}
class BloombergClient:
"""
Backend Bloomberg Client.
Connects to remote Jupyter Kernel via WebSocket to execute 'blp' code.
Persists data to 'stockcard' table (EAV pattern) using direct DB connection.
"""
def __init__(self, jupyter_url=None, password=None):
self.jupyter_url = jupyter_url or os.getenv("JUPYTER_URL", "http://192.168.3.161:8888")
self.jupyter_url = self.jupyter_url.rstrip("/")
self.password = password or os.getenv("JUPYTER_PASSWORD", "Value609!")
self.session = requests.Session()
self.kernel_id = None
self.ws = None
# Authenticate and setup connection
try:
self._authenticate()
self._setup_kernel()
self._connect_websocket()
except Exception as e:
logger.error(f"Failed to initialize Bloomberg Client connection: {e}")
            # Don't raise here; allow instantiation so callers can handle the
            # connection failure when fetch methods are invoked.
def _authenticate(self):
"""Authenticate with JupyterHub/Lab using password to get session cookies."""
logger.info(f"Connecting to Jupyter at {self.jupyter_url}...")
try:
# 1. Get Login Page to fetch _xsrf (if needed)
r = self.session.get(f"{self.jupyter_url}/login")
if r.status_code != 200:
logger.warning(f"Failed to access login page: {r.status_code}")
xsrf = self.session.cookies.get("_xsrf", "")
# 2. Post Password
data = {"password": self.password}
if xsrf:
data["_xsrf"] = xsrf
login_resp = self.session.post(f"{self.jupyter_url}/login", data=data)
# 3. Verify Auth
api_resp = self.session.get(f"{self.jupyter_url}/api/status")
if api_resp.status_code == 200:
logger.info("✅ Authentication successful.")
else:
raise Exception(f"Authentication failed. Status: {api_resp.status_code}")
except Exception as e:
logger.error(f"❌ Error during authentication: {e}")
raise
def _setup_kernel(self):
"""Find an existing kernel or create a new one."""
try:
# List kernels
resp = self.session.get(f"{self.jupyter_url}/api/kernels")
kernels = resp.json()
# Try to find 'remote_env' or just pick the first available
target_kernel = None
for k in kernels:
if k.get('name') == 'remote_env': # Ideal match
target_kernel = k
break
if not target_kernel and kernels:
target_kernel = kernels[0] # Fallback
if target_kernel:
self.kernel_id = target_kernel['id']
logger.info(f"✅ Found existing kernel: {self.kernel_id} ({target_kernel.get('name')})")
else:
logger.info("No active kernels found. Starting a new one...")
# Start new kernel (assuming python3)
start_resp = self.session.post(f"{self.jupyter_url}/api/kernels", json={"name": "python3"})
if start_resp.status_code == 201:
self.kernel_id = start_resp.json()['id']
logger.info(f"✅ Started new kernel: {self.kernel_id}")
else:
raise Exception(f"Failed to start kernel: {start_resp.text}")
except Exception as e:
logger.error(f"❌ Error setting up kernel: {e}")
raise
def _connect_websocket(self):
"""Connect to the Kernel's WebSocket channel."""
        # Swap only the URL scheme (http -> ws, https -> wss)
        base_ws_url = "ws" + self.jupyter_url[4:] if self.jupyter_url.startswith("http") else self.jupyter_url
ws_url = f"{base_ws_url}/api/kernels/{self.kernel_id}/channels"
# Extract cookies for WS connection
cookies = self.session.cookies.get_dict()
cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
try:
self.ws = websocket.create_connection(
ws_url,
header={"Cookie": cookie_str},
timeout=60 # Connection timeout
)
logger.info("✅ WebSocket connected.")
except Exception as e:
logger.error(f"❌ WebSocket connection failed: {e}")
raise
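    # The /api/kernels/<id>/channels endpoint multiplexes the kernel's shell and
    # iopub channels over a single websocket, which is why execute_remote_code()
    # below reads status/stream/error messages from this one connection.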
def execute_remote_code(self, code_str):
"""
Send code to remote kernel via WebSocket and wait for result.
Returns list of output strings (stdout).
"""
if not self.ws:
raise Exception("WebSocket not connected")
msg_id = str(uuid.uuid4())
msg = {
"header": {
"msg_id": msg_id,
"username": "client",
"session": str(uuid.uuid4()),
"msg_type": "execute_request",
"version": "5.3"
},
"parent_header": {},
"metadata": {},
"content": {
"code": code_str,
"silent": False,
"store_history": True,
"user_expressions": {},
"allow_stdin": False
}
}
self.ws.send(json.dumps(msg))
outputs = []
error = None
# Loop to receive messages until 'idle'
while True:
try:
resp = json.loads(self.ws.recv())
msg_type = resp['msg_type']
# Check for idle status (command finished)
if msg_type == 'status':
if resp['content']['execution_state'] == 'idle':
# Make sure it's the response to OUR request (simple check)
if resp['parent_header'].get('msg_id') == msg_id:
break
elif msg_type == 'stream':
# Stdout/Stderr
outputs.append(resp['content']['text'])
elif msg_type == 'error':
error = resp['content']['evalue']
logger.error(f"❌ Remote Execution Error: {error}")
for line in resp['content']['traceback']:
logger.error(line)
except Exception as e:
logger.error(f"Error receiving WS message: {e}")
break
if error:
raise Exception(f"Remote Code Execution Failed: {error}")
return "".join(outputs)
# --- Database Operations ---
def _get_db_connection(self):
"""Create a database connection."""
db_host = os.getenv("DB_HOST", "192.168.3.195")
db_user = os.getenv("DB_USER", "value")
db_pass = os.getenv("DB_PASSWORD", "Value609!")
        db_name = os.getenv("DB_NAME", "fa3")  # FA3 default; legacy FA used the 'FA' database
db_port = os.getenv("DB_PORT", "5432")
try:
conn = psycopg2.connect(
host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
)
return conn
except psycopg2.OperationalError as e:
logger.error(f"DB Connection Error: {e}")
return None
def _ensure_schema(self, conn):
"""Ensure the necessary tables exist."""
with conn.cursor() as cur:
# Create table if not exists (Inferred schema from legacy use)
cur.execute("""
CREATE TABLE IF NOT EXISTS stockcard (
id SERIAL PRIMARY KEY,
Company_code TEXT,
update_date DATE,
currency TEXT,
indicator TEXT,
value TEXT,
value_date DATE,
source TEXT
);
""")
# Ensure column exists if table was already created
try:
cur.execute("ALTER TABLE stockcard ADD COLUMN IF NOT EXISTS source TEXT;")
except Exception:
conn.rollback() # Should not happen with IF NOT EXISTS but good practice
# Migrate update_date to TIMESTAMP
try:
cur.execute("ALTER TABLE stockcard ALTER COLUMN update_date TYPE TIMESTAMP USING update_date::timestamp;")
except Exception as e:
# Likely already converted or failed, log but don't crash
logger.info(f"Schema migration note (update_date): {e}")
conn.rollback()
conn.commit()
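    # Note: save_data() below appends rows without upserting; duplicates are
    # removed later by run_cleanup(), which keeps the most recent update_date per
    # (company_code, currency, indicator, value_date).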
def save_data(self, data_list):
"""Insert a list of data dictionaries into the database."""
if not data_list:
return
conn = self._get_db_connection()
if not conn: return
self._ensure_schema(conn)
try:
with conn.cursor() as cur:
args_list = []
for d in data_list:
# Format: (Company_code, update_date, currency, indicator, value, value_date, source)
args_list.append((
d['Company_code'],
d['update_date'],
d['currency'],
d['indicator'],
d['value'],
d['value_date'],
'bloomberg'
))
query = """
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date, source)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cur.executemany(query, args_list)
conn.commit()
logger.info(f"✅ Saved {len(data_list)} records to database.")
except Exception as e:
logger.error(f"Error saving to stockcard: {e}")
conn.rollback()
finally:
conn.close()
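    # Illustrative save_data() input (hypothetical values):
    #   [{"Company_code": "700 HK Equity", "update_date": "2026-01-12 19:20:18",
    #     "currency": "HKD", "indicator": "Revenue", "value": "552229.0",
    #     "value_date": "2024-12-31"}]
    # The 'source' column is filled with 'bloomberg' automatically.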
def run_cleanup(self):
"""Run deduplication and view refresh logic."""
conn = self._get_db_connection()
if not conn: return
try:
with conn.cursor() as cur:
# 1. Deduplication
cur.execute('''
WITH DuplicateRows AS (
SELECT
id,
ROW_NUMBER() OVER(
PARTITION BY company_code, currency, indicator, value_date
ORDER BY update_date DESC, id DESC
) as rn
FROM
stockcard
)
DELETE FROM stockcard
WHERE id IN (
SELECT id
FROM DuplicateRows
WHERE rn > 1
);
''')
# 2. Materialized View Refresh
cur.execute('''
CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS
SELECT
s.Company_code,
(SELECT cn_sub.value
FROM public.stockcard AS cn_sub
WHERE cn_sub.Company_code = s.Company_code
AND cn_sub.indicator = 'company_name'
ORDER BY cn_sub.value ASC
LIMIT 1) AS company_name
FROM
(SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s
ORDER BY
s.Company_code;
''')
                # Commit dedup + view creation first so a failed refresh cannot
                # roll them back, then attempt the refresh on its own.
                conn.commit()
                try:
                    cur.execute("REFRESH MATERIALIZED VIEW public.unique_company_codes;")
                    conn.commit()
                except Exception as e:
                    logger.warning(f"Materialized view refresh skipped: {e}")
                    conn.rollback()
logger.info("✅ Cleanup and View Refresh completed.")
except Exception as e:
logger.error(f"❌ Cleanup failed: {e}")
conn.rollback()
finally:
conn.close()
# --- Core Fetching Logic ---
def fetch_company(self, market, symbol, progress_callback=None, force_currency=None):
"""
Main entry point to fetch data for a single company.
"""
# Determine Bloomberg Ticker format
# If symbol already has Equity, use it. Else append.
if "Equity" in symbol:
            # Symbol already carries the Bloomberg suffix (e.g. '631 HK Equity');
            # assume the caller passed a valid ticker and use it as-is.
            company_code = symbol
else:
# Special case for China market: use 'CH Equity' instead of 'CN Equity'
mapped_market = "CH" if market == "CN" else market
# Canonical Code (Store in DB): Keep explicit zeros if provided in symbol
company_code = f"{symbol} {mapped_market} Equity"
# Query Ticker (Send to Bloomberg): Handle HK special case
query_ticker = company_code
if market == "HK" or (len(company_code.split()) > 1 and company_code.split()[1] == "HK"):
# Extract number part
parts = company_code.split()
ticker_part = parts[0]
if ticker_part.isdigit():
short_ticker = str(int(ticker_part))
# Reconstruct: 631 HK Equity
query_ticker = f"{short_ticker} {' '.join(parts[1:])}"
today_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"🚀 Starting fetch for: {company_code}")
if progress_callback: progress_callback("Starting Bloomberg session...", 0)
# 0. Prep Remote Environment
init_code = """
import json
import pandas as pd
from datetime import datetime
try:
from blp import blp
except ImportError:
print("Error: 'blp' module not found.")
# Ensure bquery is started
if 'bquery' not in globals():
try:
bquery = blp.BlpQuery().start()
print("BlpQuery started.")
except Exception as e:
print(f"Error starting BlpQuery: {e}")
"""
try:
self.execute_remote_code(init_code)
            # 1. Determine reporting currency
if force_currency and force_currency != "Auto":
currency = force_currency
logger.info(f"Using forced currency: {currency}")
else:
currency = "USD"
if "JP" in market.upper(): currency = "JPY"
elif "VN" in market.upper(): currency = "VND"
elif "CN" in market.upper(): currency = "CNY"
elif "HK" in market.upper(): currency = "HKD"
logger.info(f"Using auto-detected currency: {currency}")
# 2. Fetch Basic Info
logger.info("Fetching Basic Data...")
if progress_callback: progress_callback("Fetching Company Basic Info...", 10)
try:
basic_data = self._fetch_basic_remote(company_code, currency, query_ticker=query_ticker)
logger.info(f"DEBUG: basic_data before save: {basic_data}")
self.save_data(basic_data)
except Exception as e:
logger.error(f"Error fetching basic data: {e}")
# 3. Fetch Currency Data
logger.info("Fetching Currency Data...")
if progress_callback: progress_callback(f"正在获取货币指标 ({currency})...", 30)
curr_data = []
try:
curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency", query_ticker=query_ticker)
self.save_data(curr_data)
except Exception as e:
logger.error(f"Error fetching currency series: {e}")
# 4. Fetch Non-Currency Data
logger.info("Fetching Non-Currency Data...")
if progress_callback: progress_callback("正在获取非货币指标...", 50)
try:
non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency", query_ticker=query_ticker)
self.save_data(non_curr_data)
except Exception as e:
logger.error(f"Error fetching non-currency series: {e}")
# 5. Fetch Price Data (Aligned with Revenue Dates)
logger.info("Fetching Price Data (Aligned)...")
if progress_callback: progress_callback("正在获取价格指标...", 70)
# Extract Revenue dates
revenue_dates = []
if curr_data:
for item in curr_data:
# Check for "Revenue"
if item['indicator'].lower() == "revenue": # Robust case-insensitive check
if item['value_date']:
revenue_dates.append(item['value_date'])
# Remove duplicates, sort
revenue_dates = sorted(list(set(revenue_dates)), reverse=True)
if revenue_dates:
logger.info(f"Found {len(revenue_dates)} revenue reporting dates. Fetching aligned price data...")
try:
price_data = self._fetch_price_by_dates_remote(company_code, currency, revenue_dates, query_ticker=query_ticker)
self.save_data(price_data)
except Exception as e:
logger.error(f"Error fetching aligned price data: {e}")
else:
logger.warning("No revenue dates found. Falling back to yearly price fetch (Dec 31).")
# Generate last 10 years Dec 31
fallback_dates = []
current_year = datetime.now().year
for i in range(1, 11):
fallback_dates.append(f"{current_year - i}-12-31")
try:
price_data = self._fetch_price_by_dates_remote(company_code, currency, fallback_dates, query_ticker=query_ticker)
self.save_data(price_data)
except Exception as e:
logger.error(f"Error fetching fallback price data: {e}")
            # 6. Cleanup
if progress_callback: progress_callback("Finalizing data...", 90)
self.run_cleanup()
logger.info(f"✅ Completed processing for {company_code}")
if progress_callback: progress_callback("Bloomberg data sync complete", 100)
except Exception as e:
logger.error(f"Fetch failed for {company_code}: {e}")
if progress_callback: progress_callback(f"Error: {e}", 0)
            raise
def _fetch_basic_remote(self, company_code, currency, query_ticker=None):
"""Generates code to fetch basic data"""
target_ticker = query_ticker if query_ticker else company_code
code = f"""
def get_basic():
company = "{company_code}"
query_ticker = "{target_ticker}"
curr = "{currency}"
res_list = []
# 1. BDP Query for most fields (except Market Cap)
fields = ["NAME", "PE_RATIO", "PX_TO_BOOK_RATIO", "PCT_REVENUE_FROM_FOREIGN_SOURCES", "DIVIDEND_12_MONTH_YIELD", "EQY_INIT_PO_DT"]
try:
# Use overrides for currency. Expects list of tuples.
# Try both CURRENCY (for price) and EQY_FUND_CRNCY (for fundamentals)
df = bquery.bdp([query_ticker], fields, overrides=[('CURRENCY', curr), ('EQY_FUND_CRNCY', curr)])
if not df.empty:
def safe_get(df, col):
if col in df.columns:
val = df[col].iloc[0]
if val is None: return None
if hasattr(val, 'strftime'): return val.strftime('%Y-%m-%d')
return str(val)
return None
res_list.append({{"indicator": "company_name", "value": safe_get(df, 'NAME')}})
res_list.append({{"indicator": "pe_ratio", "value": safe_get(df, 'PE_RATIO')}})
res_list.append({{"indicator": "pb_ratio", "value": safe_get(df, 'PX_TO_BOOK_RATIO')}})
# res_list.append({{"indicator": "market_cap", "value": safe_get(df, 'CUR_MKT_CAP')}}) # Moved to BDH
res_list.append({{"indicator": "Rev_Abroad", "value": safe_get(df, 'PCT_REVENUE_FROM_FOREIGN_SOURCES')}})
res_list.append({{"indicator": "dividend_yield", "value": safe_get(df, 'DIVIDEND_12_MONTH_YIELD')}})
res_list.append({{"indicator": "IPO_date", "value": safe_get(df, 'EQY_INIT_PO_DT')}})
except Exception as e:
print(f"Basic BDP Error: {{e}}")
# 2. BDH Query for Market Cap (To enforce Currency)
try:
import datetime
end_date = datetime.datetime.now().strftime('%Y%m%d')
start_date = (datetime.datetime.now() - datetime.timedelta(days=10)).strftime('%Y%m%d')
# Fetch CUR_MKT_CAP via BDH to respect currency option
df_cap = bquery.bdh([query_ticker], ['CUR_MKT_CAP'], start_date=start_date, end_date=end_date, options={{'currency': curr}})
if not df_cap.empty:
            # BDH typically reports CUR_MKT_CAP in millions; keep it in millions here.
            # The frontend divides by 100 to show the value in hundreds of millions ("Yi").
val_millions = df_cap.iloc[-1]['CUR_MKT_CAP']
res_list.append({{"indicator": "market_cap", "value": str(val_millions)}})
except Exception as e:
print(f"Basic BDH Market Cap Error: {{e}}")
# Format result
final_res = []
import datetime
today_dt = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
for item in res_list:
# Check against None string too if needed
if item['value'] and str(item['value']) != 'None':
final_res.append({{
"Company_code": company,
"update_date": today_dt,
"currency": curr,
"indicator": item['indicator'],
"value": item['value'],
"value_date": today_dt
}})
print("JSON_START")
print(json.dumps(final_res))
print("JSON_END")
get_basic()
"""
return self._execute_and_parse(code)
def _fetch_series_remote(self, company_code, currency, config_dict, result_type, query_ticker=None):
"""Generates code to fetch series data using BDH"""
target_ticker = query_ticker if query_ticker else company_code
config_json = json.dumps(config_dict)
period_years = STOCKCARD_CONFIG['period']
# Calculate start date
end_date = datetime.now()
start_date = end_date - timedelta(days=period_years*365)
# Format dates for BDH (YYYYMMDD)
start_date_str = start_date.strftime('%Y%m%d')
end_date_str = end_date.strftime('%Y%m%d')
# BDH Options
bdh_options = {
"periodicityAdjustment": "FISCAL",
"periodicitySelection": "YEARLY",
"currency": currency,
# "nonTradingDayFillOption": "NON_TRADING_WEEKDAYS",
# "nonTradingDayFillMethod": "PREVIOUS_VALUE"
}
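        # With periodicityAdjustment='FISCAL' and periodicitySelection='YEARLY',
        # bdh should return one observation per fiscal year end (the reporting
        # date), which becomes the stockcard value_date for each indicator.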
bdh_opts_json = json.dumps(bdh_options)
code = f"""
def get_series():
company = "{company_code}"
query_ticker = "{target_ticker}"
curr = "{currency}"
config = {config_json}
mnemonic_map = {{v.upper(): k for k, v in config.items()}}
fields = list(mnemonic_map.keys())
res_list = []
try:
df = bquery.bdh(
[query_ticker],
fields,
start_date='{start_date_str}',
end_date='{end_date_str}',
options={bdh_opts_json}
)
if not df.empty:
for _, row in df.iterrows():
date_val = None
if 'date' in df.columns: date_val = row['date']
elif 'DATE' in df.columns: date_val = row['DATE']
                if date_val is None or pd.isna(date_val): continue
date_str = str(date_val)[:10]
for mnemonic, indicator_name in mnemonic_map.items():
col_name = mnemonic
# Handle multi-column result from bdh if any
# Usually bdh returns column names as requested fields
val = None
# Flexible column matching
if col_name in row:
val = row[col_name]
elif col_name in df.columns:
val = row[col_name]
else:
# Try case insensitive
for c in df.columns:
if c.upper() == col_name:
val = row[c]
break
if pd.isna(val) or val is None:
continue
val_str = str(val)
res_list.append({{
"Company_code": company,
"update_date": "{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"currency": curr,
"indicator": indicator_name,
"value": val_str,
"value_date": date_str
}})
except Exception as e:
print(f"BDH Error: {{e}}")
print("JSON_START")
print(json.dumps(res_list))
print("JSON_END")
get_series()
"""
return self._execute_and_parse(code)
def _fetch_price_by_dates_remote(self, company_code, currency, dates, query_ticker=None):
"""Generates code to fetch price/mkt_cap for specific dates"""
if not dates:
return []
target_ticker = query_ticker if query_ticker else company_code
dates_json = json.dumps(dates)
config_json = json.dumps(PRICE_CONFIG)
bdh_options = {
'currency': currency,
'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS',
'nonTradingDayFillMethod': 'PREVIOUS_VALUE'
}
bdh_opts_json = json.dumps(bdh_options)
code = f"""
def get_price_by_dates():
company = "{company_code}"
query_ticker = "{target_ticker}"
curr = "{currency}"
dates = {dates_json}
config = {config_json}
mnemonic_map = {{v.upper(): k for k, v in config.items()}}
fields = list(mnemonic_map.keys())
res_list = []
for d_str in dates:
# d_str is 'YYYY-MM-DD', bdh needs 'YYYYMMDD'
d_param = d_str.replace('-', '')
for mnemonic, indicator_name in mnemonic_map.items():
field = mnemonic
try:
df = bquery.bdh(
[query_ticker],
[field],
start_date=d_param,
end_date=d_param,
options={bdh_opts_json}
)
if not df.empty:
for _, row in df.iterrows():
val = None
if field in df.columns:
val = row[field]
elif field in row:
val = row[field]
else:
# Try case insensitive
for c in df.columns:
if c.upper() == field:
val = row[c]
break
if pd.isna(val) or val is None:
continue
val_str = str(val)
res_list.append({{
"Company_code": company,
"update_date": "{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
"currency": curr,
"indicator": indicator_name,
"value": val_str,
"value_date": d_str
}})
except Exception as e:
# print(f"BDH Error for {{field}}: {{e}}")
pass
print("JSON_START")
print(json.dumps(res_list))
print("JSON_END")
get_price_by_dates()
"""
return self._execute_and_parse(code)
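    # All three generators above share the same output contract: the remote
    # snippet prints a JSON array of stockcard row dicts between JSON_START and
    # JSON_END markers, which _execute_and_parse() below extracts.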
def _execute_and_parse(self, code):
"""Execute code and parse [JSON_START]...[JSON_END]"""
raw_output_list = self.execute_remote_code(code)
raw_output = "".join(raw_output_list) if isinstance(raw_output_list, list) else str(raw_output_list)
        # Log a truncated preview of the raw output for debugging
logger.info(f"REMOTE RAW OUTPUT: {raw_output[:1000]}")
        # Parse the payload between the JSON_START / JSON_END markers
try:
start_idx = raw_output.find("JSON_START")
end_idx = raw_output.find("JSON_END")
if start_idx != -1 and end_idx != -1:
json_str = raw_output[start_idx + len("JSON_START"):end_idx].strip()
data = json.loads(json_str)
logger.info(f"✅ Parsed {len(data) if isinstance(data, list) else 1} items from remote.")
return data
else:
logger.warning(f"⚠️ No JSON output found in remote response. Raw: {raw_output}")
return []
except Exception as e:
logger.error(f"❌ Error parsing JSON from remote: {e}")
return []
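# --- Usage sketch ---
# Minimal, illustrative entry point (not used by the backend). Assumes JUPYTER_URL,
# JUPYTER_PASSWORD and the DB_* variables are supplied via .env, and that the remote
# kernel has a working 'blp' Bloomberg session. The ticker below is illustrative.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def _progress(message, percent):
        # Simple console progress reporter matching the progress_callback signature
        print(f"[{percent:3d}%] {message}")

    client = BloombergClient()
    if client.ws:  # only fetch if the websocket connection was established
        client.fetch_company(market="HK", symbol="0700", progress_callback=_progress)
    else:
        logger.error("Bloomberg client is not connected; aborting.")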