diff --git a/.env b/.env
index 184020c..053075a 100644
--- a/.env
+++ b/.env
@@ -5,4 +5,11 @@ OPENAI_API_KEY=AIzaSyAT5QXWoACp87oqg1OK4USTIwc2RsJHvIc
 OPENAI_BASE_URL=https://generativelanguage.googleapis.com/v1beta/openai/
 LLM_MODEL=gemini-2.5-flash
 IFIND_REFRESH_TOKEN=eyJzaWduX3RpbWUiOiIyMDI1LTEyLTIxIDE3OjQwOjU2In0=.eyJ1aWQiOiI3MjMwNDQwNzciLCJ1c2VyIjp7ImFjY2Vzc1Rva2VuIjoiYzk5NjdlNGUzNmJjZDliNzI2ZjM5YmQ5MGFkOWQwY2Q4NzE3OTllZi5zaWduc19Oekl6TURRME1EYzMiLCJhY2Nlc3NUb2tlbkV4cGlyZWRUaW1lIjoiMjAyNS0xMi0yMSAxNzo0MDo1NiIsImFjY291bnQiOiJ3eGhsdHowMDEiLCJhdXRoVXNlckluZm8iOnsiRVRyYW5zZmVyIjp0cnVlLCJFZXhjZWxQYXllcnMiOiIxNzk4NjgzMDAyMDAwIn0sImNvZGVDU0kiOltdLCJjb2RlWnpBdXRoIjpbXSwiaGFzQUlQcmVkaWN0IjpmYWxzZSwiaGFzQUlUYWxrIjpmYWxzZSwiaGFzQ0lDQyI6ZmFsc2UsImhhc0NTSSI6ZmFsc2UsImhhc0V2ZW50RHJpdmUiOmZhbHNlLCJoYXNGVFNFIjpmYWxzZSwiaGFzRmFzdCI6ZmFsc2UsImhhc0Z1bmRWYWx1YXRpb24iOmZhbHNlLCJoYXNISyI6dHJ1ZSwiaGFzTE1FIjpmYWxzZSwiaGFzTGV2ZWwyIjpmYWxzZSwiaGFzUmVhbENNRSI6ZmFsc2UsImhhc1RyYW5zZmVyIjpmYWxzZSwiaGFzVVMiOmZhbHNlLCJoYXNVU0FJbmRleCI6ZmFsc2UsImhhc1VTREVCVCI6ZmFsc2UsIm1hcmtldEF1dGgiOnsiRENFIjpmYWxzZX0sIm1heE9uTGluZSI6MSwibm9EaXNrIjpmYWxzZSwicHJvZHVjdFR5cGUiOiJTVVBFUkNPTU1BTkRQUk9EVUNUIiwicmVmcmVzaFRva2VuIjoiIiwicmVmcmVzaFRva2VuRXhwaXJlZFRpbWUiOiIyMDI2LTEyLTMxIDEwOjEwOjAyIiwic2Vzc3Npb24iOiI4ZmYxNzVmYzNmZDkxMzA3YTRlOWIzMTE3Njc4NDlhNiIsInNpZEluZm8iOns2NDoiMTExMTExMTExMTExMTExMTExMTExMTExIiwxOiIxMDEiLDI6IjEiLDY3OiIxMDExMTExMTExMTExMTExMTExMTExMTEiLDM6IjEiLDY5OiIxMTExMTExMTExMTExMTExMTExMTExMTExIiw1OiIxIiw2OiIxIiw3MToiMTExMTExMTExMTExMTExMTExMTExMTAwIiw3OiIxMTExMTExMTExMSIsODoiMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDEiLDEzODoiMTExMTExMTExMTExMTExMTExMTExMTExMSIsMTM5OiIxMTExMTExMTExMTExMTExMTExMTExMTExIiwxNDA6IjExMTExMTExMTExMTExMTExMTExMTExMTEiLDE0MToiMTExMTExMTExMTExMTExMTExMTExMTExMSIsMTQyOiIxMTExMTExMTExMTExMTExMTExMTExMTExIiwxNDM6IjExIiw4MDoiMTExMTExMTExMTExMTExMTExMTExMTExIiw4MToiMTExMTExMTExMTExMTExMTExMTExMTExIiw4MjoiMTExMTExMTExMTExMTExMTExMTEwMTEwIiw4MzoiMTExMTExMTExMTExMTExMTExMDAwMDAwIiw4NToiMDExMTExMTExMTExMTExMTExMTExMTExIiw4NzoiMTExMTExMTEwMDExMTExMDExMTExMTExIiw4OToiMTExMTExMTEwMTEwMTAwMDAwMDAxMTExIiw5MDoiMTExMTEwMTExMTExMTExMTEwMDAxMTExMTAiLDkzOiIxMTExMTExMTExMTExMTExMTAwMDAxMTExIiw5NDoiMTExMTExMTExMTExMTExMTExMTExMTExMSIsOTY6IjExMTExMTExMTExMTExMTExMTExMTExMTEiLDk5OiIxMDAiLDEwMDoiMTExMTAxMTExMTExMTExMTExMCIsMTAyOiIxIiw0NDoiMTEiLDEwOToiMSIsNTM6IjExMTExMTExMTExMTExMTExMTExMTExMSIsNTQ6IjExMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwMDAwIiw1NzoiMDAwMDAwMDAwMDAwMDAwMDAwMDAxMDAwMDAwMDAiLDYyOiIxMTExMTExMTExMTExMTExMTExMTExMTEiLDYzOiIxMTExMTExMTExMTExMTExMTExMTExMTEifSwidGltZXN0YW1wIjoiMTc2NjMxMDA1NjAzNyIsInRyYW5zQXV0aCI6ZmFsc2UsInR0bFZhbHVlIjowLCJ1aWQiOiI3MjMwNDQwNzciLCJ1c2VyVHlwZSI6IkZSRUVJQUwiLCJ3aWZpbmRMaW1pdE1hcCI6e319fQ==.03DB82A62F865C511B2C2BD464B535B62B0081C47AE33540D5D236CEF0C1FE9D
-GEMINI_API_KEY=AIzaSyAT5QXWoACp87oqg1OK4USTIwc2RsJHvIc
\ No newline at end of file
+GEMINI_API_KEY=AIzaSyAT5QXWoACp87oqg1OK4USTIwc2RsJHvIc
+
+# Database configuration
+DB_HOST=192.168.3.195
+DB_PORT=5432
+DB_USER=value
+DB_PASSWORD=Value609!
+DB_NAME=FA
\ No newline at end of file
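
Note: the new `DB_*` settings above are read by `src/fetchers/bloomberg_client.py` through python-dotenv. For reference, a minimal sketch of that consumption pattern, assuming python-dotenv and psycopg2 are installed (the `connect_from_env` helper is illustrative, not part of this diff):

```python
import os

import psycopg2
from dotenv import load_dotenv

load_dotenv()  # reads DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME from .env

def connect_from_env():
    """Open a PostgreSQL connection from the DB_* variables added above."""
    return psycopg2.connect(
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT", "5432"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        dbname=os.getenv("DB_NAME"),
    )
```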
diff --git a/legacy/bloomberg.py b/legacy/bloomberg.py
new file mode 100644
index 0000000..fd21a37
--- /dev/null
+++ b/legacy/bloomberg.py
@@ -0,0 +1,574 @@
+from datetime import datetime
+import pandas as pd
+from blp import blp
+import psycopg2
+import json
+
+currency_config = {
+    "Revenue": "SALES_REV_TURN",
+    "Net_Income": "EARN_FOR_COMMON",
+    "Cash_From_Operating": "CF_CASH_FROM_OPER",
+    "Capital_Expenditure": "CAPITAL_EXPEND",
+    "Free_Cash_Flow": "CF_FREE_CASH_FLOW",
+    "Dividends_Paid": "CF_DVD_PAID",
+    "Total_Assets": "BS_TOT_ASSET",
+    "Equity": "BS_TOT_EQY",
+    "Goodwill": "BS_GOODWILL",
+    "SG&A": "IS_SG_AND_A_EXPENSE",
+    "Selling&Marketing": "IS_SELLING_EXPENSES",
+    "General&Admin": "IS_GENERAL_AND_ADMIN_GAAP",
+    "R&D": "IS_RD_EXPEND",
+    "Depreciation": "IS_DEPR_EXP",
+    "Cash": "BS_CASH_NEAR_CASH_ITEM",
+    "Inventory": "BS_INVENTORIES",
+    "Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV",
+    "Prepaid": "BS_PREPAY",
+    "Property_Plant&Equipment": "BS_NET_FIX_ASSET",
+    "LT_Investment": "BS_LT_INVEST",
+    "Accounts_Payable": "BS_ACCT_PAYABLE",
+    "ST_Debt": "BS_ST_BORROW",
+    "LT_Debt": "BS_LT_BORROW",
+    "ST_Defer_Rev": "ST_DEFERRED_REVENUE",
+    "repurchase": "cf_proceeds_repurchase_equity"
+}
+
+non_currency_config = {
+    "ROE": "RETURN_COM_EQY",
+    "ROA": "RETURN_ON_ASSET",
+    "ROCE": "RETURN_ON_CAPITAL_EMPLOYED",
+    "Gross_Margin": "GROSS_MARGIN",
+    "EBITDA_margin": "EBITDA_TO_REVENUE",
+    "Net_Profit_Margin": "PROF_MARGIN",
+    "Tax_Rate": "IS_STATUTORY_TAX_RATE",
+    "Inventory_Days": "INVENT_DAYS",
+    "Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG",
+    "Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS",
+    "Employee": "NUM_OF_EMPLOYEES",
+    "PE": "px_last/is_eps",
+    "PB": "PX_TO_BOOK_RATIO",
+    "Shareholders": "BS_NUM_OF_SHAREHOLDERS",
+    "Dividend_Payout_Ratio": "DVD_PAYOUT_RATIO",
+    "Total_Debt_Ratio": "TOT_DEBT_TO_TOT_ASSET",
+    "Net_Fixed_Asset_Turnover": "NET_FIX_ASSET_TURN",
+    "Asset_Turnover": "ASSET_TURNOVER",
+    "NetIncome_Growth": "earn_for_com_growth",
+    "Revenue_Growth": "sales_growth",
+}
+
+price_config = {
+    "Last_Price": "PX_LAST",
+    "Market_Cap": "cur_mkt_cap",
+}
+
+# Base configuration
+stockcard_CONFIG = {
+    "period": 10,
+    "unit": 100000000
+}
+
+
+def process_stockcard_data(bquery, cursor, conn, company_code, period, currency):
+    """
+    Process one company's stock card: basic info plus non-currency, currency,
+    and price data, stored in the database.
+
+    Args:
+        bquery: Bloomberg query object.
+        cursor: Database cursor.
+        conn: Database connection.
+        company_code (str): Company code, e.g. 'IBM US Equity'.
+        period (int): Number of annual periods to query.
+        currency (str): Currency code, e.g. 'USD', 'CNY'.
+    """
+    try:
+        # Normalize the inputs. The except clause below reports and re-raises
+        # any failure so callers know processing did not complete; the caller
+        # owns (and releases) the database connection and Bloomberg session.
+        company_code = company_code + " Equity"
+        period = int(period)
+
+        # Fetch the basic information
+        stockcard_data = get_basic_stock_data(bquery, company_code, currency)
+
+        # Insert the rows into the database
+        for data in stockcard_data:
+            cursor.execute("""
+                INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
+                VALUES (%s, %s, %s, %s, %s, %s)
+            """, (
+                data['Company_code'],
+                data['update_date'],
+                data['currency'],
+                data['indicator'],
+                data['value'],
+                data['value_date']
+            ))
+        conn.commit()
+
+        # Process non-currency data
+        process_non_currency_data(bquery, cursor, conn, company_code, period, currency)
+
+        # Process currency data
+        process_currency_data(bquery, cursor, conn, company_code, period, currency)
+
+        # Process price data
+        process_price_data(bquery, cursor, conn, company_code, period, currency)
+
+        print(f"Finished processing: {company_code}")
+
+    except Exception as e:
+        # Hook for logging or other error handling
+        print(f"Error while processing data: {str(e)}")
+        raise
+
+def get_basic_stock_data(bquery, company_code, currency):
+    """
+    Fetch a stock's basic data from Bloomberg: company name, IPO date, P/E,
+    P/B, dividend yield, market cap, and the share of revenue from abroad.
+
+    Args:
+        bquery: Bloomberg query object.
+        company_code (str): Company code, e.g. 'IBM US Equity'.
+        currency (str): Currency code used for the market-cap query, e.g. 'USD', 'CNY'.
+
+    Returns:
+        list: A list of dicts, one per indicator and its value.
+    """
+    print(f"Processing company: {company_code}")  # debug output
+    query = f"for(['{company_code.strip()}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={currency}),PCT_REVENUE_FROM_FOREIGN_SOURCES)"
+    result = bquery.bql(query)
+    did_result = bquery.bdp([f"{company_code}"], ["DIVIDEND_12_MONTH_YIELD"])
+    IPO_date = bquery.bdp([f"{company_code}"], ["EQY_INIT_PO_DT"])
+
+    return [
+        {
+            'Company_code': company_code,
+            'update_date': datetime.now().strftime('%Y-%m-%d'),
+            'currency': currency,
+            'indicator': 'company_name',
+            'value': result[result['field'] == 'name']['value'].iloc[0],
+            'value_date': datetime.now().strftime('%Y-%m-%d')
+        },
+        {
+            'Company_code': company_code,
+            'update_date': datetime.now().strftime('%Y-%m-%d'),
+            'currency': currency,
+            'indicator': 'IPO_date',
+            'value': str(IPO_date['EQY_INIT_PO_DT'][0]),
+            'value_date': datetime.now().strftime('%Y-%m-%d')
+        },
+        {
+            'Company_code': company_code,
+            'update_date': datetime.now().strftime('%Y-%m-%d'),
+            'currency': currency,
+            'indicator': 'pe_ratio',
+            'value': str(result[result['field'] == 'pe_ratio']['value'].iloc[0]),
+            'value_date': datetime.now().strftime('%Y-%m-%d')
+        },
+        {
+            'Company_code': company_code,
+            'update_date': datetime.now().strftime('%Y-%m-%d'),
+            'currency': currency,
+            'indicator': 'pb_ratio',
+            'value': str(result[result['field'] == 'px_to_book_ratio']['value'].iloc[0]),
+            'value_date': datetime.now().strftime('%Y-%m-%d')
+        },
+        {
+            'Company_code': company_code,
+            'update_date': datetime.now().strftime('%Y-%m-%d'),
+            'currency': currency,
+            'indicator': 'dividend_yield',
+            'value': str(did_result['DIVIDEND_12_MONTH_YIELD'][0]),
+            'value_date': datetime.now().strftime('%Y-%m-%d')
+        },
+        {
+            'Company_code': company_code,
+            'update_date': datetime.now().strftime('%Y-%m-%d'),
+            'currency': currency,
+            'indicator': 'market_cap',
+            'value': str(result[result['field'].str.contains('cur_mkt_cap')]['value'].iloc[0]),
+            'value_date': datetime.now().strftime('%Y-%m-%d')
+        },
+        {
+            'Company_code': company_code,
+            'update_date': datetime.now().strftime('%Y-%m-%d'),
+            'currency': currency,
+            'indicator': 'Rev_Abroad',
+            'value': str(result[result['field'].str.contains('FOREIGN')]['value'].iloc[0]),
+            'value_date': datetime.now().strftime('%Y-%m-%d')
+        }
+
+    ]
+
+def process_currency_data(bquery, cursor, conn, company_code, period, currency):
+    """
+    Process currency-denominated financial data: run Bloomberg BQL queries for
+    the given company, period, and currency, then bulk-insert the results into
+    the `stockcard` table.
+
+    Args:
+        bquery: Bloomberg query object.
+        cursor: Database cursor.
+        conn: Database connection.
+        company_code (str): Company code, e.g. 'IBM US Equity'.
+        period (int): Number of annual periods to query.
+        currency (str): Currency code, e.g. 'USD', 'CNY'.
+    """
+    data_to_insert = []
+    for key, value_bql in currency_config.items():
+        time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fa_period_offset=range(-{period}A, 0A),fa_period_type=A))"
+        print(f"Executing currency series query for {key} ({currency}): {time_series_query}")
+        result = bquery.bql(time_series_query)
+
+        # Skip if the result is empty or lacks the expected columns
+        if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
+            print(f"No data found for currency indicator: {key}")
+            continue
+
+        # Extract the values and dates keyed by PERIOD_END_DATE
+        filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE']
+
+        if filtered_results.empty:
+            print(f"No PERIOD_END_DATE found for currency indicator: {key}")
+            continue
+
+        for _, row in filtered_results.iterrows():
+            date_str = row["secondary_value"]
+            value = row["value"]
+
+            try:
+                # Make sure the date is well formed
+                formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
+                # Coerce the value to float, treating None, empty strings and the string 'None' as missing
+                numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
+
+                data_to_insert.append((
+                    company_code,
+                    datetime.now().strftime('%Y-%m-%d'),
+                    currency,
+                    key,
+                    numeric_value,
+                    formatted_date
+                ))
+            except (ValueError, TypeError) as e:
+                print(f"Skipping invalid currency data for {key} on {date_str}: {value}. Error: {e}")
+                continue
+
+    if data_to_insert:
+        insert_query = """
+            INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
+            VALUES (%s, %s, %s, %s, %s, %s)
+        """
+        try:
+            cursor.executemany(insert_query, data_to_insert)
+            conn.commit()
+            print(f"Successfully inserted {len(data_to_insert)} currency data points for {company_code}.")
+        except Exception as e:
+            conn.rollback()  # roll back the whole batch
+            print(f"Failed to insert currency data for {company_code}. Error: {e}")
+
+def process_price_data(bquery, cursor, conn, company_code, period, currency):
+    """
+    Process price data and store it in the database: run Bloomberg BQL queries
+    for the given company, period, and currency, then bulk-insert the results
+    into the `stockcard` table.
+
+    Args:
+        bquery: Bloomberg query object.
+        cursor: Database cursor.
+        conn: Database connection.
+        company_code (str): Company code, e.g. 'IBM US Equity'.
+        period (int): Number of annual periods to query.
+        currency (str): Currency code (e.g. 'USD', 'CNY').
+    """
+    data_to_insert = []
+    for key, value_bql in price_config.items():
+        # Note: price queries can be daily or yearly; per='Y' requests annual frequency
+        time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fill='PREV', per='Y', start='-{period}Y'))"
+        print(f"Executing price series query for {key}: {time_series_query}")
+        result = bquery.bql(time_series_query)
+
+        if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
+            print(f"No price data found for indicator: {key}")
+            continue
+
+        # Extract the values and dates keyed by DATE
+        filtered_results = result[result["secondary_name"] == 'DATE']
+
+        if filtered_results.empty:
+            print(f"No DATE found for price indicator: {key}")
+            continue
+
+        for _, row in filtered_results.iterrows():
+            date_str = row["secondary_value"]
+            value = row["value"]
+
+            try:
+                formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
+                numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
+
+                data_to_insert.append((
+                    company_code,
+                    datetime.now().strftime('%Y-%m-%d'),
+                    currency,
+                    key,
+                    numeric_value,
+                    formatted_date
+                ))
+            except (ValueError, TypeError) as e:
+                print(f"Skipping invalid price data for {key} on {date_str}: {value}. Error: {e}")
+                continue
+
+    if data_to_insert:
+        insert_query = """
+            INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
+            VALUES (%s, %s, %s, %s, %s, %s)
+        """
+        try:
+            cursor.executemany(insert_query, data_to_insert)
+            conn.commit()
+            print(f"Successfully inserted {len(data_to_insert)} price data points for {company_code}.")
+        except Exception as e:
+            conn.rollback()  # roll back the whole batch
+            print(f"Failed to insert price data for {company_code}. Error: {e}")
+
+def process_non_currency_data(bquery, cursor, conn, company_code, period, currency):
+    """
+    Process non-currency financial data: run Bloomberg BQL queries for the
+    given company and period, then bulk-insert the results into the
+    `stockcard` table.
+
+    Args:
+        bquery: Bloomberg query object.
+        cursor: Database cursor.
+        conn: Database connection.
+        company_code (str): Company code, e.g. 'IBM US Equity'.
+        period (int): Number of annual periods to query.
+        currency (str): Currency code (e.g. 'USD', 'CNY'). Not used in the BQL
+            query itself, but stored as part of each record.
+    """
+    data_to_insert = []
+    for key, value_bql in non_currency_config.items():
+        time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(fa_period_offset=range(-{period}A, 0A),fa_period_type=A))"
+        print(f"Executing non-currency series query for {key}: {time_series_query}")
+        result = bquery.bql(time_series_query)
+
+        if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
+            print(f"No data found for non-currency indicator: {key}")
+            continue
+
+        # Extract the values and dates keyed by PERIOD_END_DATE
+        filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE']
+
+        if filtered_results.empty:
+            print(f"No PERIOD_END_DATE found for non-currency indicator: {key}")
+            continue
+
+        for _, row in filtered_results.iterrows():
+            date_str = row["secondary_value"]
+            value = row["value"]
+
+            try:
+                formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
+                numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
+
+                data_to_insert.append((
+                    company_code,
+                    datetime.now().strftime('%Y-%m-%d'),
+                    currency,  # recorded even though the BQL query does not use it
+                    key,
+                    numeric_value,
+                    formatted_date
+                ))
+            except (ValueError, TypeError) as e:
+                print(f"Skipping invalid non-currency data for {key} on {date_str}: {value}. Error: {e}")
+                continue
+
+    if data_to_insert:
+        insert_query = """
+            INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
+            VALUES (%s, %s, %s, %s, %s, %s)
+        """
+        try:
+            cursor.executemany(insert_query, data_to_insert)
+            conn.commit()
+            print(f"Successfully inserted {len(data_to_insert)} non-currency data points for {company_code}.")
+        except Exception as e:
+            conn.rollback()  # roll back the whole batch
+            print(f"Failed to insert non-currency data for {company_code}. Error: {e}")
+
+
+
+def get_waiting_list(cursor, conn):
+    """
+    Fetch the companies waiting to be processed, i.e. rows in `waiting_list`
+    whose `status` is 0. This function only queries and returns the list; it
+    does not update the `status` column.
+
+    Args:
+        cursor: Database cursor.
+        conn: Database connection.
+
+    Returns:
+        pd.DataFrame: Columns "id", "member_ticker" and "currency" for the
+        pending companies; an empty DataFrame if the waiting list is empty.
+    """
+    try:
+        # Query the waiting list (status = 0), oldest update_date first, capped at 300 rows
+        query = "SELECT id, company_code, currency FROM waiting_list WHERE status = 0 ORDER BY update_date ASC LIMIT 300"
+        cursor.execute(query)
+        result = cursor.fetchall()
+
+        if result:
+            print(f"Waiting list read; companies pending: {len(result)}")
+            # Collect the id, company_code and currency columns
+            companies_and_currencies = [(row[0], row[1], row[2]) for row in result]
+            # Return a DataFrame; status is no longer updated here
+            return pd.DataFrame(companies_and_currencies, columns=["id", "member_ticker", "currency"])
+        else:
+            print("Waiting list is empty")
+            return pd.DataFrame(columns=["id", "member_ticker", "currency"])
+    except Exception as e:
+        print(f"Failed to fetch the waiting list: {str(e)}")
+        return pd.DataFrame(columns=["id", "member_ticker", "currency"])
+
+def main():
+    """
+    Entry point: initializes the Bloomberg session and database connection,
+    processes the companies on the waiting list, and updates their status.
+
+    Flow:
+    1. Start the Bloomberg query service
+    2. Connect to the database
+    3. Fetch the waiting list
+    4. Process each company's data
+    5. Update the processing status
+    6. Release all resources
+    """
+    # Database connection settings
+    DB_HOST = "aws-0-ap-northeast-1.pooler.supabase.com"
+    DB_PORT = "5432"
+    DB_NAME = "postgres"
+    DB_USER = "postgres.kzexzbtpbnufbvrvkuae"
+    DB_PASSWORD = "cAuNDnJv0aj1NW9l"
+
+    # Initialize the Bloomberg query object, database connection and cursor
+    bquery = None
+    conn = None
+    cursor = None
+
+    try:
+        try:
+            bquery = blp.BlpQuery().start()
+        except Exception as e:
+            print(f"Failed to initialize Bloomberg: {str(e)}")
+            raise  # re-raise so the caller knows initialization failed
+
+        try:
+            # Build the connection string
+            conn_string = f"host={DB_HOST} port={DB_PORT} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} gssencmode=disable"
+            # Open the connection
+            conn = psycopg2.connect(conn_string)
+            cursor = conn.cursor()
+            print("Connected to the Supabase session pooler!")
+        except Exception as e:
+            print(f"Failed to connect to the Supabase session pooler: {e}")
+            raise  # a live cursor is required below; do not continue without one
+
+        # Fetch the waiting list
+        waiting_list = get_waiting_list(cursor, conn)
+
+        if not waiting_list.empty:
+            print(f"Processing data for {len(waiting_list)} companies")
+            # Process each company
+            for _, row in waiting_list.iterrows():
+                id = int(row["id"])
+                company_code = str(row["member_ticker"])  # ensure company_code is a string
+                currency = row["currency"]
+                try:
+                    process_stockcard_data(bquery, cursor, conn, company_code, stockcard_CONFIG["period"], currency)
+                    # Update the processing status and the update date
+                    try:
+                        cursor.execute("UPDATE waiting_list SET status = 1, update_date = NOW() WHERE id = %s", (id,))
+                        conn.commit()
+                        print(f"Updated status and date for waiting-list id {id}.")
+                    except Exception as e:
+                        conn.rollback()
+                        print(f"Failed to update waiting-list id {id}: {str(e)}")
+                    print(f"Processed and updated status: {company_code}")
+                except Exception as e:
+                    print(f"Error while processing {company_code}: {str(e)}")
+                    conn.rollback()
+            # Run the duplicate-removal SQL
+            try:
+                cursor.execute('''
+                    WITH DuplicateRows AS (
+                        SELECT
+                            id,  -- the surrogate primary key on stockcard
+                            ROW_NUMBER() OVER(
+                                PARTITION BY company_code, currency, indicator, value_date
+                                ORDER BY update_date DESC
+                            ) as rn
+                        FROM
+                            stockcard
+                    )
+                    DELETE FROM stockcard
+                    WHERE id IN (
+                        SELECT id
+                        FROM DuplicateRows
+                        WHERE rn > 1
+                    );
+                ''')
+                conn.commit()
+                print(f"Duplicate-removal SQL succeeded; {cursor.rowcount} rows deleted.")
+            except Exception as e:
+                print(f"RemoveDuplicateStockcardEntries.sql failed: {e}")
+                conn.rollback()
+            # Run the SQL that rebuilds the unique-company-codes materialized view
+
+            try:
+                cursor.execute('''
+                    -- Drop the materialized view if it exists, to avoid the "relation already exists" error
+                    DROP MATERIALIZED VIEW IF EXISTS public.unique_company_codes;
+
+                    -- Create public.unique_company_codes with each unique company code and its company name
+                    CREATE MATERIALIZED VIEW public.unique_company_codes AS
+                    SELECT
+                        s.Company_code,  -- company code
+                        -- A subquery guarantees exactly one company name per code
+                        (SELECT cn_sub.value
+                         FROM public.stockcard AS cn_sub
+                         WHERE cn_sub.Company_code = s.Company_code
+                           AND cn_sub.indicator = 'company_name'
+                         ORDER BY cn_sub.value ASC  -- if several names exist, pick the alphabetically first for determinism
+                         LIMIT 1) AS company_name  -- take a single matching company_name
+                    FROM
+                        -- Start from the distinct non-NULL company codes
+                        (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s
+                    ORDER BY
+                        s.Company_code;  -- ordered by company code for easier inspection
+
+                    -- Index the view's Company_code column so later queries against it are faster
+                    -- Note: the index must be recreated after the view is rebuilt
+                    CREATE INDEX idx_unique_company_codes_company_code ON public.unique_company_codes (Company_code);
+
+                ''')
+                conn.commit()
+                print("UniqueCompanyCodes.sql executed (materialized view and index created/refreshed).")
+            except Exception as e:
+                print(f"UniqueCompanyCodes.sql failed: {e}")
+                conn.rollback()
+        else:
+            print("No companies pending")
+
+    except Exception as e:
+        print(f"Main program error: {str(e)}")
+    finally:
+        # Make sure all resources are closed
+        if cursor:
+            cursor.close()
+        if conn:
+            conn.close()
+        if bquery:
+            bquery.stop()
+        print("Run complete; all resources released")
+
+if __name__ == "__main__":
+    main()
+
+
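
Note: legacy/bloomberg.py deduplicates after the fact with the ROW_NUMBER() CTE above, keeping the newest update_date per (company_code, currency, indicator, value_date). A sketch of an upsert-based alternative that avoids writing duplicates in the first place, assuming a unique index that the current schema does not declare (and which can only be created once existing duplicates are removed):

```python
import psycopg2

# Hypothetical unique index; delete existing duplicates before creating it.
DDL = """
CREATE UNIQUE INDEX IF NOT EXISTS uq_stockcard_point
ON stockcard (Company_code, currency, indicator, value_date);
"""

# Insert-or-refresh instead of insert-then-deduplicate.
UPSERT = """
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (Company_code, currency, indicator, value_date)
DO UPDATE SET value = EXCLUDED.value, update_date = EXCLUDED.update_date;
"""

def upsert_rows(conn, rows):
    """rows: tuples shaped like the executemany() payloads in legacy/bloomberg.py."""
    with conn.cursor() as cur:
        cur.execute(DDL)
        cur.executemany(UPSERT, rows)
    conn.commit()
```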
diff --git a/src/fetchers/bloomberg_client.py b/src/fetchers/bloomberg_client.py
new file mode 100644
index 0000000..db5a01c
--- /dev/null
+++ b/src/fetchers/bloomberg_client.py
@@ -0,0 +1,653 @@
+import os
+import sys
+import json
+import uuid
+import time
+import requests
+import websocket
+import psycopg2
+from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
+from datetime import datetime
+from dotenv import load_dotenv
+
+# Load environment variables
+load_dotenv()
+
+# --- Configurations adapted from legacy/bloomberg.py ---
+
+CURRENCY_CONFIG = {
+    "Revenue": "SALES_REV_TURN",
+    "Net_Income": "EARN_FOR_COMMON",
+    "Cash_From_Operating": "CF_CASH_FROM_OPER",
+    "Capital_Expenditure": "CAPITAL_EXPEND",
+    "Free_Cash_Flow": "CF_FREE_CASH_FLOW",
+    "Dividends_Paid": "CF_DVD_PAID",
+    "Total_Assets": "BS_TOT_ASSET",
+    "Equity": "BS_TOT_EQY",
+    "Goodwill": "BS_GOODWILL",
+    "SG&A": "IS_SG_AND_A_EXPENSE",
+    "Selling&Marketing": "IS_SELLING_EXPENSES",
+    "General&Admin": "IS_GENERAL_AND_ADMIN_GAAP",
+    "R&D": "IS_RD_EXPEND",
+    "Depreciation": "IS_DEPR_EXP",
+    "Cash": "BS_CASH_NEAR_CASH_ITEM",
+    "Inventory": "BS_INVENTORIES",
+    "Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV",
+    "Prepaid": "BS_PREPAY",
+    "Property_Plant&Equipment": "BS_NET_FIX_ASSET",
+    "LT_Investment": "BS_LT_INVEST",
+    "Accounts_Payable": "BS_ACCT_PAYABLE",
+    "ST_Debt": "BS_ST_BORROW",
+    "LT_Debt": "BS_LT_BORROW",
+    "ST_Defer_Rev": "ST_DEFERRED_REVENUE",
+    "repurchase": "cf_proceeds_repurchase_equity"
+}
+
+NON_CURRENCY_CONFIG = {
+    "ROE": "RETURN_COM_EQY",
+    "ROA": "RETURN_ON_ASSET",
+    "ROCE": "RETURN_ON_CAPITAL_EMPLOYED",
+    "Gross_Margin": "GROSS_MARGIN",
+    "EBITDA_margin": "EBITDA_TO_REVENUE",
+    "Net_Profit_Margin": "PROF_MARGIN",
+    "Tax_Rate": "IS_STATUTORY_TAX_RATE",
+    "Inventory_Days": "INVENT_DAYS",
+    "Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG",
+    "Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS",
+    "Employee": "NUM_OF_EMPLOYEES",
+    "PE": "px_last/is_eps",
+    "PB": "PX_TO_BOOK_RATIO",
+    "Shareholders": "BS_NUM_OF_SHAREHOLDERS",
+    "Dividend_Payout_Ratio": "DVD_PAYOUT_RATIO",
+    "Total_Debt_Ratio": "TOT_DEBT_TO_TOT_ASSET",
+    "Net_Fixed_Asset_Turnover": "NET_FIX_ASSET_TURN",
+    "Asset_Turnover": "ASSET_TURNOVER",
+    "NetIncome_Growth": "earn_for_com_growth",
+    "Revenue_Growth": "sales_growth",
+}
+
+PRICE_CONFIG = {
+    "Last_Price": "PX_LAST",
+    "Market_Cap": "cur_mkt_cap",
+}
+
+STOCKCARD_CONFIG = {
+    "period": 10,
+    "unit": 100000000
+}
+
+
+class BloombergClient:
+    def __init__(self, jupyter_url="http://192.168.3.161:8888", password="Value609!"):
+        self.jupyter_url = jupyter_url.rstrip("/")
+        self.password = password
+        self.session = requests.Session()
+        self.kernel_id = None
+        self.ws = None
+
+        # Authenticate and set up the connection
+        self._authenticate()
+        self._setup_kernel()
+        self._connect_websocket()
+
+    def _authenticate(self):
+        """Authenticate with JupyterHub/Lab using the password to get session cookies."""
+        print(f"Connecting to Jupyter at {self.jupyter_url}...")
+        try:
+            # 1. Get the login page to fetch _xsrf (if needed)
+            r = self.session.get(f"{self.jupyter_url}/login")
+            if r.status_code != 200:
+                print(f"Failed to access login page: {r.status_code}")
+
+            xsrf = self.session.cookies.get("_xsrf", "")
+
+            # 2. Post the password
+            data = {"password": self.password}
+            if xsrf:
+                data["_xsrf"] = xsrf
+
+            login_resp = self.session.post(f"{self.jupyter_url}/login", data=data)
+
+            # 3. Verify authentication
+            api_resp = self.session.get(f"{self.jupyter_url}/api/status")
+            if api_resp.status_code == 200:
+                print("✅ Authentication successful.")
+            else:
+                raise Exception(f"Authentication failed. Status: {api_resp.status_code}")
+
+        except Exception as e:
+            print(f"❌ Error during authentication: {e}")
+            raise
+
+    def _setup_kernel(self):
+        """Find an existing kernel or create a new one."""
+        try:
+            # List kernels
+            resp = self.session.get(f"{self.jupyter_url}/api/kernels")
+            kernels = resp.json()
+
+            # Try to find 'remote_env' or just pick the first available
+            target_kernel = None
+            for k in kernels:
+                if k.get('name') == 'remote_env':  # Ideal match
+                    target_kernel = k
+                    break
+
+            if not target_kernel and kernels:
+                target_kernel = kernels[0]  # Fallback
+
+            if target_kernel:
+                self.kernel_id = target_kernel['id']
+                print(f"✅ Found existing kernel: {self.kernel_id} ({target_kernel.get('name')})")
+            else:
+                print("No active kernels found. Starting a new one...")
+                # Start a new kernel (assuming python3)
+                start_resp = self.session.post(f"{self.jupyter_url}/api/kernels", json={"name": "python3"})
+                if start_resp.status_code == 201:
+                    self.kernel_id = start_resp.json()['id']
+                    print(f"✅ Started new kernel: {self.kernel_id}")
+                else:
+                    raise Exception(f"Failed to start kernel: {start_resp.text}")
+
+        except Exception as e:
+            print(f"❌ Error setting up kernel: {e}")
+            raise
+
+    def _connect_websocket(self):
+        """Connect to the kernel's WebSocket channel."""
+        base_ws_url = self.jupyter_url.replace("http", "ws")
+        ws_url = f"{base_ws_url}/api/kernels/{self.kernel_id}/channels"
+
+        # Extract cookies for the WS connection
+        cookies = self.session.cookies.get_dict()
+        cookie_str = "; ".join([f"{k}={v}" for k, v in cookies.items()])
+
+        try:
+            self.ws = websocket.create_connection(
+                ws_url,
+                header={"Cookie": cookie_str},
+                timeout=60  # Connection timeout
+            )
+            print("✅ WebSocket connected.")
+        except Exception as e:
+            print(f"❌ WebSocket connection failed: {e}")
+            raise
+
+    def execute_remote_code(self, code_str):
+        """
+        Send code to the remote kernel via WebSocket and wait for the result.
+        Returns the concatenated stdout output as a single string.
+        """
+        msg_id = str(uuid.uuid4())
+        msg = {
+            "header": {
+                "msg_id": msg_id,
+                "username": "client",
+                "session": str(uuid.uuid4()),
+                "msg_type": "execute_request",
+                "version": "5.3"
+            },
+            "parent_header": {},
+            "metadata": {},
+            "content": {
+                "code": code_str,
+                "silent": False,
+                "store_history": True,
+                "user_expressions": {},
+                "allow_stdin": False
+            }
+        }
+
+        self.ws.send(json.dumps(msg))
+
+        outputs = []
+        error = None
+
+        # Loop to receive messages until 'idle'
+        while True:
+            try:
+                resp = json.loads(self.ws.recv())
+                msg_type = resp['msg_type']
+
+                # A 'status: idle' message marks the end of execution; only stop
+                # on the idle status whose parent_header answers our request.
+                if msg_type == 'status':
+                    if resp['content']['execution_state'] == 'idle':
+                        if resp['parent_header'].get('msg_id') == msg_id:
+                            break
+
+                elif msg_type == 'stream':
+                    # Stdout/Stderr
+                    outputs.append(resp['content']['text'])
+
+                elif msg_type == 'error':
+                    error = resp['content']['evalue']
+                    print(f"❌ Remote Execution Error: {error}")
+                    for line in resp['content']['traceback']:
+                        print(line)
+
+            except Exception as e:
+                print(f"Error receiving WS message: {e}")
+                break
+
+        if error:
+            raise Exception(f"Remote Code Execution Failed: {error}")
+
+        return "".join(outputs)
+
+    # --- Database Operations ---
+
+    def _get_db_connection(self):
+        """Create a database connection, creating the DB if it doesn't exist."""
+        db_host = os.getenv("DB_HOST")
+        db_user = os.getenv("DB_USER")
+        db_pass = os.getenv("DB_PASSWORD")
+        db_name = os.getenv("DB_NAME")
+        db_port = os.getenv("DB_PORT", "5432")
+
+        # 1. Try connecting to the specific DB
+        try:
+            conn = psycopg2.connect(
+                host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
+            )
+            return conn
+        except psycopg2.OperationalError as e:
+            if f'database "{db_name}" does not exist' in str(e):
+                print(f"Database '{db_name}' does not exist. Creating it...")
+                # Connect to 'postgres' to create the DB
+                sys_conn = psycopg2.connect(
+                    host=db_host, user=db_user, password=db_pass, dbname="postgres", port=db_port
+                )
+                sys_conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
+                cur = sys_conn.cursor()
+                cur.execute(f'CREATE DATABASE "{db_name}"')
+                cur.close()
+                sys_conn.close()
+                print(f"✅ Database '{db_name}' created.")
+
+                # Retry the connection
+                conn = psycopg2.connect(
+                    host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
+                )
+                return conn
+            else:
+                raise e
+
+    def _ensure_schema(self, conn):
+        """Ensure the necessary tables exist."""
+        with conn.cursor() as cur:
+            # Create the table if it does not exist (schema inferred from legacy use)
+            cur.execute("""
+                CREATE TABLE IF NOT EXISTS stockcard (
+                    id SERIAL PRIMARY KEY,
+                    Company_code TEXT,
+                    update_date DATE,
+                    currency TEXT,
+                    indicator TEXT,
+                    value TEXT,
+                    value_date DATE,
+                    source TEXT
+                );
+            """)
+
+            # Ensure the column exists if the table was already created
+            try:
+                cur.execute("ALTER TABLE stockcard ADD COLUMN IF NOT EXISTS source TEXT;")
+            except Exception:
+                conn.rollback()  # should not happen with IF NOT EXISTS, but be safe
+
+        conn.commit()
+
+    def save_data(self, data_list):
+        """Insert a list of data dictionaries into the database."""
+        if not data_list:
+            return
+
+        conn = self._get_db_connection()
+        self._ensure_schema(conn)
+
+        try:
+            with conn.cursor() as cur:
+                args_list = []
+                for d in data_list:
+                    # Format: (Company_code, update_date, currency, indicator, value, value_date, source)
+                    args_list.append((
+                        d['Company_code'],
+                        d['update_date'],
+                        d['currency'],
+                        d['indicator'],
+                        d['value'],
+                        d['value_date'],
+                        'bloomberg'
+                    ))
+
+                query = """
+                    INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date, source)
+                    VALUES (%s, %s, %s, %s, %s, %s, %s)
+                """
+                cur.executemany(query, args_list)
+            conn.commit()
+            print(f"✅ Saved {len(data_list)} records to database.")
+        finally:
+            conn.close()
+
+    def run_cleanup(self):
+        """Run deduplication and view-refresh logic."""
+        print("Running database cleanup...")
+        conn = self._get_db_connection()
+        try:
+            with conn.cursor() as cur:
+                # 1. Deduplication
+                cur.execute('''
+                    WITH DuplicateRows AS (
+                        SELECT
+                            id,
+                            ROW_NUMBER() OVER(
+                                PARTITION BY company_code, currency, indicator, value_date
+                                ORDER BY update_date DESC
+                            ) as rn
+                        FROM
+                            stockcard
+                    )
+                    DELETE FROM stockcard
+                    WHERE id IN (
+                        SELECT id
+                        FROM DuplicateRows
+                        WHERE rn > 1
+                    );
+                ''')
+
+                # 2. Materialized view refresh
+                cur.execute('''
+                    DROP MATERIALIZED VIEW IF EXISTS public.unique_company_codes;
+
+                    CREATE MATERIALIZED VIEW public.unique_company_codes AS
+                    SELECT
+                        s.Company_code,
+                        (SELECT cn_sub.value
+                         FROM public.stockcard AS cn_sub
+                         WHERE cn_sub.Company_code = s.Company_code
+                           AND cn_sub.indicator = 'company_name'
+                         ORDER BY cn_sub.value ASC
+                         LIMIT 1) AS company_name
+                    FROM
+                        (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s
+                    ORDER BY
+                        s.Company_code;
+
+                    CREATE INDEX idx_unique_company_codes_company_code ON public.unique_company_codes (Company_code);
+                ''')
+
+            conn.commit()
+            print("✅ Cleanup and View Refresh completed.")
+        except Exception as e:
+            print(f"❌ Cleanup failed: {e}")
+            conn.rollback()
+        finally:
+            conn.close()
+
+    # --- Core Fetching Logic (Generating Remote Python) ---
+
+    def fetch_company(self, market, symbol):
+        """
+        Main entry point to fetch data for a single company.
+        Generates Python code, executes it remotely, parses the result, and saves it to the DB.
+        """
+        company_code = f"{symbol} {market} Equity"
+
+        print(f"🚀 Starting fetch for: {company_code}")
+
+        # 0. Prepare the remote environment
+        init_code = """
+import json
+import pandas as pd
+from datetime import datetime
+try:
+    from blp import blp
+except ImportError:
+    print("Error: 'blp' module not found.")
+
+# Ensure bquery is started
+if 'bquery' not in globals():
+    try:
+        bquery = blp.BlpQuery().start()
+        print("BlpQuery started.")
+    except Exception as e:
+        print(f"Error starting BlpQuery: {e}")
+"""
+        self.execute_remote_code(init_code)
+
+        # The legacy script took the currency from the waiting list; here we
+        # infer it from the market code instead (JP -> JPY, VN -> VND,
+        # CN -> CNY, HK -> HKD, otherwise USD) and use it for all queries,
+        # including the basic data.
+        currency = "USD"
+        if "JP" in market.upper(): currency = "JPY"
+        elif "VN" in market.upper(): currency = "VND"
+        elif "CN" in market.upper(): currency = "CNY"
+        elif "HK" in market.upper(): currency = "HKD"
+
+        print(f"Using currency: {currency}")
+
+        # 1. Fetch basic info
+        print("Fetching Basic Data...")
+        basic_data = self._fetch_basic_remote(company_code, currency)
+        self.save_data(basic_data)
+
+        # 2. Fetch currency data
+        print("Fetching Currency Data...")
+        curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency")
+        self.save_data(curr_data)
+
+        # 3. Fetch non-currency data
+        print("Fetching Non-Currency Data...")
+        non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency")
+        self.save_data(non_curr_data)
+
+        # 4. Fetch price data
+        print("Fetching Price Data...")
+        price_data = self._fetch_series_remote(company_code, currency, PRICE_CONFIG, "price")
+        self.save_data(price_data)
+
+        # 5. Cleanup
+        self.run_cleanup()
+        print(f"✅ Completed processing for {company_code}")
+
+    def _fetch_basic_remote(self, company_code, currency):
+        """Generate and run remote code to fetch basic data, mirroring get_basic_stock_data."""
+
+        # We construct a Python script to run on the server that returns a JSON list.
+        code = f"""
+def get_basic():
+    company = "{company_code}"
+    curr = "{currency}"
+    res_list = []
+
+    # 1. BQL query for name, pe, pb, mkt_cap, foreign_rev
+    q = f"for(['{{company}}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={{curr}}),PCT_REVENUE_FROM_FOREIGN_SOURCES)"
+    try:
+        df = bquery.bql(q)
+        if not df.empty:
+            # Helper to safely get a value. Matches either the exact field name
+            # or a substring, mirroring the legacy lookups (assumes the standard
+            # BQL 'field' column).
+            def get_val(df, field_name):
+                if field_name == 'name':
+                    rows = df[df['field'] == 'name']
+                elif 'cur_mkt_cap' in field_name:
+                    rows = df[df['field'].str.contains('cur_mkt_cap')]
+                elif 'FOREIGN' in field_name:
+                    rows = df[df['field'].str.contains('FOREIGN')]
+                else:
+                    rows = df[df['field'] == field_name]
+
+                if not rows.empty:
+                    val = rows['value'].iloc[0]
+                    return str(val) if val is not None else None
+                return None
+
+            res_list.append({{"indicator": "company_name", "value": get_val(df, 'name')}})
+            res_list.append({{"indicator": "pe_ratio", "value": get_val(df, 'pe_ratio')}})
+            res_list.append({{"indicator": "pb_ratio", "value": get_val(df, 'px_to_book_ratio')}})
+            res_list.append({{"indicator": "market_cap", "value": get_val(df, 'cur_mkt_cap')}})
+            res_list.append({{"indicator": "Rev_Abroad", "value": get_val(df, 'FOREIGN')}})
+    except Exception as e:
+        print(f"Basic BQL Error: {{e}}")
+
+    # 2. BDP for IPO date and dividend yield
+    try:
+        did = bquery.bdp([company], ["DIVIDEND_12_MONTH_YIELD"])
+        if not did.empty and 'DIVIDEND_12_MONTH_YIELD' in did.columns:
+            res_list.append({{"indicator": "dividend_yield", "value": str(did['DIVIDEND_12_MONTH_YIELD'][0])}})
+
+        ipo = bquery.bdp([company], ["EQY_INIT_PO_DT"])
+        if not ipo.empty and 'EQY_INIT_PO_DT' in ipo.columns:
+            res_list.append({{"indicator": "IPO_date", "value": str(ipo['EQY_INIT_PO_DT'][0])}})
+
+    except Exception as e:
+        print(f"Basic BDP Error: {{e}}")
+
+    # Format the result
+    final_res = []
+    today = datetime.now().strftime('%Y-%m-%d')
+    for item in res_list:
+        if item['value']:
+            final_res.append({{
+                "Company_code": company,
+                "update_date": today,
+                "currency": curr,
+                "indicator": item['indicator'],
+                "value": item['value'],
+                "value_date": today
+            }})
+
+    print("JSON_START")
+    print(json.dumps(final_res))
+    print("JSON_END")
+
+get_basic()
+"""
+        return self._execute_and_parse(code)
+
+    def _fetch_series_remote(self, company_code, currency, config_dict, result_type):
+        """Generate and run remote code to fetch series data using BDH (historical data)."""
+
+        config_json = json.dumps(config_dict)
+        period_years = STOCKCARD_CONFIG['period']
+
+        # Calculate start/end dates
+        start_year = datetime.now().year - period_years
+        start_date = f"{start_year}0101"
+        end_date = datetime.now().strftime('%Y%m%d')
+
+        # Define options locally and serialize to a JSON string to inject into the remote code
+        bdh_options = {
+            'periodicitySelection': 'YEARLY',
+            'currency': currency,
+            'nonTradingDayFillOption': 'ALL_CALENDAR_DAYS',
+            'nonTradingDayFillMethod': 'PREVIOUS_VALUE'
+        }
+        bdh_opts_json = json.dumps(bdh_options)
+
+        code = f"""
+def get_series():
+    company = "{company_code}"
+    curr = "{currency}"
+    config = {config_json}
+
+    # Invert the config to map Mnemonic -> human indicator name for easier parsing;
+    # config is {{"HumanName": "Mnemonic"}}
+    mnemonic_map = {{v.upper(): k for k, v in config.items()}}
+    fields = list(mnemonic_map.keys())
+
+    res_list = []
+
+    try:
+        # Fetch BDH with the injected options
+        df = bquery.bdh(
+            [company],
+            fields,
+            start_date='{start_date}',
+            end_date='{end_date}',
+            options={bdh_opts_json}
+        )
+
+        if not df.empty:
+            for _, row in df.iterrows():
+                # Get the date; the column might be 'date' or 'DATE'
+                date_val = None
+                if 'date' in df.columns: date_val = row['date']
+                elif 'DATE' in df.columns: date_val = row['DATE']
+
+                if not date_val: continue
+
+                date_str = str(date_val)[:10]  # YYYY-MM-DD
+
+                # Iterate through the fields
+                for mnemonic, indicator_name in mnemonic_map.items():
+                    if mnemonic in df.columns:
+                        val = row[mnemonic]
+
+                        if val is None or str(val).lower() == 'nan' or str(val).lower() == 'none':
+                            continue
+
+                        res_list.append({{
+                            "Company_code": company,
+                            "update_date": datetime.now().strftime('%Y-%m-%d'),
+                            "currency": curr,
+                            "indicator": indicator_name,
+                            "value": str(val),
+                            "value_date": date_str
+                        }})
+
+    except Exception as e:
+        print(f"BDH Error: {{e}}")
+
+    print("JSON_START")
+    print(json.dumps(res_list))
+    print("JSON_END")
+
+get_series()
+"""
+        return self._execute_and_parse(code)
+
+    def _execute_and_parse(self, code):
+        """Execute code remotely and parse the JSON between JSON_START and JSON_END."""
+        raw_output = self.execute_remote_code(code)
+
+        # Simple parser
+        try:
+            start_idx = raw_output.find("JSON_START")
+            end_idx = raw_output.find("JSON_END")
+
+            if start_idx != -1 and end_idx != -1:
+                json_str = raw_output[start_idx + len("JSON_START"):end_idx].strip()
+                return json.loads(json_str)
+            else:
+                print(f"⚠️ No JSON output found in remote response. Raw output:\n{raw_output[:200]}...")
+                return []
+        except Exception as e:
+            print(f"❌ Error parsing JSON from remote: {e}")
+            return []
+
+
+if __name__ == "__main__":
+    if len(sys.argv) < 3:
+        print("Usage: python bloomberg_client.py <market> <symbol>")
+        print("Example: python bloomberg_client.py JP 7203")
+        sys.exit(1)
+
+    market_arg = sys.argv[1]
+    symbol_arg = sys.argv[2]
+
+    try:
+        client = BloombergClient()
+        client.fetch_company(market_arg, symbol_arg)
+    except Exception as e:
+        print(f"CRITICAL ERROR: {e}")
+        sys.exit(1)
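
Note: the client's remote round trip relies on a plain sentinel protocol: the generated script prints its JSON payload between JSON_START and JSON_END markers, and `_execute_and_parse` slices it back out of the kernel's stdout. The contract in isolation, with no Jupyter required (the `parse_sentinel_output` helper restates the parsing logic above for illustration):

```python
import json

def parse_sentinel_output(raw_output: str):
    """Extract the JSON list printed between JSON_START and JSON_END."""
    start = raw_output.find("JSON_START")
    end = raw_output.find("JSON_END")
    if start == -1 or end == -1:
        return []  # mirrors _execute_and_parse: no payload means no rows
    return json.loads(raw_output[start + len("JSON_START"):end].strip())

# Simulated kernel stdout: progress chatter followed by the payload.
stdout = 'Fetching Basic Data...\nJSON_START\n[{"indicator": "pe_ratio", "value": "12.3"}]\nJSON_END\n'
assert parse_sentinel_output(stdout) == [{"indicator": "pe_ratio", "value": "12.3"}]
```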
diff --git a/test_hk_fetcher_logic.py b/test_hk_fetcher_logic.py
deleted file mode 100644
index e647c49..0000000
--- a/test_hk_fetcher_logic.py
+++ /dev/null
@@ -1,105 +0,0 @@
-
-import os
-import sys
-import pandas as pd
-from unittest.mock import MagicMock
-from src.fetchers.hk_fetcher import HkFetcher
-
-# Mock the IFindClient to avoid actual network requests and credentials
-class MockIFindClient:
-    def __init__(self, refresh_token):
-        pass
-
-    def post(self, endpoint, params):
-        # Simulate data availability logic
-        # If querying for 20261231, return empty
-        # If querying for 20251231, return data
-
-        # Extract date from params
-        date = "unknown"
-        if "indipara" in params:
-            for item in params["indipara"]:
-                if "indiparams" in item and len(item["indiparams"]) > 0:
-                    param0 = item["indiparams"][0]
-                    if len(param0) == 8:  # YYYYMMDD
-                        date = param0
-                        break
-
-        # Test Case 1: Detect year logic
-        if "20261231" in date:
-            return {"tables": [{"time": [], "table": {}}]}  # Empty
-
-        if "20251231" in date:
-            return {
-                "tables": [{
-                    "time": ["2025-12-31"],
-                    "table": {
-                        "revenue_oas": [1000],
-                        "roe": [15.5],
-                        "total_oi": [5000]
-                    }
-                }]
-            }
-
-        if "20241231" in date:
-            return {
-                "tables": [{
-                    "time": ["2024-12-31"],
-                    "table": {
-                        "revenue_oas": [900],
-                        "roe": [14.0],
-                        "total_oi": [4000]
-                    }
-                }]
-            }
-
-        return {"tables": []}
-
-def test_hk_fetcher_year_detection():
-    print("Testing HK Fetcher Year Detection Logic...")
-
-    # Mock time.strftime to return 2026
-    import time
-    original_strftime = time.strftime
-    time.strftime = MagicMock(return_value="2026")
-
-    try:
-        fetcher = HkFetcher("fake_token")
-        # Replace the client with our mock
-        fetcher.cli = MockIFindClient("fake_token")
-
-        # 1. Test get_income_statement logic
-        print("\nTesting _fetch_financial_data_annual (via income statement)...")
-        # We expect it to try 2026 (fail), then 2025 (succeed), then fetch 2025-2021
-        df_income = fetcher.get_income_statement("0700.HK")
-
-        if not df_income.empty:
-            dates = df_income['end_date'].tolist()
-            print(f"Fetched Income Statement Dates: {dates}")
-            if "20251231" in dates and "20261231" not in dates:
-                print("PASS: Correctly anchored to 2025 instead of 2026.")
-            else:
-                print(f"FAIL: Logic incorrect. Dates found: {dates}")
-        else:
-            print("FAIL: No data returned.")
-
-        # 2. Test get_financial_ratios logic
-        print("\nTesting get_financial_ratios...")
-        df_ratios = fetcher.get_financial_ratios("0700.HK")
-
-        if not df_ratios.empty:
-            dates = df_ratios['end_date'].tolist()
-            print(f"Fetched Ratios Dates: {dates}")
-            if "20251231" in dates and "20261231" not in dates:
-                print("PASS: Correctly anchored to 2025 instead of 2026.")
-            else:
-                print(f"FAIL: Logic incorrect. Dates found: {dates}")
-        else:
-            print("FAIL: No data returned.")
-
-    finally:
-        # Restore time
-        time.strftime = original_strftime
-
-if __name__ == "__main__":
-    test_hk_fetcher_year_detection()
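
Note: the deleted script above exercised HkFetcher's year-anchoring against a mocked IFindClient. Equivalent offline coverage for the new BloombergClient can stub the remote execution the same way; a hypothetical sketch (only the class and method names come from bloomberg_client.py, the test itself is new):

```python
import json
from unittest.mock import patch

from src.fetchers.bloomberg_client import BloombergClient

def test_execute_and_parse_roundtrip():
    # __new__ bypasses __init__, which would authenticate and open a WebSocket.
    client = BloombergClient.__new__(BloombergClient)
    payload = [{"indicator": "pe_ratio", "value": "12.3"}]
    fake_stdout = f"noise\nJSON_START\n{json.dumps(payload)}\nJSON_END\n"
    with patch.object(BloombergClient, "execute_remote_code", return_value=fake_stdout):
        assert client._execute_and_parse("print('ignored')") == payload

if __name__ == "__main__":
    test_execute_and_parse_roundtrip()
    print("PASS: sentinel round trip parsed correctly.")
```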