575 lines
24 KiB
Python
575 lines
24 KiB
Python
from datetime import datetime
|
||
import pandas as pd
|
||
from blp import blp
|
||
import psycopg2
|
||
import json
|
||
|
||
currency_config = {
|
||
"Revenue": "SALES_REV_TURN",
|
||
"Net_Income": "EARN_FOR_COMMON",
|
||
"Cash_From_Operating": "CF_CASH_FROM_OPER",
|
||
"Capital_Expenditure": "CAPITAL_EXPEND",
|
||
"Free_Cash_Flow": "CF_FREE_CASH_FLOW",
|
||
"Dividends_Paid": "CF_DVD_PAID",
|
||
"Total_Assets": "BS_TOT_ASSET",
|
||
"Equity": "BS_TOT_EQY",
|
||
"Goodwill": "BS_GOODWILL",
|
||
"SG&A": "IS_SG_AND_A_EXPENSE",
|
||
"Selling&Marketing": "IS_SELLING_EXPENSES",
|
||
"General&Admin": "IS_GENERAL_AND_ADMIN_GAAP",
|
||
"R&D": "IS_RD_EXPEND",
|
||
"Depreciation": "IS_DEPR_EXP",
|
||
"Cash": "BS_CASH_NEAR_CASH_ITEM",
|
||
"Inventory": "BS_INVENTORIES",
|
||
"Accounts&Notes_Receivable": "BS_ACCT_NOTE_RCV",
|
||
"Prepaid": "BS_PREPAY",
|
||
"Property_Plant&Equipment": "BS_NET_FIX_ASSET",
|
||
"LT_Investment": "BS_LT_INVEST",
|
||
"Accounts_Payable": "BS_ACCT_PAYABLE",
|
||
"ST_Debt": "BS_ST_BORROW",
|
||
"LT_Debt": "BS_LT_BORROW",
|
||
"ST_Defer_Rev":"ST_DEFERRED_REVENUE",
|
||
"repurchase":"cf_proceeds_repurchase_equity"
|
||
}
|
||
|
||
non_currency_config = {
|
||
"ROE": "RETURN_COM_EQY",
|
||
"ROA": "RETURN_ON_ASSET",
|
||
"ROCE": "RETURN_ON_CAPITAL_EMPLOYED",
|
||
"Gross_Margin": "GROSS_MARGIN",
|
||
"EBITDA_margin": "EBITDA_TO_REVENUE",
|
||
"Net_Profit_Margin": "PROF_MARGIN",
|
||
"Tax_Rate": "IS_STATUTORY_TAX_RATE",
|
||
"Inventory_Days": "INVENT_DAYS",
|
||
"Days_Sales_Outstanding": "ANNUALIZED_DAYS_SALES_OUTSTDG",
|
||
"Payables_Days": "ACCOUNTS_PAYABLE_TURNOVER_DAYS",
|
||
"Employee": "NUM_OF_EMPLOYEES",
|
||
"PE": "px_last/is_eps",
|
||
"PB": "PX_TO_BOOK_RATIO",
|
||
"Shareholders": "BS_NUM_OF_SHAREHOLDERS",
|
||
"Dividend_Payout_Ratio":"DVD_PAYOUT_RATIO",
|
||
"Total_Debt_Ratio":"TOT_DEBT_TO_TOT_ASSET",
|
||
"Net_Fixed_Asset_Turnover":"NET_FIX_ASSET_TURN",
|
||
"Asset_Turnover":"ASSET_TURNOVER",
|
||
"NetIncome_Growth":"earn_for_com_growth",
|
||
"Revenue_Growth":"sales_growth",
|
||
}
|
||
|
||
price_config= {
|
||
"Last_Price": "PX_LAST",
|
||
"Market_Cap":"cur_mkt_cap",
|
||
}
|
||
|
||
# 基础配置
|
||
stockcard_CONFIG = {
|
||
"period": 10,
|
||
"unit": 100000000
|
||
}
|
||
|
||
|
||
def process_stockcard_data(bquery, cursor, conn, company_code, period, currency):
|
||
"""
|
||
处理单个公司的股票卡片数据,包括基本信息、非货币数据、货币数据和价格数据,并将其存入数据库。
|
||
|
||
参数:
|
||
bquery: Bloomberg查询对象。
|
||
cursor: 数据库游标。
|
||
conn: 数据库连接。
|
||
company_code (str): 公司代码,例如 'IBM US Equity'。
|
||
period (int): 查询的年度周期数。
|
||
currency (str): 货币代码,例如 'USD', 'CNY'。
|
||
"""
|
||
try:
|
||
# 由于原代码中此处后续是文件读取和数据库连接操作,
|
||
# 为了保证资源的正确释放,添加 finally 子句
|
||
# 同时添加 except 子句来捕获可能的异常
|
||
# 处理输入信息
|
||
company_code = company_code + " Equity"
|
||
period = int(period)
|
||
currency = currency
|
||
|
||
# 获取基本信息
|
||
stockcard_data = get_basic_stock_data(bquery, company_code, currency)
|
||
|
||
# 插入数据到数据库
|
||
for data in stockcard_data:
|
||
cursor.execute("""
|
||
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
|
||
VALUES (%s, %s, %s, %s, %s, %s)
|
||
""", (
|
||
data['Company_code'],
|
||
data['update_date'],
|
||
data['currency'],
|
||
data['indicator'],
|
||
data['value'],
|
||
data['value_date']
|
||
))
|
||
conn.commit()
|
||
|
||
# 处理非货币数据
|
||
process_non_currency_data(bquery, cursor, conn, company_code, period, currency)
|
||
|
||
# 处理货币数据
|
||
process_currency_data(bquery, cursor, conn, company_code, period, currency)
|
||
|
||
# 处理价格数据
|
||
process_price_data(bquery, cursor, conn, company_code, period, currency)
|
||
|
||
print(f"处理完成: {company_code}")
|
||
|
||
except Exception as e:
|
||
# 这里可以添加具体的异常处理逻辑,例如记录日志等
|
||
print(f"捕获到异常: {str(e)}")
|
||
print(f"处理数据时发生错误: {str(e)}")
|
||
raise
|
||
|
||
def get_basic_stock_data(bquery, company_code, currency):
|
||
"""
|
||
从Bloomberg获取股票的基础数据,包括公司名称、IPO日期、市盈率、市净率、股息收益率、市值和海外收入占比。
|
||
|
||
参数:
|
||
bquery: Bloomberg查询对象。
|
||
company_code (str): 公司代码,例如 'IBM US Equity'。
|
||
currency (str): 货币代码,用于市值查询,例如 'USD', 'CNY'。
|
||
|
||
返回:
|
||
list: 包含股票基础数据的字典列表,每个字典代表一个指标及其值。
|
||
"""
|
||
print(f"Processing company: {company_code}") # 调试输出
|
||
query = f"for(['{company_code.strip()}']) get(name,listing_date,pe_ratio,px_to_book_ratio,cur_mkt_cap(currency={currency}),PCT_REVENUE_FROM_FOREIGN_SOURCES)"
|
||
result = bquery.bql(query)
|
||
did_result = bquery.bdp([f"{company_code}"], ["DIVIDEND_12_MONTH_YIELD"])
|
||
IPO_date = bquery.bdp([f"{company_code}"], ["EQY_INIT_PO_DT"])
|
||
|
||
return [
|
||
{
|
||
'Company_code': company_code,
|
||
'update_date': datetime.now().strftime('%Y-%m-%d'),
|
||
'currency': currency,
|
||
'indicator': 'company_name',
|
||
'value': result[result['field'] == 'name']['value'].iloc[0],
|
||
'value_date': datetime.now().strftime('%Y-%m-%d')
|
||
},
|
||
{
|
||
'Company_code': company_code,
|
||
'update_date': datetime.now().strftime('%Y-%m-%d'),
|
||
'currency': currency,
|
||
'indicator': 'IPO_date',
|
||
'value': str(IPO_date['EQY_INIT_PO_DT'][0]),
|
||
'value_date': datetime.now().strftime('%Y-%m-%d')
|
||
},
|
||
{
|
||
'Company_code': company_code,
|
||
'update_date': datetime.now().strftime('%Y-%m-%d'),
|
||
'currency': currency,
|
||
'indicator': 'pe_ratio',
|
||
'value': str(result[result['field'] == 'pe_ratio']['value'].iloc[0]),
|
||
'value_date': datetime.now().strftime('%Y-%m-%d')
|
||
},
|
||
{
|
||
'Company_code': company_code,
|
||
'update_date': datetime.now().strftime('%Y-%m-%d'),
|
||
'currency': currency,
|
||
'indicator': 'pb_ratio',
|
||
'value': str(result[result['field'] == 'px_to_book_ratio']['value'].iloc[0]),
|
||
'value_date': datetime.now().strftime('%Y-%m-%d')
|
||
},
|
||
{
|
||
'Company_code': company_code,
|
||
'update_date': datetime.now().strftime('%Y-%m-%d'),
|
||
'currency': currency,
|
||
'indicator': 'dividend_yield',
|
||
'value': str(did_result['DIVIDEND_12_MONTH_YIELD'][0]),
|
||
'value_date': datetime.now().strftime('%Y-%m-%d')
|
||
},
|
||
{
|
||
'Company_code': company_code,
|
||
'update_date': datetime.now().strftime('%Y-%m-%d'),
|
||
'currency': currency,
|
||
'indicator': 'market_cap',
|
||
'value': str(result[result['field'].str.contains('cur_mkt_cap')]['value'].iloc[0]),
|
||
'value_date': datetime.now().strftime('%Y-%m-%d')
|
||
},
|
||
{
|
||
'Company_code': company_code,
|
||
'update_date': datetime.now().strftime('%Y-%m-%d'),
|
||
'currency': currency,
|
||
'indicator': 'Rev_Abroad',
|
||
'value': str(result[result['field'].str.contains('FOREIGN')]['value'].iloc[0]),
|
||
'value_date': datetime.now().strftime('%Y-%m-%d')
|
||
}
|
||
|
||
]
|
||
|
||
def process_currency_data(bquery, cursor, conn, company_code, period, currency):
|
||
"""
|
||
处理货币相关的财务数据,通过Bloomberg BQL查询获取指定公司、周期和货币的财务指标数据,
|
||
并将这些数据批量插入到 `stockcard` 表中。
|
||
|
||
参数:
|
||
bquery: Bloomberg查询对象。
|
||
cursor: 数据库游标。
|
||
conn: 数据库连接。
|
||
company_code (str): 公司代码,例如 'IBM US Equity'。
|
||
period (int): 查询的年度周期数。
|
||
currency (str): 货币代码,例如 'USD', 'CNY'。
|
||
"""
|
||
data_to_insert = []
|
||
for key, value_bql in currency_config.items():
|
||
time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fa_period_offset=range(-{period}A, 0A),fa_period_type=A))"
|
||
print(f"Executing currency series query for {key} ({currency}): {time_series_query}")
|
||
result = bquery.bql(time_series_query)
|
||
|
||
# 检查结果是否为空或不包含预期字段
|
||
if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
|
||
print(f"No data found for currency indicator: {key}")
|
||
continue
|
||
|
||
# 提取PERIOD_END_DATE对应的值和日期
|
||
filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE']
|
||
|
||
if filtered_results.empty:
|
||
print(f"No PERIOD_END_DATE found for currency indicator: {key}")
|
||
continue
|
||
|
||
for _, row in filtered_results.iterrows():
|
||
date_str = row["secondary_value"]
|
||
value = row["value"]
|
||
|
||
try:
|
||
# 确保日期格式正确
|
||
formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
|
||
# 转换值为浮点数,处理None、空字符串和'None'字符串
|
||
numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
|
||
|
||
data_to_insert.append((
|
||
company_code,
|
||
datetime.now().strftime('%Y-%m-%d'),
|
||
currency,
|
||
key,
|
||
numeric_value,
|
||
formatted_date
|
||
))
|
||
except (ValueError, TypeError) as e:
|
||
print(f"Skipping invalid currency data for {key} on {date_str}: {value}. Error: {e}")
|
||
continue
|
||
|
||
if data_to_insert:
|
||
insert_query = """
|
||
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
|
||
VALUES (%s, %s, %s, %s, %s, %s)
|
||
"""
|
||
try:
|
||
cursor.executemany(insert_query, data_to_insert)
|
||
conn.commit()
|
||
print(f"Successfully inserted {len(data_to_insert)} currency data points for {company_code}.")
|
||
except Exception as e:
|
||
conn.rollback() # 回滚所有数据
|
||
print(f"Failed to insert currency data for {company_code}. Error: {e}")
|
||
|
||
def process_price_data(bquery, cursor, conn, company_code, period, currency):
|
||
"""
|
||
处理价格数据并存入数据库。
|
||
通过Bloomberg BQL查询获取指定公司、周期和货币的价格指标数据,
|
||
然后将这些数据批量插入到 `stockcard` 表中。
|
||
|
||
参数:
|
||
bquery: Bloomberg查询对象。
|
||
cursor: 数据库游标。
|
||
conn: 数据库连接。
|
||
company_code (str): 公司代码,例如 'IBM US Equity'。
|
||
period (int): 查询的年度周期数。
|
||
currency (str): 货币代码 (例如: 'USD', 'CNY')。
|
||
"""
|
||
data_to_insert = []
|
||
for key, value_bql in price_config.items():
|
||
# 注意:价格数据查询通常是按日或按年,per='Y' 表示按年频率
|
||
time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(currency={currency},fill='PREV', per='Y', start='-{period}Y'))"
|
||
print(f"Executing price series query for {key}: {time_series_query}")
|
||
result = bquery.bql(time_series_query)
|
||
|
||
if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
|
||
print(f"No price data found for indicator: {key}")
|
||
continue
|
||
|
||
# 提取DATE对应的值和日期
|
||
filtered_results = result[result["secondary_name"] == 'DATE']
|
||
|
||
if filtered_results.empty:
|
||
print(f"No DATE found for price indicator: {key}")
|
||
continue
|
||
|
||
for _, row in filtered_results.iterrows():
|
||
date_str = row["secondary_value"]
|
||
value = row["value"]
|
||
|
||
try:
|
||
formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
|
||
numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
|
||
|
||
data_to_insert.append((
|
||
company_code,
|
||
datetime.now().strftime('%Y-%m-%d'),
|
||
currency,
|
||
key,
|
||
numeric_value,
|
||
formatted_date
|
||
))
|
||
except (ValueError, TypeError) as e:
|
||
print(f"Skipping invalid price data for {key} on {date_str}: {value}. Error: {e}")
|
||
continue
|
||
|
||
if data_to_insert:
|
||
insert_query = """
|
||
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
|
||
VALUES (%s, %s, %s, %s, %s, %s)
|
||
"""
|
||
try:
|
||
cursor.executemany(insert_query, data_to_insert)
|
||
conn.commit()
|
||
print(f"Successfully inserted {len(data_to_insert)} price data points for {company_code}.")
|
||
except Exception as e:
|
||
conn.rollback() # 回滚所有数据
|
||
print(f"Failed to insert price data for {company_code}. Error: {e}")
|
||
|
||
def process_non_currency_data(bquery, cursor, conn, company_code, period, currency):
|
||
"""
|
||
处理非货币相关的财务数据,通过Bloomberg BQL查询获取指定公司和周期的非货币财务指标数据,
|
||
并将这些数据批量插入到 `stockcard` 表中。
|
||
|
||
参数:
|
||
bquery: Bloomberg查询对象。
|
||
cursor: 数据库游标。
|
||
conn: 数据库连接。
|
||
company_code (str): 公司代码,例如 'IBM US Equity'。
|
||
period (int): 查询的年度周期数。
|
||
currency (str): 货币代码 (例如: 'USD', 'CNY')。此参数在此函数中可能不直接用于BQL查询,但作为数据记录的一部分。
|
||
"""
|
||
data_to_insert = []
|
||
for key, value_bql in non_currency_config.items():
|
||
time_series_query = f"for(['{company_code.strip()}']) get({value_bql}(fa_period_offset=range(-{period}A, 0A),fa_period_type=A))"
|
||
print(f"Executing non-currency series query for {key}: {time_series_query}")
|
||
result = bquery.bql(time_series_query)
|
||
|
||
if result.empty or 'value' not in result.columns or 'secondary_value' not in result.columns:
|
||
print(f"No data found for non-currency indicator: {key}")
|
||
continue
|
||
|
||
# 提取PERIOD_END_DATE对应的值和日期
|
||
filtered_results = result[result["secondary_name"] == 'PERIOD_END_DATE']
|
||
|
||
if filtered_results.empty:
|
||
print(f"No PERIOD_END_DATE found for non-currency indicator: {key}")
|
||
continue
|
||
|
||
for _, row in filtered_results.iterrows():
|
||
date_str = row["secondary_value"]
|
||
value = row["value"]
|
||
|
||
try:
|
||
formatted_date = datetime.strptime(date_str[:10], '%Y-%m-%d').strftime('%Y-%m-%d')
|
||
numeric_value = float(value) if value is not None and str(value).strip() and str(value).strip().lower() != 'none' else None
|
||
|
||
data_to_insert.append((
|
||
company_code,
|
||
datetime.now().strftime('%Y-%m-%d'),
|
||
currency, # currency 即使不用于BQL查询,仍作为记录的一部分
|
||
key,
|
||
numeric_value,
|
||
formatted_date
|
||
))
|
||
except (ValueError, TypeError) as e:
|
||
print(f"Skipping invalid non-currency data for {key} on {date_str}: {value}. Error: {e}")
|
||
continue
|
||
|
||
if data_to_insert:
|
||
insert_query = """
|
||
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date)
|
||
VALUES (%s, %s, %s, %s, %s, %s)
|
||
"""
|
||
try:
|
||
cursor.executemany(insert_query, data_to_insert)
|
||
conn.commit()
|
||
print(f"Successfully inserted {len(data_to_insert)} non-currency data points for {company_code}.")
|
||
except Exception as e:
|
||
conn.rollback() # 回滚所有数据
|
||
print(f"Failed to insert non-currency data for {company_code}. Error: {e}")
|
||
|
||
|
||
|
||
def get_waiting_list(cursor, conn):
|
||
"""
|
||
从数据库中获取待处理的公司列表。这些公司在 `waiting_list` 表中 `status` 字段为 0。
|
||
本函数仅负责查询并返回列表,不负责更新 `status` 字段。
|
||
|
||
参数:
|
||
cursor: 数据库游标。
|
||
conn: 数据库连接。
|
||
|
||
返回:
|
||
pd.DataFrame: 包含待处理公司代码和对应货币的DataFrame,列名为 "member_ticker" 和 "currency"。
|
||
如果等待列表为空,则返回空的DataFrame。
|
||
"""
|
||
try:
|
||
# 查询数据库是否存在等待列表,并按更新日期排序,限制数量
|
||
query = "SELECT id, company_code, currency FROM waiting_list WHERE status = 0 ORDER BY update_date ASC LIMIT 300"
|
||
cursor.execute(query)
|
||
result = cursor.fetchall()
|
||
|
||
if result:
|
||
print(f"读到等待列表,待处理公司数量: {len(result)}")
|
||
# 获取company_code和currency列表
|
||
companies_and_currencies = [(row[0], row[1], row[2]) for row in result]
|
||
# 返回DataFrame,不再在此处更新status
|
||
return pd.DataFrame(companies_and_currencies, columns=["id", "member_ticker", "currency"])
|
||
else:
|
||
print("等待列表为空")
|
||
return pd.DataFrame(columns=["id", "member_ticker", "currency"])
|
||
except Exception as e:
|
||
print(f"获取等待列表失败: {str(e)}")
|
||
return pd.DataFrame(columns=["id", "member_ticker", "currency"])
|
||
|
||
def main():
|
||
"""
|
||
主函数,负责初始化Bloomberg查询和数据库连接,处理等待列表中的公司数据,并更新状态。
|
||
|
||
流程:
|
||
1. 初始化Bloomberg查询服务
|
||
2. 连接数据库
|
||
3. 获取等待列表
|
||
4. 处理每家公司数据
|
||
5. 更新处理状态
|
||
6. 关闭所有资源
|
||
"""
|
||
# 数据库连接配置
|
||
DB_HOST = "aws-0-ap-northeast-1.pooler.supabase.com"
|
||
DB_PORT = "5432"
|
||
DB_NAME = "postgres"
|
||
DB_USER = "postgres.kzexzbtpbnufbvrvkuae"
|
||
DB_PASSWORD = "cAuNDnJv0aj1NW9l"
|
||
|
||
# 初始化Bloomberg查询对象、数据库连接和游标
|
||
bquery = None
|
||
conn = None
|
||
cursor = None
|
||
|
||
try:
|
||
try:
|
||
bquery = blp.BlpQuery().start()
|
||
except Exception as e:
|
||
print(f"初始化bloomberg失败: {str(e)}")
|
||
raise # 重新抛出异常,确保外部调用者知道初始化失败
|
||
|
||
try:
|
||
# 构建连接字符串
|
||
conn_string = f"host={DB_HOST} port={DB_PORT} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD} gssencmode=disable"
|
||
# 建立连接
|
||
conn = psycopg2.connect(conn_string)
|
||
cursor = conn.cursor()
|
||
print("成功连接到 Supabase Session Pooler!")
|
||
except Exception as e:
|
||
print(f"连接Supabase Session Pooler或查询失败: {e}")
|
||
|
||
# 获取等待列表
|
||
waiting_list = get_waiting_list(cursor, conn)
|
||
|
||
if not waiting_list.empty:
|
||
print(f"开始处理 {len(waiting_list)} 家公司数据")
|
||
# 处理每家公司数据
|
||
for _, row in waiting_list.iterrows():
|
||
id = int(row["id"])
|
||
company_code = str(row["member_ticker"]) # 确保 company_code 是字符串
|
||
currency = row["currency"]
|
||
try:
|
||
process_stockcard_data(bquery, cursor, conn, company_code, stockcard_CONFIG["period"], currency)
|
||
# 更新处理状态和更新日期
|
||
try:
|
||
cursor.execute("UPDATE waiting_list SET status = 1, update_date = NOW() WHERE id = %s", (id,))
|
||
conn.commit()
|
||
print(f"成功更新等待列表ID: {id} 的状态和日期。")
|
||
except Exception as e:
|
||
conn.rollback()
|
||
print(f"更新等待列表ID: {id} 失败: {str(e)}")
|
||
print(f"成功处理并更新状态: {company_code}")
|
||
except Exception as e:
|
||
print(f"处理 {company_code} 时出错: {str(e)}")
|
||
conn.rollback()
|
||
# 执行删除重复数据的SQL
|
||
try:
|
||
cursor.execute('''
|
||
WITH DuplicateRows AS (
|
||
SELECT
|
||
id, -- 现在我们可以使用这个新的主键
|
||
ROW_NUMBER() OVER(
|
||
PARTITION BY company_code, currency, indicator, value_date
|
||
ORDER BY update_date DESC
|
||
) as rn
|
||
FROM
|
||
stockcard
|
||
)
|
||
DELETE FROM stockcard
|
||
WHERE id IN (
|
||
SELECT id
|
||
FROM DuplicateRows
|
||
WHERE rn > 1
|
||
);
|
||
''')
|
||
conn.commit()
|
||
print(f"成功执行删除重复数据SQL,共删除 {cursor.rowcount} 条记录。")
|
||
except Exception as e:
|
||
print(f"执行 RemoveDuplicateStockcardEntries.sql 失败: {e}")
|
||
conn.rollback()
|
||
# 执行更新唯一公司代码的SQL
|
||
|
||
try:
|
||
cursor.execute('''
|
||
-- 如果物化视图存在,则先删除它,以解决“relation already exists”错误
|
||
DROP MATERIALIZED VIEW IF EXISTS public.unique_company_codes;
|
||
|
||
-- 创建物化视图 public.unique_company_codes,包含唯一的公司代码和对应的公司名称
|
||
CREATE MATERIALIZED VIEW public.unique_company_codes AS
|
||
SELECT
|
||
s.Company_code, -- 公司代码
|
||
-- 使用子查询确保每个公司代码只关联一个公司名称
|
||
(SELECT cn_sub.value
|
||
FROM public.stockcard AS cn_sub
|
||
WHERE cn_sub.Company_code = s.Company_code
|
||
AND cn_sub.indicator = 'company_name'
|
||
ORDER BY cn_sub.value ASC -- 如果存在多个公司名称,则按字母顺序选择第一个,以确保唯一性
|
||
LIMIT 1) AS company_name -- 只取一个匹配的 company_name
|
||
FROM
|
||
-- 先选择出所有不为 NULL 的唯一公司代码
|
||
(SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s
|
||
ORDER BY
|
||
s.Company_code; -- 按公司代码排序,方便查看和使用
|
||
|
||
-- 在物化视图的 Company_code 列上创建索引,以便后续查询该视图时更快
|
||
-- 注意:在创建物化视图之后,需要重新创建索引
|
||
CREATE INDEX idx_unique_company_codes_company_code ON public.unique_company_codes (Company_code);
|
||
|
||
''')
|
||
conn.commit()
|
||
print("成功执行 UniqueCompanyCodes.sql 中的 SQL 语句(创建/刷新物化视图及索引)。")
|
||
except Exception as e:
|
||
print(f"执行 UniqueCompanyCodes.sql 失败: {e}")
|
||
conn.rollback()
|
||
else:
|
||
print("没有待处理的公司")
|
||
|
||
except Exception as e:
|
||
print(f"主程序出错: {str(e)}")
|
||
finally:
|
||
# 确保关闭所有资源
|
||
if cursor:
|
||
cursor.close()
|
||
if conn:
|
||
conn.close()
|
||
if bquery:
|
||
bquery.stop()
|
||
print("程序执行完毕,所有资源已释放")
|
||
|
||
if __name__ == "__main__":
|
||
main() # 重新抛出异常,确保外部调用者知道初始化失败
|
||
|
||
|