Read data via Bloomberg and store it in the database; other data sources unchanged for now

xucheng 2026-01-09 21:52:16 +08:00
parent 62f671bb35
commit 3290b3bdf2
4 changed files with 1235 additions and 106 deletions

.env (9 changes)

@@ -5,4 +5,11 @@ OPENAI_API_KEY=AIzaSyAT5QXWoACp87oqg1OK4USTIwc2RsJHvIc
OPENAI_BASE_URL=https://generativelanguage.googleapis.com/v1beta/openai/
LLM_MODEL=gemini-2.5-flash
IFIND_REFRESH_TOKEN=eyJzaWduX3RpbWUiOiIyMDI1LTEyLTIxIDE3OjQwOjU2In0=.eyJ1aWQiOiI3MjMwNDQwNzciLCJ1c2VyIjp7ImFjY2Vzc1Rva2VuIjoiYzk5NjdlNGUzNmJjZDliNzI2ZjM5YmQ5MGFkOWQwY2Q4NzE3OTllZi5zaWduc19Oekl6TURRME1EYzMiLCJhY2Nlc3NUb2tlbkV4cGlyZWRUaW1lIjoiMjAyNS0xMi0yMSAxNzo0MDo1NiIsImFjY291bnQiOiJ3eGhsdHowMDEiLCJhdXRoVXNlckluZm8iOnsiRVRyYW5zZmVyIjp0cnVlLCJFZXhjZWxQYXllcnMiOiIxNzk4NjgzMDAyMDAwIn0sImNvZGVDU0kiOltdLCJjb2RlWnpBdXRoIjpbXSwiaGFzQUlQcmVkaWN0IjpmYWxzZSwiaGFzQUlUYWxrIjpmYWxzZSwiaGFzQ0lDQyI6ZmFsc2UsImhhc0NTSSI6ZmFsc2UsImhhc0V2ZW50RHJpdmUiOmZhbHNlLCJoYXNGVFNFIjpmYWxzZSwiaGFzRmFzdCI6ZmFsc2UsImhhc0Z1bmRWYWx1YXRpb24iOmZhbHNlLCJoYXNISyI6dHJ1ZSwiaGFzTE1FIjpmYWxzZSwiaGFzTGV2ZWwyIjpmYWxzZSwiaGFzUmVhbENNRSI6ZmFsc2UsImhhc1RyYW5zZmVyIjpmYWxzZSwiaGFzVVMiOmZhbHNlLCJoYXNVU0FJbmRleCI6ZmFsc2UsImhhc1VTREVCVCI6ZmFsc2UsIm1hcmtldEF1dGgiOnsiRENFIjpmYWxzZX0sIm1heE9uTGluZSI6MSwibm9EaXNrIjpmYWxzZSwicHJvZHVjdFR5cGUiOiJTVVBFUkNPTU1BTkRQUk9EVUNUIiwicmVmcmVzaFRva2VuIjoiIiwicmVmcmVzaFRva2VuRXhwaXJlZFRpbWUiOiIyMDI2LTEyLTMxIDEwOjEwOjAyIiwic2Vzc3Npb24iOiI4ZmYxNzVmYzNmZDkxMzA3YTRlOWIzMTE3Njc4NDlhNiIsInNpZEluZm8iOns2NDoiMTExMTExMTExMTExMTExMTExMTExMTExIiwxOiIxMDEiLDI6IjEiLDY3OiIxMDExMTExMTExMTExMTExMTExMTExMTEiLDM6IjEiLDY5OiIxMTExMTExMTExMTExMTExMTExMTExMTExIiw1OiIxIiw2OiIxIiw3MToiMTExMTExMTExMTExMTExMTExMTExMTAwIiw3OiIxMTExMTExMTExMSIsODoiMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDEiLDEzODoiMTExMTExMTExMTExMTExMTExMTExMTExMSIsMTM5OiIxMTExMTExMTExMTExMTExMTExMTExMTExIiwxNDA6IjExMTExMTExMTExMTExMTExMTExMTExMTEiLDE0MToiMTExMTExMTExMTExMTExMTExMTExMTExMSIsMTQyOiIxMTExMTExMTExMTExMTExMTExMTExMTExIiwxNDM6IjExIiw4MDoiMTExMTExMTExMTExMTExMTExMTExMTExIiw4MToiMTExMTExMTExMTExMTExMTExMTExMTExIiw4MjoiMTExMTExMTExMTExMTExMTExMTEwMTEwIiw4MzoiMTExMTExMTExMTExMTExMTExMDAwMDAwIiw4NToiMDExMTExMTExMTExMTExMTExMTExMTExIiw4NzoiMTExMTExMTEwMDExMTExMDExMTExMTExIiw4OToiMTExMTExMTEwMTEwMTAwMDAwMDAxMTExIiw5MDoiMTExMTEwMTExMTExMTExMTEwMDAxMTExMTAiLDkzOiIxMTExMTExMTExMTExMTExMTAwMDAxMTExIiw5NDoiMTExMTExMTExMTExMTExMTExMTExMTExMSIsOTY6IjExMTExMTExMTExMTExMTExMTExMTExMTEiLDk5OiIxMDAiLDEwMDoiMTExMTAxMTExMTExMTExMTExMCIsMTAyOiIxIiw0NDoiMTEiLDEwOToiMSIsNTM6IjExMTExMTExMTExMTExMTExMTExMTExMSIsNTQ6IjExMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwIiw1NzoiMDAwMDAwMDAwMDAwMDAwMDAwMDAxMDAwMDAwMDAiLDYyOiIxMTExMTExMTExMTExMTExMTExMTExMTEiLDYzOiIxMTExMTExMTExMTExMTExMTExMTExMTEifSwidGltZXN0YW1wIjoiMTc2NjMxMDA1NjAzNyIsInRyYW5zQXV0aCI6ZmFsc2UsInR0bFZhbHVlIjowLCJ1aWQiOiI3MjMwNDQwNzciLCJ1c2VyVHlwZSI6IkZSRUVJQUwiLCJ3aWZpbmRMaW1pdE1hcCI6e319fQ==.03DB82A62F865C511B2C2BD464B535B62B0081C47AE33540D5D236CEF0C1FE9D
GEMINI_API_KEY=AIzaSyAT5QXWoACp87oqg1OK4USTIwc2RsJHvIc
# Database configuration
DB_HOST=192.168.3.195
DB_PORT=5432
DB_USER=value
DB_PASSWORD=Value609!
DB_NAME=FA
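
For context, a minimal sketch of how these variables are typically consumed (an assumption: python-dotenv and psycopg2 are installed; the variable names match _get_db_connection in the new client below):

import os
import psycopg2
from dotenv import load_dotenv

load_dotenv()  # reads .env from the working directory
conn = psycopg2.connect(
    host=os.getenv("DB_HOST"),
    port=os.getenv("DB_PORT", "5432"),
    user=os.getenv("DB_USER"),
    password=os.getenv("DB_PASSWORD"),
    dbname=os.getenv("DB_NAME"),
)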

legacy/bloomberg.py (new file, 574 lines)

@@ -0,0 +1,574 @@
from datetime import datetime
import pandas as pd
from blp import blp
import psycopg2
import json
currency_config = {
"Revenue": "SALES_REV_TURN",
"Net_Income": "EARN_FOR_COMMON",
"Cash_From_Operating": "CF_CASH_FROM_OPER",
"Capital_Expenditure": "CAPITAL_EXPEND",
"Free_Cash_Flow": "CF_FREE_CASH_FLOW",
"Dividends_Paid": "CF_DVD_PAID",
"Total_Assets": "BS_TOT_ASSET",
"Equity": "BS_TOT_EQY",
"Goodwill": "BS_GOODWILL",
"SG&A": "IS_SG_AND_A_EXPENSE",
"Selling&Marketing": "IS_SELLING_EXPENSES",
"General&Admin": "IS_GENERAL_AND_ADMIN_GAAP",
"R&D": "IS_RD_EXPEND",
"Depreciation": "IS_DEPR_EXP",
"Cash": "BS_CASH_NEAR_CASH_ITEM",
"Inventory": "BS_INVENTORIES",
"Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV",
"Prepaid": "BS_PREPAY",
"Property_Plant&Equipment": "BS_NET_FIX_ASSET",
"LT_Investment": "BS_LT_INVEST",
"Accounts_Payable": "BS_ACCT_PAYABLE",
"ST_Debt": "BS_ST_BORROW",
"LT_Debt": "BS_LT_BORROW",
"ST_Defer_Rev":"ST_DEFERRED_REVENUE",
"repurchase":"cf_proceeds_repurchase_equity"
}
non_currency_config = {
"ROE": "RETURN_COM_EQY",
"ROA": "RETURN_ON_ASSET",
"ROCE": "RETURN_ON_CAPITAL_EMPLOYED",
"Gross_Margin": "GROSS_MARGIN",
"EBITDA_margin": "EBITDA_TO_REVENUE",
"Net_Profit_Margin": "PROF_MARGIN",
"Tax_Rate": "IS_STATUTORY_TAX_RATE",
"Inventory_Days": "INVENT_DAYS",
"Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG",
"Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS",
"Employee": "NUM_OF_EMPLOYEES",
"PE": "px_last/is_eps",
"PB": "PX_TO_BOOK_RATIO",
"Shareholders": "BS_NUM_OF_SHAREHOLDERS",
"Dividend_Payout_Ratio":"DVD_PAYOUT_RATIO",
"Total_Debt_Ratio":"TOT_DEBT_TO_TOT_ASSET",
"Net_Fixed_Asset_Turnover":"NET_FIX_ASSET_TURN",
"Asset_Turnover":"ASSET_TURNOVER",
"NetIncome_Growth":"earn_for_com_growth",
"Revenue_Growth":"sales_growth",
}
price_config= {
"Last_Price": "PX_LAST",
"Market_Cap":"cur_mkt_cap",
}
# Base configuration
stockcard_CONFIG = {
"period": 10,
"unit": 100000000
}
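# For illustration: with period=10 and currency="USD", the "Revenue" entry above
# is expanded by process_currency_data below into a BQL query string like
#   for(['IBM US Equity']) get(SALES_REV_TURN(currency=USD,fa_period_offset=range(-10A, 0A),fa_period_type=A))
# (the ticker is only an example; real tickers come from the waiting_list table)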
def process_stockcard_data(bquery, cursor, conn, company_code, period, currency):
"""
处理单个公司的股票卡片数据包括基本信息非货币数据货币数据和价格数据并将其存入数据库
参数:
bquery: Bloomberg查询对象
cursor: 数据库游标
conn: 数据库连接
company_code (str): 公司代码例如 'IBM US Equity'
period (int): 查询的年度周期数
currency (str): 货币代码例如 'USD', 'CNY'
"""
try:
# 由于原代码中此处后续是文件读取和数据库连接操作,
# 为了保证资源的正确释放,添加 finally 子句
# 同时添加 except 子句来捕获可能的异常
# 处理输入信息
company_code = company_code + " Equity"
period = int(period)
currency = currency
# 获取基本信息
stockcard_data = get_basic_stock_data(bquery, company_code, currency)
# 插入数据到数据库
for data in stockcard_data:
cursor.execute("""
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
data['Company_code'],
data['update_date'],
data['currency'],
data['indicator'],
data['value'],
data['value_date']
))
conn.commit()
        # Process non-currency data.
        process_non_currency_data(bquery, cursor, conn, company_code, period, currency)
        # Process currency data.
        process_currency_data(bquery, cursor, conn, company_code, period, currency)
        # Process price data.
        process_price_data(bquery, cursor, conn, company_code, period, currency)
        print(f"Finished processing: {company_code}")
    except Exception as e:
        # Hook for more specific handling, e.g. structured logging.
        print(f"Error while processing data: {str(e)}")
        raise
def get_basic_stock_data(bquery, company_code, currency):
"""
从Bloomberg获取股票的基础数据包括公司名称IPO日期市盈率市净率股息收益率市值和海外收入占比
参数:
bquery: Bloomberg查询对象
company_code (str): 公司代码例如 'IBM US Equity'
currency (str): 货币代码用于市值查询例如 'USD', 'CNY'
返回:
list: 包含股票基础数据的字典列表每个字典代表一个指标及其值
"""
print(f"Processing company: {company_code}") # 调试输出
query = f"for(['{company_code.strip()}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={currency}),PCT_REVENUE_FROM_FOREIGN_SOURCES)"
result = bquery.bql(query)
did_result = bquery.bdp([f"{company_code}"], ["DIVIDEND_12_MONTH_YIELD"])
IPO_date = bquery.bdp([f"{company_code}"], ["EQY_INIT_PO_DT"])
return [
{
'Company_code': company_code,
'update_date': datetime.now().strftime('%Y-%m-%d'),
'currency': currency,
'indicator': 'company_name',
'value': result[result['field'] == 'name']['value'].iloc[0],
'value_date': datetime.now().strftime('%Y-%m-%d')
},
{
'Company_code': company_code,
'update_date': datetime.now().strftime('%Y-%m-%d'),
'currency': currency,
'indicator': 'IPO_date',
'value': str(IPO_date['EQY_INIT_PO_DT'][0]),
'value_date': datetime.now().strftime('%Y-%m-%d')
},
{
'Company_code': company_code,
'update_date': datetime.now().strftime('%Y-%m-%d'),
'currency': currency,
'indicator': 'pe_ratio',
'value': str(result[result['field'] == 'pe_ratio']['value'].iloc[0]),
'value_date': datetime.now().strftime('%Y-%m-%d')
},
{
'Company_code': company_code,
'update_date': datetime.now().strftime('%Y-%m-%d'),
'currency': currency,
'indicator': 'pb_ratio',
'value': str(result[result['field'] == 'px_to_book_ratio']['value'].iloc[0]),
'value_date': datetime.now().strftime('%Y-%m-%d')
},
{
'Company_code': company_code,
'update_date': datetime.now().strftime('%Y-%m-%d'),
'currency': currency,
'indicator': 'dividend_yield',
'value': str(did_result['DIVIDEND_12_MONTH_YIELD'][0]),
'value_date': datetime.now().strftime('%Y-%m-%d')
},
{
'Company_code': company_code,
'update_date': datetime.now().strftime('%Y-%m-%d'),
'currency': currency,
'indicator': 'market_cap',
'value': str(result[result['field'].str.contains('cur_mkt_cap')]['value'].iloc[0]),
'value_date': datetime.now().strftime('%Y-%m-%d')
},
{
'Company_code': company_code,
'update_date': datetime.now().strftime('%Y-%m-%d'),
'currency': currency,
'indicator': 'Rev_Abroad',
'value': str(result[result['field'].str.contains('FOREIGN')]['value'].iloc[0]),
'value_date': datetime.now().strftime('%Y-%m-%d')
}
]
def process_currency_data(bquery, cursor, conn, company_code, period, currency):
"""
处理货币相关的财务数据通过Bloomberg BQL查询获取指定公司周期和货币的财务指标数据
并将这些数据批量插入到 `stockcard` 表中
参数:
bquery: Bloomberg查询对象
cursor: 数据库游标
conn: 数据库连接
company_code (str): 公司代码例如 'IBM US Equity'
period (int): 查询的年度周期数
currency (str): 货币代码例如 'USD', 'CNY'
"""
data_to_insert = []
for key, value_bql in currency_config.items():
time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fa_period_offset=range(-{period}A, 0A),fa_period_type=A))"
print(f"Executing currency series query for {key} ({currency}): {time_series_query}")
result = bquery.bql(time_series_query)
        # Skip if the result is empty or lacks the expected columns.
if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
print(f"No data found for currency indicator: {key}")
continue
        # Extract the value and date for each PERIOD_END_DATE row.
filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE']
if filtered_results.empty:
print(f"No PERIOD_END_DATE found for currency indicator: {key}")
continue
for _, row in filtered_results.iterrows():
date_str = row["secondary_value"]
value = row["value"]
try:
                # Make sure the date is well-formed.
formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
                # Convert the value to float; treat None, empty strings, and the literal 'None' as NULL.
numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
data_to_insert.append((
company_code,
datetime.now().strftime('%Y-%m-%d'),
currency,
key,
numeric_value,
formatted_date
))
except (ValueError, TypeError) as e:
print(f"Skipping invalid currency data for {key} on {date_str}: {value}. Error: {e}")
continue
if data_to_insert:
insert_query = """
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
VALUES (%s, %s, %s, %s, %s, %s)
"""
try:
cursor.executemany(insert_query, data_to_insert)
conn.commit()
print(f"Successfully inserted {len(data_to_insert)} currency data points for {company_code}.")
except Exception as e:
            conn.rollback()  # roll back the whole batch
print(f"Failed to insert currency data for {company_code}. Error: {e}")
def process_price_data(bquery, cursor, conn, company_code, period, currency):
"""
处理价格数据并存入数据库
通过Bloomberg BQL查询获取指定公司周期和货币的价格指标数据
然后将这些数据批量插入到 `stockcard` 表中
参数:
bquery: Bloomberg查询对象
cursor: 数据库游标
conn: 数据库连接
company_code (str): 公司代码例如 'IBM US Equity'
period (int): 查询的年度周期数
currency (str): 货币代码 (例如: 'USD', 'CNY')
"""
data_to_insert = []
for key, value_bql in price_config.items():
        # Note: price data is natively daily; per='Y' requests an annual frequency.
time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fill='PREV', per='Y', start='-{period}Y'))"
print(f"Executing price series query for {key}: {time_series_query}")
result = bquery.bql(time_series_query)
if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
print(f"No price data found for indicator: {key}")
continue
        # Extract the value and date for each DATE row.
filtered_results = result[result["secondary_name"] == 'DATE']
if filtered_results.empty:
print(f"No DATE found for price indicator: {key}")
continue
for _, row in filtered_results.iterrows():
date_str = row["secondary_value"]
value = row["value"]
try:
formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
data_to_insert.append((
company_code,
datetime.now().strftime('%Y-%m-%d'),
currency,
key,
numeric_value,
formatted_date
))
except (ValueError, TypeError) as e:
print(f"Skipping invalid price data for {key} on {date_str}: {value}. Error: {e}")
continue
if data_to_insert:
insert_query = """
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
VALUES (%s, %s, %s, %s, %s, %s)
"""
try:
cursor.executemany(insert_query, data_to_insert)
conn.commit()
print(f"Successfully inserted {len(data_to_insert)} price data points for {company_code}.")
except Exception as e:
            conn.rollback()  # roll back the whole batch
print(f"Failed to insert price data for {company_code}. Error: {e}")
def process_non_currency_data(bquery, cursor, conn, company_code, period, currency):
"""
处理非货币相关的财务数据通过Bloomberg BQL查询获取指定公司和周期的非货币财务指标数据
并将这些数据批量插入到 `stockcard` 表中
参数:
bquery: Bloomberg查询对象
cursor: 数据库游标
conn: 数据库连接
company_code (str): 公司代码例如 'IBM US Equity'
period (int): 查询的年度周期数
currency (str): 货币代码 (例如: 'USD', 'CNY')此参数在此函数中可能不直接用于BQL查询但作为数据记录的一部分
"""
data_to_insert = []
for key, value_bql in non_currency_config.items():
time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(fa_period_offset=range(-{period}A, 0A),fa_period_type=A))"
print(f"Executing non-currency series query for {key}: {time_series_query}")
result = bquery.bql(time_series_query)
if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
print(f"No data found for non-currency indicator: {key}")
continue
        # Extract the value and date for each PERIOD_END_DATE row.
filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE']
if filtered_results.empty:
print(f"No PERIOD_END_DATE found for non-currency indicator: {key}")
continue
for _, row in filtered_results.iterrows():
date_str = row["secondary_value"]
value = row["value"]
try:
formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
data_to_insert.append((
company_code,
datetime.now().strftime('%Y-%m-%d'),
                    currency,  # not used in the BQL query, but stored with the record
key,
numeric_value,
formatted_date
))
except (ValueError, TypeError) as e:
print(f"Skipping invalid non-currency data for {key} on {date_str}: {value}. Error: {e}")
continue
if data_to_insert:
insert_query = """
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
VALUES (%s, %s, %s, %s, %s, %s)
"""
try:
cursor.executemany(insert_query, data_to_insert)
conn.commit()
print(f"Successfully inserted {len(data_to_insert)} non-currency data points for {company_code}.")
except Exception as e:
            conn.rollback()  # roll back the whole batch
print(f"Failed to insert non-currency data for {company_code}. Error: {e}")
def get_waiting_list(cursor, conn):
"""
从数据库中获取待处理的公司列表这些公司在 `waiting_list` 表中 `status` 字段为 0
本函数仅负责查询并返回列表不负责更新 `status` 字段
参数:
cursor: 数据库游标
conn: 数据库连接
返回:
pd.DataFrame: 包含待处理公司代码和对应货币的DataFrame列名为 "member_ticker" "currency"
如果等待列表为空则返回空的DataFrame
"""
try:
        # Query the waiting list, oldest first, capped at 300 rows.
query = "SELECT id, company_code, currency FROM waiting_list WHERE status = 0 ORDER BY update_date ASC LIMIT 300"
cursor.execute(query)
result = cursor.fetchall()
if result:
print(f"读到等待列表,待处理公司数量: {len(result)}")
# 获取company_code和currency列表
companies_and_currencies = [(row[0], row[1], row[2]) for row in result]
# 返回DataFrame不再在此处更新status
return pd.DataFrame(companies_and_currencies, columns=["id", "member_ticker", "currency"])
else:
print("等待列表为空")
return pd.DataFrame(columns=["id", "member_ticker", "currency"])
except Exception as e:
print(f"获取等待列表失败: {str(e)}")
return pd.DataFrame(columns=["id", "member_ticker", "currency"])
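# The waiting_list table is not defined in this commit. A schema consistent
# with the SELECT/UPDATE statements used here and in main() would look roughly
# like this (an inference, not the authoritative DDL):
#   CREATE TABLE waiting_list (
#       id SERIAL PRIMARY KEY,
#       company_code TEXT NOT NULL,
#       currency TEXT NOT NULL,
#       status INTEGER DEFAULT 0,           -- 0 = pending, 1 = processed
#       update_date TIMESTAMP DEFAULT NOW()
#   );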
def main():
"""
主函数负责初始化Bloomberg查询和数据库连接处理等待列表中的公司数据并更新状态
流程:
1. 初始化Bloomberg查询服务
2. 连接数据库
3. 获取等待列表
4. 处理每家公司数据
5. 更新处理状态
6. 关闭所有资源
"""
    # Database connection settings
DB_HOST = "aws-0-ap-northeast-1.pooler.supabase.com"
DB_PORT = "5432"
DB_NAME = "postgres"
DB_USER = "postgres.kzexzbtpbnufbvrvkuae"
DB_PASSWORD = "cAuNDnJv0aj1NW9l"
    # Initialize the Bloomberg query object, database connection, and cursor.
bquery = None
conn = None
cursor = None
try:
try:
bquery = blp.BlpQuery().start()
except Exception as e:
print(f"初始化bloomberg失败: {str(e)}")
raise # 重新抛出异常,确保外部调用者知道初始化失败
try:
            # Build the connection string.
            conn_string = f"host={DB_HOST} port={DB_PORT} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} gssencmode=disable"
            # Open the connection.
            conn = psycopg2.connect(conn_string)
            cursor = conn.cursor()
            print("Connected to the Supabase session pooler")
        except Exception as e:
            print(f"Failed to connect to the Supabase session pooler: {e}")
            raise  # without a connection, nothing below can work
        # Fetch the waiting list.
waiting_list = get_waiting_list(cursor, conn)
if not waiting_list.empty:
print(f"开始处理 {len(waiting_list)} 家公司数据")
# 处理每家公司数据
for _, row in waiting_list.iterrows():
id = int(row["id"])
                company_code = str(row["member_ticker"])  # ensure company_code is a string
currency = row["currency"]
try:
process_stockcard_data(bquery, cursor, conn, company_code, stockcard_CONFIG["period"], currency)
                    # Update the processing status and the update date.
try:
cursor.execute("UPDATE waiting_list SET status = 1, update_date = NOW() WHERE id = %s", (id,))
conn.commit()
print(f"成功更新等待列表ID: {id} 的状态和日期。")
except Exception as e:
conn.rollback()
print(f"更新等待列表ID: {id} 失败: {str(e)}")
print(f"成功处理并更新状态: {company_code}")
except Exception as e:
print(f"处理 {company_code} 时出错: {str(e)}")
conn.rollback()
            # Run the SQL that removes duplicate rows.
try:
cursor.execute('''
WITH DuplicateRows AS (
SELECT
                        id, -- the surrogate primary key on stockcard
ROW_NUMBER() OVER(
PARTITION BY company_code, currency, indicator, value_date
ORDER BY update_date DESC
) as rn
FROM
stockcard
)
DELETE FROM stockcard
WHERE id IN (
SELECT id
FROM DuplicateRows
WHERE rn > 1
);
''')
conn.commit()
print(f"成功执行删除重复数据SQL共删除 {cursor.rowcount} 条记录。")
except Exception as e:
print(f"执行 RemoveDuplicateStockcardEntries.sql 失败: {e}")
conn.rollback()
            # Run the SQL that rebuilds the unique-company-codes materialized view.
try:
cursor.execute('''
                -- Drop the materialized view first if it exists, to avoid "relation already exists" errors.
                DROP MATERIALIZED VIEW IF EXISTS public.unique_company_codes;
                -- Create the materialized view public.unique_company_codes: unique company codes and their names.
CREATE MATERIALIZED VIEW public.unique_company_codes AS
SELECT
                    s.Company_code, -- company code
                    -- Use a subquery so each company code maps to exactly one company name.
(SELECT cn_sub.value
FROM public.stockcard AS cn_sub
WHERE cn_sub.Company_code = s.Company_code
AND cn_sub.indicator = 'company_name'
                     ORDER BY cn_sub.value ASC -- if several names exist, pick the alphabetically first for determinism
                     LIMIT 1) AS company_name -- take a single matching company_name
FROM
                    -- First select all distinct company codes that are not NULL.
(SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s
ORDER BY
                    s.Company_code; -- sort by company code for easier inspection
                -- Index the view's Company_code column so later lookups on the view are fast.
                -- Note: the index must be recreated after the view is rebuilt.
                CREATE INDEX idx_unique_company_codes_company_code ON public.unique_company_codes (Company_code);
''')
conn.commit()
print("成功执行 UniqueCompanyCodes.sql 中的 SQL 语句(创建/刷新物化视图及索引)。")
except Exception as e:
print(f"执行 UniqueCompanyCodes.sql 失败: {e}")
conn.rollback()
else:
print("没有待处理的公司")
except Exception as e:
print(f"主程序出错: {str(e)}")
finally:
        # Make sure all resources are released.
if cursor:
cursor.close()
if conn:
conn.close()
if bquery:
bquery.stop()
print("程序执行完毕,所有资源已释放")
if __name__ == "__main__":
    main()
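
To give the script work to do, a pending row must first exist in waiting_list. A minimal enqueue sketch, assuming the schema inferred above and the same database that main() connects to:

import psycopg2

# Same connection parameters as main() above.
conn = psycopg2.connect(
    "host=aws-0-ap-northeast-1.pooler.supabase.com port=5432 dbname=postgres "
    "user=postgres.kzexzbtpbnufbvrvkuae password=cAuNDnJv0aj1NW9l gssencmode=disable"
)
with conn, conn.cursor() as cur:
    # status = 0 marks the row as pending; the ticker is hypothetical.
    cur.execute(
        "INSERT INTO waiting_list (company_code, currency, status) VALUES (%s, %s, 0)",
        ("IBM US", "USD"),
    )
conn.close()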


@@ -0,0 +1,653 @@
import os
import sys
import json
import uuid
import time
import requests
import websocket
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from datetime import datetime
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# --- Configurations adapted from legacy/bloomberg.py ---
CURRENCY_CONFIG = {
"Revenue": "SALES_REV_TURN",
"Net_Income": "EARN_FOR_COMMON",
"Cash_From_Operating": "CF_CASH_FROM_OPER",
"Capital_Expenditure": "CAPITAL_EXPEND",
"Free_Cash_Flow": "CF_FREE_CASH_FLOW",
"Dividends_Paid": "CF_DVD_PAID",
"Total_Assets": "BS_TOT_ASSET",
"Equity": "BS_TOT_EQY",
"Goodwill": "BS_GOODWILL",
"SG&A": "IS_SG_AND_A_EXPENSE",
"Selling&Marketing": "IS_SELLING_EXPENSES",
"General&Admin": "IS_GENERAL_AND_ADMIN_GAAP",
"R&D": "IS_RD_EXPEND",
"Depreciation": "IS_DEPR_EXP",
"Cash": "BS_CASH_NEAR_CASH_ITEM",
"Inventory": "BS_INVENTORIES",
"Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV",
"Prepaid": "BS_PREPAY",
"Property_Plant&Equipment": "BS_NET_FIX_ASSET",
"LT_Investment": "BS_LT_INVEST",
"Accounts_Payable": "BS_ACCT_PAYABLE",
"ST_Debt": "BS_ST_BORROW",
"LT_Debt": "BS_LT_BORROW",
"ST_Defer_Rev":"ST_DEFERRED_REVENUE",
"repurchase":"cf_proceeds_repurchase_equity"
}
NON_CURRENCY_CONFIG = {
"ROE": "RETURN_COM_EQY",
"ROA": "RETURN_ON_ASSET",
"ROCE": "RETURN_ON_CAPITAL_EMPLOYED",
"Gross_Margin": "GROSS_MARGIN",
"EBITDA_margin": "EBITDA_TO_REVENUE",
"Net_Profit_Margin": "PROF_MARGIN",
"Tax_Rate": "IS_STATUTORY_TAX_RATE",
"Inventory_Days": "INVENT_DAYS",
"Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG",
"Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS",
"Employee": "NUM_OF_EMPLOYEES",
"PE": "px_last/is_eps",
"PB": "PX_TO_BOOK_RATIO",
"Shareholders": "BS_NUM_OF_SHAREHOLDERS",
"Dividend_Payout_Ratio":"DVD_PAYOUT_RATIO",
"Total_Debt_Ratio":"TOT_DEBT_TO_TOT_ASSET",
"Net_Fixed_Asset_Turnover":"NET_FIX_ASSET_TURN",
"Asset_Turnover":"ASSET_TURNOVER",
"NetIncome_Growth":"earn_for_com_growth",
"Revenue_Growth":"sales_growth",
}
PRICE_CONFIG = {
"Last_Price": "PX_LAST",
"Market_Cap":"cur_mkt_cap",
}
STOCKCARD_CONFIG = {
"period": 10,
"unit": 100000000
}
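# For illustration: with period=10 these mappings drive a single bquery.bdh()
# call per company in _fetch_series_remote below, roughly:
#   bquery.bdh(["7203 JP Equity"], ["SALES_REV_TURN", "EARN_FOR_COMMON", ...],
#              start_date="20160101", end_date="20260109",
#              options={"periodicitySelection": "YEARLY", "currency": "JPY",
#                       "nonTradingDayFillOption": "ALL_CALENDAR_DAYS",
#                       "nonTradingDayFillMethod": "PREVIOUS_VALUE"})
# (ticker and dates are hypothetical; the real values are computed at runtime)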
class BloombergClient:
def __init__(self, jupyter_url="http://192.168.3.161:8888", password="Value609!"):
self.jupyter_url = jupyter_url.rstrip("/")
self.password = password
self.session = requests.Session()
self.kernel_id = None
self.ws = None
# Authenticate and setup connection
self._authenticate()
self._setup_kernel()
self._connect_websocket()
def _authenticate(self):
"""Authenticate with JupyterHub/Lab using password to get session cookies."""
print(f"Connecting to Jupyter at {self.jupyter_url}...")
try:
# 1. Get Login Page to fetch _xsrf (if needed)
r = self.session.get(f"{self.jupyter_url}/login")
if r.status_code != 200:
print(f"Failed to access login page: {r.status_code}")
xsrf = self.session.cookies.get("_xsrf", "")
# 2. Post Password
data = {"password": self.password}
if xsrf:
data["_xsrf"] = xsrf
login_resp = self.session.post(f"{self.jupyter_url}/login", data=data)
# 3. Verify Auth
api_resp = self.session.get(f"{self.jupyter_url}/api/status")
if api_resp.status_code == 200:
print("✅ Authentication successful.")
else:
raise Exception(f"Authentication failed. Status: {api_resp.status_code}")
except Exception as e:
print(f"❌ Error during authentication: {e}")
raise
def _setup_kernel(self):
"""Find an existing kernel or create a new one."""
try:
# List kernels
resp = self.session.get(f"{self.jupyter_url}/api/kernels")
kernels = resp.json()
# Try to find 'remote_env' or just pick the first available
target_kernel = None
for k in kernels:
if k.get('name') == 'remote_env': # Ideal match
target_kernel = k
break
if not target_kernel and kernels:
target_kernel = kernels[0] # Fallback
if target_kernel:
self.kernel_id = target_kernel['id']
print(f"✅ Found existing kernel: {self.kernel_id} ({target_kernel.get('name')})")
else:
print("No active kernels found. Starting a new one...")
# Start new kernel (assuming python3)
start_resp = self.session.post(f"{self.jupyter_url}/api/kernels", json={"name": "python3"})
if start_resp.status_code == 201:
self.kernel_id = start_resp.json()['id']
print(f"✅ Started new kernel: {self.kernel_id}")
else:
raise Exception(f"Failed to start kernel: {start_resp.text}")
except Exception as e:
print(f"❌ Error setting up kernel: {e}")
raise
def _connect_websocket(self):
"""Connect to the Kernel's WebSocket channel."""
base_ws_url = self.jupyter_url.replace("http", "ws")
ws_url = f"{base_ws_url}/api/kernels/{self.kernel_id}/channels"
# Extract cookies for WS connection
cookies = self.session.cookies.get_dict()
cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
try:
self.ws = websocket.create_connection(
ws_url,
header={"Cookie": cookie_str},
timeout=60 # Connection timeout
)
print("✅ WebSocket connected.")
except Exception as e:
print(f"❌ WebSocket connection failed: {e}")
raise
def execute_remote_code(self, code_str):
"""
Send code to remote kernel via WebSocket and wait for result.
Returns list of output strings (stdout).
"""
msg_id = str(uuid.uuid4())
msg = {
"header": {
"msg_id": msg_id,
"username": "client",
"session": str(uuid.uuid4()),
"msg_type": "execute_request",
"version": "5.3"
},
"parent_header": {},
"metadata": {},
"content": {
"code": code_str,
"silent": False,
"store_history": True,
"user_expressions": {},
"allow_stdin": False
}
}
self.ws.send(json.dumps(msg))
outputs = []
error = None
# Loop to receive messages until 'idle'
while True:
try:
resp = json.loads(self.ws.recv())
msg_type = resp['msg_type']
# Check for idle status (command finished)
if msg_type == 'status':
if resp['content']['execution_state'] == 'idle':
                        # Only finish when the idle status belongs to OUR request.
if resp['parent_header'].get('msg_id') == msg_id:
break
elif msg_type == 'stream':
# Stdout/Stderr
outputs.append(resp['content']['text'])
elif msg_type == 'error':
error = resp['content']['evalue']
print(f"❌ Remote Execution Error: {error}")
for line in resp['content']['traceback']:
print(line)
except Exception as e:
print(f"Error receiving WS message: {e}")
break
if error:
raise Exception(f"Remote Code Execution Failed: {error}")
return "".join(outputs)
# --- Database Operations ---
def _get_db_connection(self):
"""Create a database connection, creating the DB if it doesn't exist."""
db_host = os.getenv("DB_HOST")
db_user = os.getenv("DB_USER")
db_pass = os.getenv("DB_PASSWORD")
db_name = os.getenv("DB_NAME")
db_port = os.getenv("DB_PORT", "5432")
# 1. Try connecting to the specific DB
try:
conn = psycopg2.connect(
host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
)
return conn
except psycopg2.OperationalError as e:
if f'database "{db_name}" does not exist' in str(e):
print(f"Database '{db_name}' does not exist. Creating it...")
# Connect to 'postgres' to create DB
sys_conn = psycopg2.connect(
host=db_host, user=db_user, password=db_pass, dbname="postgres", port=db_port
)
sys_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
cur = sys_conn.cursor()
cur.execute(f'CREATE DATABASE "{db_name}"')
cur.close()
sys_conn.close()
print(f"✅ Database '{db_name}' created.")
# Retry connection
conn = psycopg2.connect(
host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
)
return conn
else:
raise e
def _ensure_schema(self, conn):
"""Ensure the necessary tables exist."""
with conn.cursor() as cur:
# Create table if not exists (Inferred schema from legacy use)
cur.execute("""
CREATE TABLE IF NOT EXISTS stockcard (
id SERIAL PRIMARY KEY,
Company_code TEXT,
update_date DATE,
currency TEXT,
indicator TEXT,
value TEXT,
value_date DATE,
source TEXT
);
""")
# Ensure column exists if table was already created
try:
cur.execute("ALTER TABLE stockcard ADD COLUMN IF NOT EXISTS source TEXT;")
except Exception:
conn.rollback() # Should not happen with IF NOT EXISTS but good practice
conn.commit()
def save_data(self, data_list):
"""Insert a list of data dictionaries into the database."""
if not data_list:
return
conn = self._get_db_connection()
self._ensure_schema(conn)
try:
with conn.cursor() as cur:
args_list = []
for d in data_list:
# Format: (Company_code, update_date, currency, indicator, value, value_date, source)
args_list.append((
d['Company_code'],
d['update_date'],
d['currency'],
d['indicator'],
d['value'],
d['value_date'],
'bloomberg'
))
query = """
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date, source)
VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
cur.executemany(query, args_list)
conn.commit()
print(f"✅ Saved {len(data_list)} records to database.")
finally:
conn.close()
def run_cleanup(self):
"""Run deduplication and view refresh logic."""
print("Running database cleanup...")
conn = self._get_db_connection()
try:
with conn.cursor() as cur:
# 1. Deduplication
cur.execute('''
WITH DuplicateRows AS (
SELECT
id,
ROW_NUMBER() OVER(
PARTITION BY company_code, currency, indicator, value_date
ORDER BY update_date DESC
) as rn
FROM
stockcard
)
DELETE FROM stockcard
WHERE id IN (
SELECT id
FROM DuplicateRows
WHERE rn > 1
);
''')
# 2. Materialized View Refresh
cur.execute('''
DROP MATERIALIZED VIEW IF EXISTS public.unique_company_codes;
CREATE MATERIALIZED VIEW public.unique_company_codes AS
SELECT
s.Company_code,
(SELECT cn_sub.value
FROM public.stockcard AS cn_sub
WHERE cn_sub.Company_code = s.Company_code
AND cn_sub.indicator = 'company_name'
ORDER BY cn_sub.value ASC
LIMIT 1) AS company_name
FROM
(SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s
ORDER BY
s.Company_code;
CREATE INDEX idx_unique_company_codes_company_code ON public.unique_company_codes (Company_code);
''')
conn.commit()
print("✅ Cleanup and View Refresh completed.")
except Exception as e:
print(f"❌ Cleanup failed: {e}")
conn.rollback()
finally:
conn.close()
# --- Core Fetching Logic (Generating Remote Python) ---
def fetch_company(self, market, symbol):
"""
Main entry point to fetch data for a single company.
Generates Python code, executes it remotely, parses result, saves to DB.
"""
company_code = f"{symbol} {market} Equity"
today_str = datetime.now().strftime('%Y-%m-%d')
print(f"🚀 Starting fetch for: {company_code}")
# 0. Prep Remote Environment
init_code = """
import json
import pandas as pd
from datetime import datetime
try:
from blp import blp
except ImportError:
print("Error: 'blp' module not found.")
# Ensure bquery is started
if 'bquery' not in globals():
try:
bquery = blp.BlpQuery().start()
print("BlpQuery started.")
except Exception as e:
print(f"Error starting BlpQuery: {e}")
"""
self.execute_remote_code(init_code)
# 1. Fetch Basic Info
print("Fetching Basic Data...")
        basic_data = self._fetch_basic_remote(company_code, "USD")  # basic data defaults to USD; the legacy script took this from an argument
self.save_data(basic_data)
        # The legacy script read the reporting currency from the waiting list.
        # Here we infer it from the market code (JP -> JPY, VN -> VND, CN -> CNY,
        # HK -> HKD), defaulting to USD.
currency = "USD"
if "JP" in market.upper(): currency = "JPY"
elif "VN" in market.upper(): currency = "VND"
elif "CN" in market.upper(): currency = "CNY"
elif "HK" in market.upper(): currency = "HKD"
print(f"Using currency: {currency}")
# 2. Fetch Currency Data
print("Fetching Currency Data...")
curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency")
self.save_data(curr_data)
# 3. Fetch Non-Currency Data
print("Fetching Non-Currency Data...")
non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency")
self.save_data(non_curr_data)
# 4. Fetch Price Data
print("Fetching Price Data...")
price_data = self._fetch_series_remote(company_code, currency, PRICE_CONFIG, "price")
self.save_data(price_data)
# 5. Cleanup
self.run_cleanup()
print(f"✅ Completed processing for {company_code}")
def _fetch_basic_remote(self, company_code, currency):
"""Generates code to fetch basic data similar to get_basic_stock_data"""
# We construct a python script to run on the server that returns a JSON list.
code = f"""
def get_basic():
company = "{company_code}"
curr = "{currency}"
res_list = []
# 1. BQL query for name, pe, pb, mkt_cap, foreign_rev
q = f"for(['{{company}}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={{curr}}),PCT_REVENUE_FROM_FOREIGN_SOURCES)"
try:
df = bquery.bql(q)
if not df.empty:
# Helper to safely get value
def get_val(df, field_name):
                # Resolve a value from the BQL result frame. The legacy script
                # matches either the exact field name or a substring (needed for
                # parameterized fields like cur_mkt_cap), so we mirror that here.
if field_name == 'name':
rows = df[df['field'] == 'name']
elif 'cur_mkt_cap' in field_name:
rows = df[df['field'].str.contains('cur_mkt_cap')]
elif 'FOREIGN' in field_name:
rows = df[df['field'].str.contains('FOREIGN')]
else:
rows = df[df['field'] == field_name]
if not rows.empty:
val = rows['value'].iloc[0]
return str(val) if val is not None else None
return None
res_list.append({{"indicator": "company_name", "value": get_val(df, 'name')}})
res_list.append({{"indicator": "pe_ratio", "value": get_val(df, 'pe_ratio')}})
res_list.append({{"indicator": "pb_ratio", "value": get_val(df, 'px_to_book_ratio')}})
res_list.append({{"indicator": "market_cap", "value": get_val(df, 'cur_mkt_cap')}})
res_list.append({{"indicator": "Rev_Abroad", "value": get_val(df, 'FOREIGN')}})
except Exception as e:
print(f"Basic BQL Error: {{e}}")
# 2. BDP for IPO and Dividend
try:
did = bquery.bdp([company], ["DIVIDEND_12_MONTH_YIELD"])
if not did.empty and 'DIVIDEND_12_MONTH_YIELD' in did.columns:
res_list.append({{"indicator": "dividend_yield", "value": str(did['DIVIDEND_12_MONTH_YIELD'][0])}})
ipo = bquery.bdp([company], ["EQY_INIT_PO_DT"])
if not ipo.empty and 'EQY_INIT_PO_DT' in ipo.columns:
res_list.append({{"indicator": "IPO_date", "value": str(ipo['EQY_INIT_PO_DT'][0])}})
except Exception as e:
print(f"Basic BDP Error: {{e}}")
# Format result
final_res = []
today = datetime.now().strftime('%Y-%m-%d')
for item in res_list:
if item['value']:
final_res.append({{
"Company_code": company,
"update_date": today,
"currency": curr,
"indicator": item['indicator'],
"value": item['value'],
"value_date": today
}})
print("JSON_START")
print(json.dumps(final_res))
print("JSON_END")
get_basic()
"""
return self._execute_and_parse(code)
def _fetch_series_remote(self, company_code, currency, config_dict, result_type):
"""Generates code to fetch series data using BDH (Historical Data)"""
config_json = json.dumps(config_dict)
period_years = STOCKCARD_CONFIG['period']
# Calculate start/end dates
start_year = datetime.now().year - period_years
start_date = f"{start_year}0101"
end_date = datetime.now().strftime('%Y%m%d')
# Define options locally and serialize to JSON string to inject into remote code
bdh_options = {
'periodicitySelection': 'YEARLY',
'currency': currency,
'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS',
'nonTradingDayFillMethod': 'PREVIOUS_VALUE'
}
bdh_opts_json = json.dumps(bdh_options)
code = f"""
def get_series():
company = "{company_code}"
curr = "{currency}"
config = {config_json}
# Invert Config to map Mnemonic -> Human Indicator Name for easier parsing
# config is {{"HumanName": "Mnemonic"}}
mnemonic_map = {{v.upper(): k for k, v in config.items()}}
fields = list(mnemonic_map.keys())
res_list = []
try:
# Fetch BDH with injected options
df = bquery.bdh(
[company],
fields,
start_date='{start_date}',
end_date='{end_date}',
options={bdh_opts_json}
)
if not df.empty:
for _, row in df.iterrows():
# Get Date
# date col might be 'date' or 'DATE'
date_val = None
if 'date' in df.columns: date_val = row['date']
elif 'DATE' in df.columns: date_val = row['DATE']
                if date_val is None or pd.isna(date_val): continue
date_str = str(date_val)[:10] # YYYY-MM-DD
# Iterate through fields
for mnemonic, indicator_name in mnemonic_map.items():
if mnemonic in df.columns:
val = row[mnemonic]
if val is None or str(val).lower() == 'nan' or str(val).lower() == 'none':
continue
res_list.append({{
"Company_code": company,
"update_date": datetime.now().strftime('%Y-%m-%d'),
"currency": curr,
"indicator": indicator_name,
"value": str(val),
"value_date": date_str
}})
except Exception as e:
print(f"BDH Error: {{e}}")
print("JSON_START")
print(json.dumps(res_list))
print("JSON_END")
get_series()
"""
return self._execute_and_parse(code)
def _execute_and_parse(self, code):
"""Execute code and parse [JSON_START]...[JSON_END]"""
raw_output = self.execute_remote_code(code)
# Simple parser
try:
start_idx = raw_output.find("JSON_START")
end_idx = raw_output.find("JSON_END")
if start_idx != -1 and end_idx != -1:
json_str = raw_output[start_idx + len("JSON_START"):end_idx].strip()
return json.loads(json_str)
else:
print(f"⚠️ No JSON output found in remote response. Raw output:\n{raw_output[:200]}...")
return []
except Exception as e:
print(f"❌ Error parsing JSON from remote: {e}")
return []
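# Wire format, for reference: each remote snippet brackets its JSON payload
# with plain-text markers on stdout, e.g.
#   JSON_START
#   [{"Company_code": "7203 JP Equity", "indicator": "Revenue", ...}]
#   JSON_END
# _execute_and_parse slices out whatever sits between the markers and
# json.loads() it; anything else printed remotely is treated as diagnostics.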
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: python bloomberg_client.py <MARKET> <SYMBOL>")
print("Example: python bloomberg_client.py JP 7203")
sys.exit(1)
market_arg = sys.argv[1]
symbol_arg = sys.argv[2]
try:
client = BloombergClient()
client.fetch_company(market_arg, symbol_arg)
except Exception as e:
print(f"CRITICAL ERROR: {e}")
sys.exit(1)
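
For interactive use, a minimal driver sketch (the module name is taken from the usage string above; whether it is importable from the caller's path is an assumption):

# Hypothetical usage: fetch Toyota (7203 JP) through the remote kernel.
from bloomberg_client import BloombergClient  # module name assumed

client = BloombergClient()          # authenticates and attaches to a kernel
client.fetch_company("JP", "7203")  # infers JPY, fetches, saves, runs cleanup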


@@ -1,105 +0,0 @@
import os
import sys
import pandas as pd
from unittest.mock import MagicMock
from src.fetchers.hk_fetcher import HkFetcher
# Mock the IFindClient to avoid actual network requests and credentials
class MockIFindClient:
def __init__(self, refresh_token):
pass
def post(self, endpoint, params):
# Simulate data availability logic
# If querying for 20261231, return empty
# If querying for 20251231, return data
# Extract date from params
date = "unknown"
if "indipara" in params:
for item in params["indipara"]:
if "indiparams" in item and len(item["indiparams"]) > 0:
param0 = item["indiparams"][0]
if len(param0) == 8: # YYYYMMDD
date = param0
break
# Test Case 1: Detect year logic
if "20261231" in date:
return {"tables": [{"time": [], "table": {}}]} # Empty
if "20251231" in date:
return {
"tables": [{
"time": ["2025-12-31"],
"table": {
"revenue_oas": [1000],
"roe": [15.5],
"total_oi": [5000]
}
}]
}
if "20241231" in date:
return {
"tables": [{
"time": ["2024-12-31"],
"table": {
"revenue_oas": [900],
"roe": [14.0],
"total_oi": [4000]
}
}]
}
return {"tables": []}
def test_hk_fetcher_year_detection():
print("Testing HK Fetcher Year Detection Logic...")
# Mock time.strftime to return 2026
import time
original_strftime = time.strftime
time.strftime = MagicMock(return_value="2026")
try:
fetcher = HkFetcher("fake_token")
# Replace the client with our mock
fetcher.cli = MockIFindClient("fake_token")
# 1. Test get_income_statement logic
print("\nTesting _fetch_financial_data_annual (via income statement)...")
# We expect it to try 2026 (fail), then 2025 (succeed), then fetch 2025-2021
df_income = fetcher.get_income_statement("0700.HK")
if not df_income.empty:
dates = df_income['end_date'].tolist()
print(f"Fetched Income Statement Dates: {dates}")
if "20251231" in dates and "20261231" not in dates:
print("PASS: Correctly anchored to 2025 instead of 2026.")
else:
print(f"FAIL: Logic incorrect. Dates found: {dates}")
else:
print("FAIL: No data returned.")
# 2. Test get_financial_ratios logic
print("\nTesting get_financial_ratios...")
df_ratios = fetcher.get_financial_ratios("0700.HK")
if not df_ratios.empty:
dates = df_ratios['end_date'].tolist()
print(f"Fetched Ratios Dates: {dates}")
if "20251231" in dates and "20261231" not in dates:
print("PASS: Correctly anchored to 2025 instead of 2026.")
else:
print(f"FAIL: Logic incorrect. Dates found: {dates}")
else:
print("FAIL: No data returned.")
finally:
# Restore time
time.strftime = original_strftime
if __name__ == "__main__":
test_hk_fetcher_year_detection()