from datetime import datetime import pandas as pd from blp import blp import psycopg2 import json currency_config = { "Revenue": "SALES_REV_TURN", "Net_Income": "EARN_FOR_COMMON", "Cash_From_Operating": "CF_CASH_FROM_OPER", "Capital_Expenditure": "CAPITAL_EXPEND", "Free_Cash_Flow": "CF_FREE_CASH_FLOW", "Dividends_Paid": "CF_DVD_PAID", "Total_Assets": "BS_TOT_ASSET", "Equity": "BS_TOT_EQY", "Goodwill": "BS_GOODWILL", "SG&A": "IS_SG_AND_A_EXPENSE", "Selling&Marketing": "IS_SELLING_EXPENSES", "General&Admin": "IS_GENERAL_AND_ADMIN_GAAP", "R&D": "IS_RD_EXPEND", "Depreciation": "IS_DEPR_EXP", "Cash": "BS_CASH_NEAR_CASH_ITEM", "Inventory": "BS_INVENTORIES", "Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV", "Prepaid": "BS_PREPAY", "Property_Plant&Equipment": "BS_NET_FIX_ASSET", "LT_Investment": "BS_LT_INVEST", "Accounts_Payable": "BS_ACCT_PAYABLE", "ST_Debt": "BS_ST_BORROW", "LT_Debt": "BS_LT_BORROW", "ST_Defer_Rev":"ST_DEFERRED_REVENUE", "repurchase":"cf_proceeds_repurchase_equity" } non_currency_config = { "ROE": "RETURN_COM_EQY", "ROA": "RETURN_ON_ASSET", "ROCE": "RETURN_ON_CAPITAL_EMPLOYED", "Gross_Margin": "GROSS_MARGIN", "EBITDA_margin": "EBITDA_TO_REVENUE", "Net_Profit_Margin": "PROF_MARGIN", "Tax_Rate": "IS_STATUTORY_TAX_RATE", "Inventory_Days": "INVENT_DAYS", "Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG", "Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS", "Employee": "NUM_OF_EMPLOYEES", "PE": "px_last/is_eps", "PB": "PX_TO_BOOK_RATIO", "Shareholders": "BS_NUM_OF_SHAREHOLDERS", "Dividend_Payout_Ratio":"DVD_PAYOUT_RATIO", "Total_Debt_Ratio":"TOT_DEBT_TO_TOT_ASSET", "Net_Fixed_Asset_Turnover":"NET_FIX_ASSET_TURN", "Asset_Turnover":"ASSET_TURNOVER", "NetIncome_Growth":"earn_for_com_growth", "Revenue_Growth":"sales_growth", } price_config= { "Last_Price": "PX_LAST", "Market_Cap":"cur_mkt_cap", } # 基础配置 stockcard_CONFIG = { "period": 10, "unit": 100000000 } def process_stockcard_data(bquery, cursor, conn, company_code, period, currency): """ 处理单个公司的股票卡片数据,包括基本信息、非货币数据、货币数据和价格数据,并将其存入数据库。 参数: bquery: Bloomberg查询对象。 cursor: 数据库游标。 conn: 数据库连接。 company_code (str): 公司代码,例如 'IBM US Equity'。 period (int): 查询的年度周期数。 currency (str): 货币代码,例如 'USD', 'CNY'。 """ try: # 由于原代码中此处后续是文件读取和数据库连接操作, # 为了保证资源的正确释放,添加 finally 子句 # 同时添加 except 子句来捕获可能的异常 # 处理输入信息 company_code = company_code + " Equity" period = int(period) currency = currency # 获取基本信息 stockcard_data = get_basic_stock_data(bquery, company_code, currency) # 插入数据到数据库 for data in stockcard_data: cursor.execute(""" INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date) VALUES (%s, %s, %s, %s, %s, %s) """, ( data['Company_code'], data['update_date'], data['currency'], data['indicator'], data['value'], data['value_date'] )) conn.commit() # 处理非货币数据 process_non_currency_data(bquery, cursor, conn, company_code, period, currency) # 处理货币数据 process_currency_data(bquery, cursor, conn, company_code, period, currency) # 处理价格数据 process_price_data(bquery, cursor, conn, company_code, period, currency) print(f"处理完成: {company_code}") except Exception as e: # 这里可以添加具体的异常处理逻辑,例如记录日志等 print(f"捕获到异常: {str(e)}") print(f"处理数据时发生错误: {str(e)}") raise def get_basic_stock_data(bquery, company_code, currency): """ 从Bloomberg获取股票的基础数据,包括公司名称、IPO日期、市盈率、市净率、股息收益率、市值和海外收入占比。 参数: bquery: Bloomberg查询对象。 company_code (str): 公司代码,例如 'IBM US Equity'。 currency (str): 货币代码,用于市值查询,例如 'USD', 'CNY'。 返回: list: 包含股票基础数据的字典列表,每个字典代表一个指标及其值。 """ print(f"Processing company: {company_code}") # 调试输出 query = f"for(['{company_code.strip()}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={currency}),PCT_REVENUE_FROM_FOREIGN_SOURCES)" result = bquery.bql(query) did_result = bquery.bdp([f"{company_code}"], ["DIVIDEND_12_MONTH_YIELD"]) IPO_date = bquery.bdp([f"{company_code}"], ["EQY_INIT_PO_DT"]) return [ { 'Company_code': company_code, 'update_date': datetime.now().strftime('%Y-%m-%d'), 'currency': currency, 'indicator': 'company_name', 'value': result[result['field'] == 'name']['value'].iloc[0], 'value_date': datetime.now().strftime('%Y-%m-%d') }, { 'Company_code': company_code, 'update_date': datetime.now().strftime('%Y-%m-%d'), 'currency': currency, 'indicator': 'IPO_date', 'value': str(IPO_date['EQY_INIT_PO_DT'][0]), 'value_date': datetime.now().strftime('%Y-%m-%d') }, { 'Company_code': company_code, 'update_date': datetime.now().strftime('%Y-%m-%d'), 'currency': currency, 'indicator': 'pe_ratio', 'value': str(result[result['field'] == 'pe_ratio']['value'].iloc[0]), 'value_date': datetime.now().strftime('%Y-%m-%d') }, { 'Company_code': company_code, 'update_date': datetime.now().strftime('%Y-%m-%d'), 'currency': currency, 'indicator': 'pb_ratio', 'value': str(result[result['field'] == 'px_to_book_ratio']['value'].iloc[0]), 'value_date': datetime.now().strftime('%Y-%m-%d') }, { 'Company_code': company_code, 'update_date': datetime.now().strftime('%Y-%m-%d'), 'currency': currency, 'indicator': 'dividend_yield', 'value': str(did_result['DIVIDEND_12_MONTH_YIELD'][0]), 'value_date': datetime.now().strftime('%Y-%m-%d') }, { 'Company_code': company_code, 'update_date': datetime.now().strftime('%Y-%m-%d'), 'currency': currency, 'indicator': 'market_cap', 'value': str(result[result['field'].str.contains('cur_mkt_cap')]['value'].iloc[0]), 'value_date': datetime.now().strftime('%Y-%m-%d') }, { 'Company_code': company_code, 'update_date': datetime.now().strftime('%Y-%m-%d'), 'currency': currency, 'indicator': 'Rev_Abroad', 'value': str(result[result['field'].str.contains('FOREIGN')]['value'].iloc[0]), 'value_date': datetime.now().strftime('%Y-%m-%d') } ] def process_currency_data(bquery, cursor, conn, company_code, period, currency): """ 处理货币相关的财务数据,通过Bloomberg BQL查询获取指定公司、周期和货币的财务指标数据, 并将这些数据批量插入到 `stockcard` 表中。 参数: bquery: Bloomberg查询对象。 cursor: 数据库游标。 conn: 数据库连接。 company_code (str): 公司代码,例如 'IBM US Equity'。 period (int): 查询的年度周期数。 currency (str): 货币代码,例如 'USD', 'CNY'。 """ data_to_insert = [] for key, value_bql in currency_config.items(): time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fa_period_offset=range(-{period}A, 0A),fa_period_type=A))" print(f"Executing currency series query for {key} ({currency}): {time_series_query}") result = bquery.bql(time_series_query) # 检查结果是否为空或不包含预期字段 if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns: print(f"No data found for currency indicator: {key}") continue # 提取PERIOD_END_DATE对应的值和日期 filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE'] if filtered_results.empty: print(f"No PERIOD_END_DATE found for currency indicator: {key}") continue for _, row in filtered_results.iterrows(): date_str = row["secondary_value"] value = row["value"] try: # 确保日期格式正确 formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d') # 转换值为浮点数,处理None、空字符串和'None'字符串 numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None data_to_insert.append(( company_code, datetime.now().strftime('%Y-%m-%d'), currency, key, numeric_value, formatted_date )) except (ValueError, TypeError) as e: print(f"Skipping invalid currency data for {key} on {date_str}: {value}. Error: {e}") continue if data_to_insert: insert_query = """ INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date) VALUES (%s, %s, %s, %s, %s, %s) """ try: cursor.executemany(insert_query, data_to_insert) conn.commit() print(f"Successfully inserted {len(data_to_insert)} currency data points for {company_code}.") except Exception as e: conn.rollback() # 回滚所有数据 print(f"Failed to insert currency data for {company_code}. Error: {e}") def process_price_data(bquery, cursor, conn, company_code, period, currency): """ 处理价格数据并存入数据库。 通过Bloomberg BQL查询获取指定公司、周期和货币的价格指标数据, 然后将这些数据批量插入到 `stockcard` 表中。 参数: bquery: Bloomberg查询对象。 cursor: 数据库游标。 conn: 数据库连接。 company_code (str): 公司代码,例如 'IBM US Equity'。 period (int): 查询的年度周期数。 currency (str): 货币代码 (例如: 'USD', 'CNY')。 """ data_to_insert = [] for key, value_bql in price_config.items(): # 注意:价格数据查询通常是按日或按年,per='Y' 表示按年频率 time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fill='PREV', per='Y', start='-{period}Y'))" print(f"Executing price series query for {key}: {time_series_query}") result = bquery.bql(time_series_query) if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns: print(f"No price data found for indicator: {key}") continue # 提取DATE对应的值和日期 filtered_results = result[result["secondary_name"] == 'DATE'] if filtered_results.empty: print(f"No DATE found for price indicator: {key}") continue for _, row in filtered_results.iterrows(): date_str = row["secondary_value"] value = row["value"] try: formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d') numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None data_to_insert.append(( company_code, datetime.now().strftime('%Y-%m-%d'), currency, key, numeric_value, formatted_date )) except (ValueError, TypeError) as e: print(f"Skipping invalid price data for {key} on {date_str}: {value}. Error: {e}") continue if data_to_insert: insert_query = """ INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date) VALUES (%s, %s, %s, %s, %s, %s) """ try: cursor.executemany(insert_query, data_to_insert) conn.commit() print(f"Successfully inserted {len(data_to_insert)} price data points for {company_code}.") except Exception as e: conn.rollback() # 回滚所有数据 print(f"Failed to insert price data for {company_code}. Error: {e}") def process_non_currency_data(bquery, cursor, conn, company_code, period, currency): """ 处理非货币相关的财务数据,通过Bloomberg BQL查询获取指定公司和周期的非货币财务指标数据, 并将这些数据批量插入到 `stockcard` 表中。 参数: bquery: Bloomberg查询对象。 cursor: 数据库游标。 conn: 数据库连接。 company_code (str): 公司代码,例如 'IBM US Equity'。 period (int): 查询的年度周期数。 currency (str): 货币代码 (例如: 'USD', 'CNY')。此参数在此函数中可能不直接用于BQL查询,但作为数据记录的一部分。 """ data_to_insert = [] for key, value_bql in non_currency_config.items(): time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(fa_period_offset=range(-{period}A, 0A),fa_period_type=A))" print(f"Executing non-currency series query for {key}: {time_series_query}") result = bquery.bql(time_series_query) if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns: print(f"No data found for non-currency indicator: {key}") continue # 提取PERIOD_END_DATE对应的值和日期 filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE'] if filtered_results.empty: print(f"No PERIOD_END_DATE found for non-currency indicator: {key}") continue for _, row in filtered_results.iterrows(): date_str = row["secondary_value"] value = row["value"] try: formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d') numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None data_to_insert.append(( company_code, datetime.now().strftime('%Y-%m-%d'), currency, # currency 即使不用于BQL查询,仍作为记录的一部分 key, numeric_value, formatted_date )) except (ValueError, TypeError) as e: print(f"Skipping invalid non-currency data for {key} on {date_str}: {value}. Error: {e}") continue if data_to_insert: insert_query = """ INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date) VALUES (%s, %s, %s, %s, %s, %s) """ try: cursor.executemany(insert_query, data_to_insert) conn.commit() print(f"Successfully inserted {len(data_to_insert)} non-currency data points for {company_code}.") except Exception as e: conn.rollback() # 回滚所有数据 print(f"Failed to insert non-currency data for {company_code}. Error: {e}") def get_waiting_list(cursor, conn): """ 从数据库中获取待处理的公司列表。这些公司在 `waiting_list` 表中 `status` 字段为 0。 本函数仅负责查询并返回列表,不负责更新 `status` 字段。 参数: cursor: 数据库游标。 conn: 数据库连接。 返回: pd.DataFrame: 包含待处理公司代码和对应货币的DataFrame,列名为 "member_ticker" 和 "currency"。 如果等待列表为空,则返回空的DataFrame。 """ try: # 查询数据库是否存在等待列表,并按更新日期排序,限制数量 query = "SELECT id, company_code, currency FROM waiting_list WHERE status = 0 ORDER BY update_date ASC LIMIT 300" cursor.execute(query) result = cursor.fetchall() if result: print(f"读到等待列表,待处理公司数量: {len(result)}") # 获取company_code和currency列表 companies_and_currencies = [(row[0], row[1], row[2]) for row in result] # 返回DataFrame,不再在此处更新status return pd.DataFrame(companies_and_currencies, columns=["id", "member_ticker", "currency"]) else: print("等待列表为空") return pd.DataFrame(columns=["id", "member_ticker", "currency"]) except Exception as e: print(f"获取等待列表失败: {str(e)}") return pd.DataFrame(columns=["id", "member_ticker", "currency"]) def main(): """ 主函数,负责初始化Bloomberg查询和数据库连接,处理等待列表中的公司数据,并更新状态。 流程: 1. 初始化Bloomberg查询服务 2. 连接数据库 3. 获取等待列表 4. 处理每家公司数据 5. 更新处理状态 6. 关闭所有资源 """ # 数据库连接配置 DB_HOST = "aws-0-ap-northeast-1.pooler.supabase.com" DB_PORT = "5432" DB_NAME = "postgres" DB_USER = "postgres.kzexzbtpbnufbvrvkuae" DB_PASSWORD = "cAuNDnJv0aj1NW9l" # 初始化Bloomberg查询对象、数据库连接和游标 bquery = None conn = None cursor = None try: try: bquery = blp.BlpQuery().start() except Exception as e: print(f"初始化bloomberg失败: {str(e)}") raise # 重新抛出异常,确保外部调用者知道初始化失败 try: # 构建连接字符串 conn_string = f"host={DB_HOST} port={DB_PORT} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} gssencmode=disable" # 建立连接 conn = psycopg2.connect(conn_string) cursor = conn.cursor() print("成功连接到 Supabase Session Pooler!") except Exception as e: print(f"连接Supabase Session Pooler或查询失败: {e}") # 获取等待列表 waiting_list = get_waiting_list(cursor, conn) if not waiting_list.empty: print(f"开始处理 {len(waiting_list)} 家公司数据") # 处理每家公司数据 for _, row in waiting_list.iterrows(): id = int(row["id"]) company_code = str(row["member_ticker"]) # 确保 company_code 是字符串 currency = row["currency"] try: process_stockcard_data(bquery, cursor, conn, company_code, stockcard_CONFIG["period"], currency) # 更新处理状态和更新日期 try: cursor.execute("UPDATE waiting_list SET status = 1, update_date = NOW() WHERE id = %s", (id,)) conn.commit() print(f"成功更新等待列表ID: {id} 的状态和日期。") except Exception as e: conn.rollback() print(f"更新等待列表ID: {id} 失败: {str(e)}") print(f"成功处理并更新状态: {company_code}") except Exception as e: print(f"处理 {company_code} 时出错: {str(e)}") conn.rollback() # 执行删除重复数据的SQL try: cursor.execute(''' WITH DuplicateRows AS ( SELECT id, -- 现在我们可以使用这个新的主键 ROW_NUMBER() OVER( PARTITION BY company_code, currency, indicator, value_date ORDER BY update_date DESC ) as rn FROM stockcard ) DELETE FROM stockcard WHERE id IN ( SELECT id FROM DuplicateRows WHERE rn > 1 ); ''') conn.commit() print(f"成功执行删除重复数据SQL,共删除 {cursor.rowcount} 条记录。") except Exception as e: print(f"执行 RemoveDuplicateStockcardEntries.sql 失败: {e}") conn.rollback() # 执行更新唯一公司代码的SQL try: cursor.execute(''' -- 如果物化视图存在,则先删除它,以解决“relation already exists”错误 DROP MATERIALIZED VIEW IF EXISTS public.unique_company_codes; -- 创建物化视图 public.unique_company_codes,包含唯一的公司代码和对应的公司名称 CREATE MATERIALIZED VIEW public.unique_company_codes AS SELECT s.Company_code, -- 公司代码 -- 使用子查询确保每个公司代码只关联一个公司名称 (SELECT cn_sub.value FROM public.stockcard AS cn_sub WHERE cn_sub.Company_code = s.Company_code AND cn_sub.indicator = 'company_name' ORDER BY cn_sub.value ASC -- 如果存在多个公司名称,则按字母顺序选择第一个,以确保唯一性 LIMIT 1) AS company_name -- 只取一个匹配的 company_name FROM -- 先选择出所有不为 NULL 的唯一公司代码 (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s ORDER BY s.Company_code; -- 按公司代码排序,方便查看和使用 -- 在物化视图的 Company_code 列上创建索引,以便后续查询该视图时更快 -- 注意:在创建物化视图之后,需要重新创建索引 CREATE INDEX idx_unique_company_codes_company_code ON public.unique_company_codes (Company_code); ''') conn.commit() print("成功执行 UniqueCompanyCodes.sql 中的 SQL 语句(创建/刷新物化视图及索引)。") except Exception as e: print(f"执行 UniqueCompanyCodes.sql 失败: {e}") conn.rollback() else: print("没有待处理的公司") except Exception as e: print(f"主程序出错: {str(e)}") finally: # 确保关闭所有资源 if cursor: cursor.close() if conn: conn.close() if bquery: bquery.stop() print("程序执行完毕,所有资源已释放") if __name__ == "__main__": main() # 重新抛出异常,确保外部调用者知道初始化失败