import os
import sys
import json
import uuid
import time
import requests
import websocket
import psycopg2
import pandas as pd
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from datetime import datetime
from dotenv import load_dotenv
from pathlib import Path
import logging

# Configure logger
logger = logging.getLogger(__name__)

# Explicitly load .env
ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent
load_dotenv(ROOT_DIR / ".env")

# --- Configurations ---

CURRENCY_CONFIG = {
    "Revenue": "SALES_REV_TURN",
    "Net_Income": "EARN_FOR_COMMON",
    "Cash_From_Operating": "CF_CASH_FROM_OPER",
    "Capital_Expenditure": "CAPITAL_EXPEND",
    "Free_Cash_Flow": "CF_FREE_CASH_FLOW",
    "Dividends_Paid": "CF_DVD_PAID",
    "Total_Assets": "BS_TOT_ASSET",
    "Equity": "BS_TOT_EQY",
    "Goodwill": "BS_GOODWILL",
    "SG&A": "IS_SG_AND_A_EXPENSE",
    "Selling&Marketing": "IS_SELLING_EXPENSES",
    "General&Admin": "IS_GENERAL_AND_ADMIN_GAAP",
    "R&D": "IS_RD_EXPEND",
    "Depreciation": "IS_DEPR_EXP",
    "Cash": "BS_CASH_NEAR_CASH_ITEM",
    "Inventory": "BS_INVENTORIES",
    "Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV",
    "Prepaid": "BS_PREPAY",
    "Property_Plant&Equipment": "BS_NET_FIX_ASSET",
    "LT_Investment": "BS_LT_INVEST",
    "Accounts_Payable": "BS_ACCT_PAYABLE",
    "ST_Debt": "BS_ST_BORROW",
    "LT_Debt": "BS_LT_BORROW",
    "ST_Defer_Rev": "ST_DEFERRED_REVENUE",
    "repurchase": "cf_proceeds_repurchase_equity"
}

NON_CURRENCY_CONFIG = {
    "ROE": "RETURN_COM_EQY",
    "ROA": "RETURN_ON_ASSET",
    "ROCE": "RETURN_ON_CAPITAL_EMPLOYED",
    "Gross_Margin": "GROSS_MARGIN",
    "EBITDA_margin": "EBITDA_TO_REVENUE",
    "Net_Profit_Margin": "PROF_MARGIN",
    "Tax_Rate": "IS_STATUTORY_TAX_RATE",
    "Inventory_Days": "INVENT_DAYS",
    "Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG",
    "Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS",
    "Employee": "NUM_OF_EMPLOYEES",
    "PE": "PE_RATIO",
    "PB": "PX_TO_BOOK_RATIO",
    "Shareholders": "BS_NUM_OF_SHAREHOLDERS",
    "Dividend_Payout_Ratio": "DVD_PAYOUT_RATIO",
    "Total_Debt_Ratio": "TOT_DEBT_TO_TOT_ASSET",
    "Net_Fixed_Asset_Turnover": "NET_FIX_ASSET_TURN",
    "Asset_Turnover": "ASSET_TURNOVER",
    "NetIncome_Growth": "earn_for_com_growth",
    "Revenue_Growth": "sales_growth",
}

PRICE_CONFIG = {
    "Last_Price": "PX_LAST",
    "Market_Cap": "cur_mkt_cap",
    "Dividend_Yield": "DIVIDEND_12_MONTH_YIELD"
}

STOCKCARD_CONFIG = {
    "period": 15,
    "unit": 100000000
}
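
# Note: each *_CONFIG dict maps a display indicator name (key) to the Bloomberg field
# mnemonic requested remotely (value), e.g. "Revenue" -> "SALES_REV_TURN"; the display
# name becomes the 'indicator' column of the stored rows. STOCKCARD_CONFIG["period"]
# is the history window in years for BDH requests; "unit" is not referenced in this
# module and is presumably applied by downstream consumers of the data.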


class BloombergClient:
    """
    Backend Bloomberg Client.

    Connects to a remote Jupyter kernel via WebSocket to execute 'blp' code.
    Persists data to the 'stockcard' table (EAV pattern) using a direct DB connection.
    """

    def __init__(self, jupyter_url=None, password=None):
        self.jupyter_url = jupyter_url or os.getenv("JUPYTER_URL", "http://192.168.3.161:8888")
        self.jupyter_url = self.jupyter_url.rstrip("/")
        self.password = password or os.getenv("JUPYTER_PASSWORD", "Value609!")
        self.session = requests.Session()
        self.kernel_id = None
        self.ws = None

        # Authenticate and set up the connection
        try:
            self._authenticate()
            self._setup_kernel()
            self._connect_websocket()
        except Exception as e:
            logger.error(f"Failed to initialize Bloomberg Client connection: {e}")
            # Don't raise here: allow instantiation and let the caller handle the
            # initialization failure (subsequent method calls may fail).

    def _authenticate(self):
        """Authenticate with JupyterHub/Lab using the password to get session cookies."""
        logger.info(f"Connecting to Jupyter at {self.jupyter_url}...")
        try:
            # 1. Get the login page to fetch _xsrf (if needed)
            r = self.session.get(f"{self.jupyter_url}/login")
            if r.status_code != 200:
                logger.warning(f"Failed to access login page: {r.status_code}")

            xsrf = self.session.cookies.get("_xsrf", "")

            # 2. Post the password
            data = {"password": self.password}
            if xsrf:
                data["_xsrf"] = xsrf

            login_resp = self.session.post(f"{self.jupyter_url}/login", data=data)

            # 3. Verify authentication
            api_resp = self.session.get(f"{self.jupyter_url}/api/status")
            if api_resp.status_code == 200:
                logger.info("✅ Authentication successful.")
            else:
                raise Exception(f"Authentication failed. Status: {api_resp.status_code}")

        except Exception as e:
            logger.error(f"❌ Error during authentication: {e}")
            raise

    def _setup_kernel(self):
        """Find an existing kernel or create a new one."""
        try:
            # List kernels
            resp = self.session.get(f"{self.jupyter_url}/api/kernels")
            kernels = resp.json()

            # Try to find 'remote_env' or just pick the first available
            target_kernel = None
            for k in kernels:
                if k.get('name') == 'remote_env':  # Ideal match
                    target_kernel = k
                    break

            if not target_kernel and kernels:
                target_kernel = kernels[0]  # Fallback

            if target_kernel:
                self.kernel_id = target_kernel['id']
                logger.info(f"✅ Found existing kernel: {self.kernel_id} ({target_kernel.get('name')})")
            else:
                logger.info("No active kernels found. Starting a new one...")
                # Start a new kernel (assuming python3)
                start_resp = self.session.post(f"{self.jupyter_url}/api/kernels", json={"name": "python3"})
                if start_resp.status_code == 201:
                    self.kernel_id = start_resp.json()['id']
                    logger.info(f"✅ Started new kernel: {self.kernel_id}")
                else:
                    raise Exception(f"Failed to start kernel: {start_resp.text}")

        except Exception as e:
            logger.error(f"❌ Error setting up kernel: {e}")
            raise

    def _connect_websocket(self):
        """Connect to the kernel's WebSocket channel."""
        base_ws_url = self.jupyter_url.replace("http", "ws")
        ws_url = f"{base_ws_url}/api/kernels/{self.kernel_id}/channels"

        # Extract cookies for the WS connection
        cookies = self.session.cookies.get_dict()
        cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])

        try:
            self.ws = websocket.create_connection(
                ws_url,
                header={"Cookie": cookie_str},
                timeout=60  # Connection timeout
            )
            logger.info("✅ WebSocket connected.")
        except Exception as e:
            logger.error(f"❌ WebSocket connection failed: {e}")
            raise

    def execute_remote_code(self, code_str):
        """
        Send code to the remote kernel via WebSocket and wait for the result.
        Returns the captured stdout/stderr output as a single string.
        """
        if not self.ws:
            raise Exception("WebSocket not connected")

        msg_id = str(uuid.uuid4())
        msg = {
            "header": {
                "msg_id": msg_id,
                "username": "client",
                "session": str(uuid.uuid4()),
                "msg_type": "execute_request",
                "version": "5.3"
            },
            "parent_header": {},
            "metadata": {},
            "content": {
                "code": code_str,
                "silent": False,
                "store_history": True,
                "user_expressions": {},
                "allow_stdin": False
            }
        }

        self.ws.send(json.dumps(msg))

        outputs = []
        error = None

        # Loop to receive messages until the kernel reports 'idle'
        while True:
            try:
                resp = json.loads(self.ws.recv())
                msg_type = resp['msg_type']

                # Check for idle status (command finished)
                if msg_type == 'status':
                    if resp['content']['execution_state'] == 'idle':
                        # Make sure it's the response to OUR request (simple check)
                        if resp['parent_header'].get('msg_id') == msg_id:
                            break

                elif msg_type == 'stream':
                    # Stdout/Stderr
                    outputs.append(resp['content']['text'])

                elif msg_type == 'error':
                    error = resp['content']['evalue']
                    logger.error(f"❌ Remote Execution Error: {error}")
                    for line in resp['content']['traceback']:
                        logger.error(line)

            except Exception as e:
                logger.error(f"Error receiving WS message: {e}")
                break

        if error:
            raise Exception(f"Remote Code Execution Failed: {error}")

        return "".join(outputs)

    # --- Database Operations ---

    def _get_db_connection(self):
        """Create a database connection."""
        db_host = os.getenv("DB_HOST", "192.168.3.195")
        db_user = os.getenv("DB_USER", "value")
        db_pass = os.getenv("DB_PASSWORD", "Value609!")
        db_name = os.getenv("DB_NAME", "fa3")  # Defaults to 'fa3'; the legacy setup used 'FA'
        db_port = os.getenv("DB_PORT", "5432")

        try:
            conn = psycopg2.connect(
                host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
            )
            return conn
        except psycopg2.OperationalError as e:
            logger.error(f"DB Connection Error: {e}")
            return None

    def _ensure_schema(self, conn):
        """Ensure the necessary tables exist."""
        with conn.cursor() as cur:
            # Create table if not exists (schema inferred from legacy use)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS stockcard (
                    id SERIAL PRIMARY KEY,
                    Company_code TEXT,
                    update_date DATE,
                    currency TEXT,
                    indicator TEXT,
                    value TEXT,
                    value_date DATE,
                    source TEXT
                );
            """)

            # Ensure the column exists if the table was already created
            try:
                cur.execute("ALTER TABLE stockcard ADD COLUMN IF NOT EXISTS source TEXT;")
            except Exception:
                conn.rollback()  # Should not happen with IF NOT EXISTS, but good practice

        conn.commit()

    def save_data(self, data_list):
        """Insert a list of data dictionaries into the database."""
        if not data_list:
            return

        conn = self._get_db_connection()
        if not conn: return

        self._ensure_schema(conn)

        try:
            with conn.cursor() as cur:
                args_list = []
                for d in data_list:
                    # Format: (Company_code, update_date, currency, indicator, value, value_date, source)
                    args_list.append((
                        d['Company_code'],
                        d['update_date'],
                        d['currency'],
                        d['indicator'],
                        d['value'],
                        d['value_date'],
                        'bloomberg'
                    ))

                query = """
                    INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date, source)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                """
                cur.executemany(query, args_list)
            conn.commit()
            logger.info(f"✅ Saved {len(data_list)} records to database.")
        except Exception as e:
            logger.error(f"Error saving to stockcard: {e}")
            conn.rollback()
        finally:
            conn.close()
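
    # Illustrative shape of one data_list entry expected by save_data (values are
    # hypothetical; keys match the columns inserted above):
    #   {"Company_code": "AAPL US Equity", "update_date": "2024-01-01",
    #    "currency": "USD", "indicator": "Revenue", "value": "394328.0",
    #    "value_date": "2023-09-30"}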

    def run_cleanup(self):
        """Run deduplication and view refresh logic."""
        conn = self._get_db_connection()
        if not conn: return

        try:
            with conn.cursor() as cur:
                # 1. Deduplication: keep only the most recently updated row per
                #    (company_code, currency, indicator, value_date)
                cur.execute('''
                    WITH DuplicateRows AS (
                        SELECT
                            id,
                            ROW_NUMBER() OVER(
                                PARTITION BY company_code, currency, indicator, value_date
                                ORDER BY update_date DESC
                            ) AS rn
                        FROM
                            stockcard
                    )
                    DELETE FROM stockcard
                    WHERE id IN (
                        SELECT id
                        FROM DuplicateRows
                        WHERE rn > 1
                    );
                ''')

                # 2. Materialized view of distinct company codes with their names
                cur.execute('''
                    CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS
                    SELECT
                        s.Company_code,
                        (SELECT cn_sub.value
                         FROM public.stockcard AS cn_sub
                         WHERE cn_sub.Company_code = s.Company_code
                           AND cn_sub.indicator = 'company_name'
                         ORDER BY cn_sub.value ASC
                         LIMIT 1) AS company_name
                    FROM
                        (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s
                    ORDER BY
                        s.Company_code;
                ''')

                # Commit the deduplication and view creation before attempting the
                # refresh, so a failed refresh cannot roll them back.
                conn.commit()

                # Try refresh (non-fatal if it fails)
                try:
                    cur.execute("REFRESH MATERIALIZED VIEW public.unique_company_codes;")
                    conn.commit()
                except Exception as refresh_err:
                    logger.warning(f"Materialized view refresh skipped: {refresh_err}")
                    conn.rollback()

            logger.info("✅ Cleanup and View Refresh completed.")
        except Exception as e:
            logger.error(f"❌ Cleanup failed: {e}")
            conn.rollback()
        finally:
            conn.close()

    # --- Core Fetching Logic ---

    def fetch_company(self, market, symbol, progress_callback=None):
        """
        Main entry point to fetch data for a single company.
        """
        # Determine the Bloomberg ticker format:
        # if the symbol already contains 'Equity', use it as-is; otherwise append it.
        if "Equity" in symbol:
            company_code = symbol
        else:
            # Special case for the China market: use 'CH Equity' instead of 'CN Equity'
            mapped_market = "CH" if market == "CN" else market
            company_code = f"{symbol} {mapped_market} Equity"

        today_str = datetime.now().strftime('%Y-%m-%d')

        logger.info(f"🚀 Starting fetch for: {company_code}")
        if progress_callback: progress_callback("Starting Bloomberg session...", 0)

        # 0. Prepare the remote environment
        init_code = """
import json
import pandas as pd
from datetime import datetime
try:
    from blp import blp
except ImportError:
    print("Error: 'blp' module not found.")

# Ensure bquery is started
if 'bquery' not in globals():
    try:
        bquery = blp.BlpQuery().start()
        print("BlpQuery started.")
    except Exception as e:
        print(f"Error starting BlpQuery: {e}")
"""
        try:
            self.execute_remote_code(init_code)

            # 1. Fetch basic info
            logger.info("Fetching Basic Data...")
            if progress_callback: progress_callback("Fetching Company Basic Info...", 10)
            basic_data = self._fetch_basic_remote(company_code, "USD")
            self.save_data(basic_data)

            # Determine reporting currency from the market
            currency = "USD"
            if "JP" in market.upper(): currency = "JPY"
            elif "VN" in market.upper(): currency = "VND"
            elif "CN" in market.upper(): currency = "CNY"
            elif "HK" in market.upper(): currency = "HKD"

            logger.info(f"Using currency: {currency}")

            # 2. Fetch currency-denominated data
            logger.info("Fetching Currency Data...")
            if progress_callback: progress_callback(f"Fetching currency indicators ({currency})...", 30)
            curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency")
            self.save_data(curr_data)

            # 3. Fetch non-currency data
            logger.info("Fetching Non-Currency Data...")
            if progress_callback: progress_callback("Fetching non-currency indicators...", 50)
            non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency")
            self.save_data(non_curr_data)

            # 4. Fetch price data (aligned with revenue reporting dates)
            logger.info("Fetching Price Data (Aligned)...")
            if progress_callback: progress_callback("Fetching price indicators...", 70)

            # Extract revenue reporting dates
            revenue_dates = []
            rev_key = CURRENCY_CONFIG.get("Revenue", "SALES_REV_TURN")  # mnemonic, kept for reference
            # Saved entries use the config keys as indicator names (e.g. "Revenue"),
            # so we match on "Revenue" rather than on the mnemonic.
            for item in curr_data:
                # Check for "Revenue" (case-insensitive)
                if item['indicator'].lower() == 'revenue':
                    if item['value_date']:
                        # Dates are already 'YYYY-MM-DD' strings
                        revenue_dates.append(item['value_date'])

            # Remove duplicates and sort newest first
            revenue_dates = sorted(list(set(revenue_dates)), reverse=True)

            if revenue_dates:
                logger.info(f"Found {len(revenue_dates)} revenue reporting dates. Fetching aligned price data...")
                price_data = self._fetch_price_by_dates_remote(company_code, currency, revenue_dates)
                self.save_data(price_data)
            else:
                logger.warning("No revenue dates found. Falling back to yearly price fetch.")
                price_data = self._fetch_series_remote(company_code, currency, PRICE_CONFIG, "price")
                self.save_data(price_data)

            # 5. Cleanup
            if progress_callback: progress_callback("Finalizing data...", 90)
            self.run_cleanup()
            logger.info(f"✅ Completed processing for {company_code}")
            if progress_callback: progress_callback("Bloomberg data sync complete", 100)

        except Exception as e:
            logger.error(f"Fetch failed for {company_code}: {e}")
            if progress_callback: progress_callback(f"Error: {e}", 0)
            raise e
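
    # Illustrative call (hypothetical ticker): fetch_company("US", "AAPL") resolves the
    # ticker to "AAPL US Equity" and runs the full pipeline: basic info -> currency
    # series -> non-currency series -> revenue-date-aligned prices -> cleanup.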

    def _fetch_basic_remote(self, company_code, currency):
        """Generate remote code that fetches basic (snapshot) data and run it."""
        code = f"""
def get_basic():
    company = "{company_code}"
    curr = "{currency}"
    res_list = []

    # 1. BQL query
    q = f"for(['{{company}}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={{curr}}),PCT_REVENUE_FROM_FOREIGN_SOURCES)"
    try:
        df = bquery.bql(q)
        if not df.empty:
            def get_val(df, field_name):
                if field_name == 'name':
                    rows = df[df['field'] == 'name']
                elif 'cur_mkt_cap' in field_name:
                    rows = df[df['field'].str.contains('cur_mkt_cap')]
                elif 'FOREIGN' in field_name:
                    rows = df[df['field'].str.contains('FOREIGN')]
                else:
                    rows = df[df['field'] == field_name]

                if not rows.empty:
                    val = rows['value'].iloc[0]
                    # Handle Timestamp object from remote pandas
                    if hasattr(val, 'strftime'):
                        return val.strftime('%Y-%m-%d')
                    return str(val) if val is not None else None
                return None

            res_list.append({{"indicator": "company_name", "value": get_val(df, 'name')}})
            res_list.append({{"indicator": "pe_ratio", "value": get_val(df, 'pe_ratio')}})
            res_list.append({{"indicator": "pb_ratio", "value": get_val(df, 'px_to_book_ratio')}})
            res_list.append({{"indicator": "market_cap", "value": get_val(df, 'cur_mkt_cap')}})
            res_list.append({{"indicator": "Rev_Abroad", "value": get_val(df, 'FOREIGN')}})
    except Exception as e:
        print(f"Basic BQL Error: {{e}}")

    # 2. BDP for dividend yield and IPO date
    try:
        did = bquery.bdp([company], ["DIVIDEND_12_MONTH_YIELD"])
        if not did.empty and 'DIVIDEND_12_MONTH_YIELD' in did.columns:
            res_list.append({{"indicator": "dividend_yield", "value": str(did['DIVIDEND_12_MONTH_YIELD'][0])}})

        ipo = bquery.bdp([company], ["EQY_INIT_PO_DT"])
        if not ipo.empty and 'EQY_INIT_PO_DT' in ipo.columns:
            val = ipo['EQY_INIT_PO_DT'][0]
            val_str = str(val)
            if hasattr(val, 'strftime'):
                val_str = val.strftime('%Y-%m-%d')
            res_list.append({{"indicator": "IPO_date", "value": val_str}})

    except Exception as e:
        print(f"Basic BDP Error: {{e}}")

    # Format result rows
    final_res = []
    today = datetime.now().strftime('%Y-%m-%d')
    for item in res_list:
        if item['value']:
            final_res.append({{
                "Company_code": company,
                "update_date": today,
                "currency": curr,
                "indicator": item['indicator'],
                "value": item['value'],
                "value_date": today
            }})

    print("JSON_START")
    print(json.dumps(final_res))
    print("JSON_END")

get_basic()
"""
        return self._execute_and_parse(code)

    def _fetch_series_remote(self, company_code, currency, config_dict, result_type):
        """Generate remote code that fetches yearly series data using BDH and run it."""
        config_json = json.dumps(config_dict)
        period_years = STOCKCARD_CONFIG['period']

        start_year = datetime.now().year - period_years
        start_date = f"{start_year}0101"
        end_date = datetime.now().strftime('%Y%m%d')

        bdh_options = {
            'periodicitySelection': 'YEARLY',
            'currency': currency,
            'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS',
            'nonTradingDayFillMethod': 'PREVIOUS_VALUE'
        }
        bdh_opts_json = json.dumps(bdh_options)

        code = f"""
def get_series():
    company = "{company_code}"
    curr = "{currency}"
    config = {config_json}

    mnemonic_map = {{v.upper(): k for k, v in config.items()}}
    fields = list(mnemonic_map.keys())

    res_list = []

    try:
        df = bquery.bdh(
            [company],
            fields,
            start_date='{start_date}',
            end_date='{end_date}',
            options={bdh_opts_json}
        )

        if not df.empty:
            for _, row in df.iterrows():
                date_val = None
                if 'date' in df.columns: date_val = row['date']
                elif 'DATE' in df.columns: date_val = row['DATE']

                if date_val is None or pd.isna(date_val): continue

                date_str = str(date_val)[:10]

                for mnemonic, indicator_name in mnemonic_map.items():
                    col_name = mnemonic
                    # bdh usually returns columns named after the requested fields

                    val = None
                    # Flexible column matching
                    if col_name in row:
                        val = row[col_name]
                    elif col_name in df.columns:
                        val = row[col_name]
                    else:
                        # Try case-insensitive
                        for c in df.columns:
                            if c.upper() == col_name:
                                val = row[c]
                                break

                    if pd.isna(val) or val is None:
                        continue

                    val_str = str(val)

                    res_list.append({{
                        "Company_code": company,
                        "update_date": "{datetime.now().strftime('%Y-%m-%d')}",
                        "currency": curr,
                        "indicator": indicator_name,
                        "value": val_str,
                        "value_date": date_str
                    }})

    except Exception as e:
        print(f"BDH Error: {{e}}")

    print("JSON_START")
    print(json.dumps(res_list))
    print("JSON_END")

get_series()
"""
        return self._execute_and_parse(code)

    def _fetch_price_by_dates_remote(self, company_code, currency, dates):
        """Generates code to fetch price/mkt_cap for specific dates."""
        if not dates:
            return []

        dates_json = json.dumps(dates)
        config_json = json.dumps(PRICE_CONFIG)

        bdh_options = {
            'currency': currency,
            'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS',
            'nonTradingDayFillMethod': 'PREVIOUS_VALUE'
        }
        bdh_opts_json = json.dumps(bdh_options)

        code = f"""
def get_price_by_dates():
    company = "{company_code}"
    curr = "{currency}"
    dates = {dates_json}
    config = {config_json}

    mnemonic_map = {{v.upper(): k for k, v in config.items()}}
    fields = list(mnemonic_map.keys())

    res_list = []

    for d_str in dates:
        # d_str is 'YYYY-MM-DD'; bdh needs 'YYYYMMDD'
        d_param = d_str.replace('-', '')

        try:
            df = bquery.bdh(
                [company],
                fields,
                start_date=d_param,
                end_date=d_param,
                options={bdh_opts_json}
            )

            if not df.empty:
                for _, row in df.iterrows():
                    # value_date is d_str

                    for mnemonic, indicator_name in mnemonic_map.items():
                        col_name = mnemonic
                        # bdh columns might be tuples or plain strings depending on the
                        # request; usually just the field name, e.g. 'PX_LAST'

                        val = None
                        if col_name in df.columns:
                            val = row[col_name]

                        if pd.isna(val) or val is None:
                            continue

                        val_str = str(val)

                        res_list.append({{
                            "Company_code": company,
                            "update_date": "{datetime.now().strftime('%Y-%m-%d')}",
                            "currency": curr,
                            "indicator": indicator_name,
                            "value": val_str,
                            "value_date": d_str
                        }})
        except Exception as e:
            print(f"Error fetching price for {{d_str}}: {{e}}")

    print("JSON_START")
    print(json.dumps(res_list))
    print("JSON_END")

get_price_by_dates()
"""
        return self._execute_and_parse(code)

    def _execute_and_parse(self, code):
        """Execute code remotely and parse the JSON payload between JSON_START and JSON_END."""
        raw_output_list = self.execute_remote_code(code)
        raw_output = "".join(raw_output_list) if isinstance(raw_output_list, list) else str(raw_output_list)
        # logger.debug(f"Remote execution returned {len(raw_output)} chars.")  # Optional debug

        # Simple parser
        try:
            start_idx = raw_output.find("JSON_START")
            end_idx = raw_output.find("JSON_END")

            if start_idx != -1 and end_idx != -1:
                json_str = raw_output[start_idx + len("JSON_START"):end_idx].strip()
                data = json.loads(json_str)
                logger.info(f"✅ Parsed {len(data) if isinstance(data, list) else 1} items from remote.")
                return data
            else:
                logger.warning("⚠️ No JSON output found in remote response.")
                return []
        except Exception as e:
            logger.error(f"❌ Error parsing JSON from remote: {e}")
            return []
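

# Minimal manual-run sketch (illustrative, not part of the library surface). It assumes
# the JUPYTER_URL/JUPYTER_PASSWORD and DB_* environment variables (or the defaults above)
# point at reachable services; the market/ticker below are placeholders.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    client = BloombergClient()
    # Progress messages are just printed here; a web backend would forward them instead.
    client.fetch_company("US", "AAPL", progress_callback=lambda msg, pct: print(f"[{pct}%] {msg}"))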