feat: add support for quarterly and semiannual financial data frequencies

This commit is contained in:
xucheng 2026-01-22 20:37:23 +08:00
parent e5e72205e8
commit 5bce35d504
13 changed files with 355 additions and 114 deletions

View File

@ -81,7 +81,8 @@ async def fetch_data(
symbol=request.symbol, symbol=request.symbol,
data_source=request.data_source, data_source=request.data_source,
update_id=data_update.id, update_id=data_update.id,
currency=request.currency currency=request.currency,
frequency=request.frequency
) )
return FetchDataResponse( return FetchDataResponse(
@ -98,7 +99,8 @@ def fetch_data_background(
symbol: str, symbol: str,
data_source: str, data_source: str,
update_id: int, update_id: int,
currency: str = None currency: str = None,
frequency: str = "Annual"
): ):
"""后台数据获取任务 - 完全同步执行,避免event loop冲突""" """后台数据获取任务 - 完全同步执行,避免event loop冲突"""
import sys import sys
@ -120,7 +122,8 @@ def fetch_data_background(
symbol=symbol, symbol=symbol,
data_source=data_source, data_source=data_source,
update_id=update_id, update_id=update_id,
currency=currency currency=currency,
frequency=frequency
) )
# 更新数据更新记录 - 使用psycopg2同步连接 # 更新数据更新记录 - 使用psycopg2同步连接
@ -257,6 +260,7 @@ async def get_fetch_status(
async def get_financial_data( async def get_financial_data(
company_id: int, company_id: int,
data_source: str, data_source: str,
frequency: str = "Annual",
db: AsyncSession = Depends(get_db) db: AsyncSession = Depends(get_db)
): ):
""" """
@ -268,6 +272,7 @@ async def get_financial_data(
data = await data_fetcher_service.get_financial_data_from_db( data = await data_fetcher_service.get_financial_data_from_db(
company_id=company_id, company_id=company_id,
data_source=data_source, data_source=data_source,
frequency=frequency,
db=db db=db
) )

View File

@ -277,12 +277,17 @@ class BloombergClient:
logger.error(f"DB Connection Error: {e}") logger.error(f"DB Connection Error: {e}")
return None return None
def _ensure_schema(self, conn): def _ensure_schema(self, conn, table_name="stockcard"):
"""Ensure the necessary tables exist.""" """Ensure the necessary tables exist."""
with conn.cursor() as cur: with conn.cursor() as cur:
# Create table if not exists (Inferred schema from legacy use) # Create table if not exists (Inferred schema from legacy use)
cur.execute(""" # Use format string for table name as it cannot be parameterized directly in DDL
CREATE TABLE IF NOT EXISTS stockcard ( # Validate table_name to prevent SQL injection (simple whitelist or strict formatting)
if table_name not in ["stockcard", "stockcard_quarter", "stockcard_semiannual"]:
raise ValueError(f"Invalid table name: {table_name}")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {table_name} (
id SERIAL PRIMARY KEY, id SERIAL PRIMARY KEY,
Company_code TEXT, Company_code TEXT,
update_date DATE, update_date DATE,
@ -296,31 +301,29 @@ class BloombergClient:
# Ensure column exists if table was already created # Ensure column exists if table was already created
try: try:
cur.execute("ALTER TABLE stockcard ADD COLUMN IF NOT EXISTS source TEXT;") cur.execute(f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS source TEXT;")
except Exception: except Exception:
conn.rollback() # Should not happen with IF NOT EXISTS but good practice conn.rollback() # Should not happen with IF NOT EXISTS but good practice
# Migrate update_date to TIMESTAMP # Migrate update_date to TIMESTAMP
try: try:
cur.execute("ALTER TABLE stockcard ALTER COLUMN update_date TYPE TIMESTAMP USING update_date::timestamp;") cur.execute(f"ALTER TABLE {table_name} ALTER COLUMN update_date TYPE TIMESTAMP USING update_date::timestamp;")
except Exception as e: except Exception as e:
# Likely already converted or failed, log but don't crash # Likely already converted or failed, log but don't crash
logger.info(f"Schema migration note (update_date): {e}") logger.info(f"Schema migration note (update_date) for {table_name}: {e}")
conn.rollback() conn.rollback()
conn.commit() conn.commit()
def save_data(self, data_list): def save_data(self, data_list, table_name="stockcard"):
"""Insert a list of data dictionaries into the database.""" """Insert a list of data dictionaries into the database."""
if not data_list: if not data_list:
return return
conn = self._get_db_connection() conn = self._get_db_connection()
if not conn: return if not conn: return
self._ensure_schema(conn) self._ensure_schema(conn, table_name)
try: try:
with conn.cursor() as cur: with conn.cursor() as cur:
@ -337,28 +340,31 @@ class BloombergClient:
'bloomberg' 'bloomberg'
)) ))
query = """ query = f"""
INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date, source) INSERT INTO {table_name} (Company_code, update_date, currency, indicator, value, value_date, source)
VALUES (%s, %s, %s, %s, %s, %s, %s) VALUES (%s, %s, %s, %s, %s, %s, %s)
""" """
cur.executemany(query, args_list) cur.executemany(query, args_list)
conn.commit() conn.commit()
logger.info(f"✅ Saved {len(data_list)} records to database.") logger.info(f"✅ Saved {len(data_list)} records to database table '{table_name}'.")
except Exception as e: except Exception as e:
logger.error(f"Error saving to stockcard: {e}") logger.error(f"Error saving to {table_name}: {e}")
conn.rollback() conn.rollback()
finally: finally:
conn.close() conn.close()
def run_cleanup(self): def run_cleanup(self, table_name="stockcard"):
"""Run deduplication and view refresh logic.""" """Run deduplication and view refresh logic."""
conn = self._get_db_connection() conn = self._get_db_connection()
if not conn: return if not conn: return
if table_name not in ["stockcard", "stockcard_quarter", "stockcard_semiannual"]:
return
try: try:
with conn.cursor() as cur: with conn.cursor() as cur:
# 1. Deduplication # 1. Deduplication
cur.execute(''' cur.execute(f'''
WITH DuplicateRows AS ( WITH DuplicateRows AS (
SELECT SELECT
id, id,
@ -367,9 +373,9 @@ class BloombergClient:
ORDER BY update_date DESC, id DESC ORDER BY update_date DESC, id DESC
) as rn ) as rn
FROM FROM
stockcard {table_name}
) )
DELETE FROM stockcard DELETE FROM {table_name}
WHERE id IN ( WHERE id IN (
SELECT id SELECT id
FROM DuplicateRows FROM DuplicateRows
@ -377,7 +383,11 @@ class BloombergClient:
); );
''') ''')
# 2. Materialized View Refresh # 2. Materialized View Refresh (Only for main table for now, or unified?)
# For now, unique_company_codes likely relies on the main table for list of companies.
# We can skip updating the view if it's just meant for company discovery,
# assuming stockcard (annual) covers all companies.
if table_name == "stockcard":
cur.execute(''' cur.execute('''
CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS
SELECT SELECT
@ -400,18 +410,19 @@ class BloombergClient:
pass pass
conn.commit() conn.commit()
logger.info("✅ Cleanup and View Refresh completed.") logger.info(f"✅ Cleanup completed for {table_name}.")
except Exception as e: except Exception as e:
logger.error(f"❌ Cleanup failed: {e}") logger.error(f"❌ Cleanup failed for {table_name}: {e}")
conn.rollback() conn.rollback()
finally: finally:
conn.close() conn.close()
# --- Core Fetching Logic --- # --- Core Fetching Logic ---
def fetch_company(self, market, symbol, progress_callback=None, force_currency=None): def fetch_company(self, market, symbol, progress_callback=None, force_currency=None, period="YEARLY"):
""" """
Main entry point to fetch data for a single company. Main entry point to fetch data for a single company.
period: 'YEARLY' or 'QUARTERLY'
""" """
# Determine Bloomberg Ticker format # Determine Bloomberg Ticker format
# If symbol already has Equity, use it. Else append. # If symbol already has Equity, use it. Else append.
@ -440,8 +451,15 @@ class BloombergClient:
today_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') today_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logger.info(f"🚀 Starting fetch for: {company_code}") # Determine Table Name
if progress_callback: progress_callback("Starting Bloomberg session...", 0) table_name = "stockcard"
if period == "QUARTERLY":
table_name = "stockcard_quarter"
elif period == "SEMI_ANNUALLY":
table_name = "stockcard_semiannual"
logger.info(f"🚀 Starting fetch for: {company_code} (Period: {period}, Table: {table_name})")
if progress_callback: progress_callback(f"Starting Bloomberg session ({period})...", 0)
# 0. Prep Remote Environment # 0. Prep Remote Environment
init_code = """ init_code = """
@ -482,28 +500,32 @@ if 'bquery' not in globals():
logger.info("Fetching Basic Data...") logger.info("Fetching Basic Data...")
if progress_callback: progress_callback("Fetching Company Basic Info...", 10) if progress_callback: progress_callback("Fetching Company Basic Info...", 10)
try: try:
# Basic data is always same regardless of period (it's metadata)
# We save it to the target table to ensure FK/Consistency if needed,
# or just save to main table?
# For simplicity and self-containment, save to target table too.
basic_data = self._fetch_basic_remote(company_code, currency, query_ticker=query_ticker) basic_data = self._fetch_basic_remote(company_code, currency, query_ticker=query_ticker)
logger.info(f"DEBUG: basic_data before save: {basic_data}") logger.info(f"DEBUG: basic_data before save: {basic_data}")
self.save_data(basic_data) self.save_data(basic_data, table_name=table_name)
except Exception as e: except Exception as e:
logger.error(f"Error fetching basic data: {e}") logger.error(f"Error fetching basic data: {e}")
# 3. Fetch Currency Data # 3. Fetch Currency Data
logger.info("Fetching Currency Data...") logger.info("Fetching Currency Data...")
if progress_callback: progress_callback(f"正在获取货币指标 ({currency})...", 30) if progress_callback: progress_callback(f"正在获取货币指标 ({period})...", 30)
curr_data = [] curr_data = []
try: try:
curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency", query_ticker=query_ticker) curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency", query_ticker=query_ticker, period=period)
self.save_data(curr_data) self.save_data(curr_data, table_name=table_name)
except Exception as e: except Exception as e:
logger.error(f"Error fetching currency series: {e}") logger.error(f"Error fetching currency series: {e}")
# 4. Fetch Non-Currency Data # 4. Fetch Non-Currency Data
logger.info("Fetching Non-Currency Data...") logger.info("Fetching Non-Currency Data...")
if progress_callback: progress_callback("正在获取非货币指标...", 50) if progress_callback: progress_callback(f"正在获取非货币指标 ({period})...", 50)
try: try:
non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency", query_ticker=query_ticker) non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency", query_ticker=query_ticker, period=period)
self.save_data(non_curr_data) self.save_data(non_curr_data, table_name=table_name)
except Exception as e: except Exception as e:
logger.error(f"Error fetching non-currency series: {e}") logger.error(f"Error fetching non-currency series: {e}")
@ -527,7 +549,7 @@ if 'bquery' not in globals():
logger.info(f"Found {len(revenue_dates)} revenue reporting dates. Fetching aligned price data...") logger.info(f"Found {len(revenue_dates)} revenue reporting dates. Fetching aligned price data...")
try: try:
price_data = self._fetch_price_by_dates_remote(company_code, currency, revenue_dates, query_ticker=query_ticker) price_data = self._fetch_price_by_dates_remote(company_code, currency, revenue_dates, query_ticker=query_ticker)
self.save_data(price_data) self.save_data(price_data, table_name=table_name)
except Exception as e: except Exception as e:
logger.error(f"Error fetching aligned price data: {e}") logger.error(f"Error fetching aligned price data: {e}")
else: else:
@ -540,13 +562,13 @@ if 'bquery' not in globals():
try: try:
price_data = self._fetch_price_by_dates_remote(company_code, currency, fallback_dates, query_ticker=query_ticker) price_data = self._fetch_price_by_dates_remote(company_code, currency, fallback_dates, query_ticker=query_ticker)
self.save_data(price_data) self.save_data(price_data, table_name=table_name)
except Exception as e: except Exception as e:
logger.error(f"Error fetching fallback price data: {e}") logger.error(f"Error fetching fallback price data: {e}")
# 5. Cleanup # 5. Cleanup
if progress_callback: progress_callback("Finalizing data...", 90) if progress_callback: progress_callback("Finalizing data...", 90)
self.run_cleanup() self.run_cleanup(table_name=table_name)
logger.info(f"✅ Completed processing for {company_code}") logger.info(f"✅ Completed processing for {company_code}")
if progress_callback: progress_callback("Bloomberg data sync complete", 100) if progress_callback: progress_callback("Bloomberg data sync complete", 100)
@ -639,7 +661,7 @@ get_basic()
""" """
return self._execute_and_parse(code) return self._execute_and_parse(code)
def _fetch_series_remote(self, company_code, currency, config_dict, result_type, query_ticker=None): def _fetch_series_remote(self, company_code, currency, config_dict, result_type, query_ticker=None, period="YEARLY"):
"""Generates code to fetch series data using BDH""" """Generates code to fetch series data using BDH"""
target_ticker = query_ticker if query_ticker else company_code target_ticker = query_ticker if query_ticker else company_code
@ -648,6 +670,15 @@ get_basic()
# Calculate start date # Calculate start date
end_date = datetime.now() end_date = datetime.now()
if period == "QUARTERLY":
# Recent 12 quarters = 3 years = ~1095 days.
# Adding small buffer to 1100 days to be safe.
start_date = end_date - timedelta(days=1100)
elif period == "SEMI_ANNUALLY":
# Recent 12 periods = 6 years = ~2200 days.
start_date = end_date - timedelta(days=2200)
else:
start_date = end_date - timedelta(days=period_years*365) start_date = end_date - timedelta(days=period_years*365)
# Format dates for BDH (YYYYMMDD) # Format dates for BDH (YYYYMMDD)
@ -657,7 +688,7 @@ get_basic()
# BDH Options # BDH Options
bdh_options = { bdh_options = {
"periodicityAdjustment": "FISCAL", "periodicityAdjustment": "FISCAL",
"periodicitySelection": "YEARLY", "periodicitySelection": period, # YEARLY or QUARTERLY
"currency": currency, "currency": currency,
# "nonTradingDayFillOption": "NON_TRADING_WEEKDAYS", # "nonTradingDayFillOption": "NON_TRADING_WEEKDAYS",
# "nonTradingDayFillMethod": "PREVIOUS_VALUE" # "nonTradingDayFillMethod": "PREVIOUS_VALUE"

View File

@ -105,11 +105,23 @@ class BloombergFetcher(DataFetcher):
except Exception as e: except Exception as e:
print(f"Bloomberg fetch failed (ignoring, checking DB): {e}") print(f"Bloomberg fetch failed (ignoring, checking DB): {e}")
def sync_all_data(self, symbol: str, progress_callback=None, force_currency=None): def sync_all_data(self, symbol: str, progress_callback=None, force_currency=None, frequency="Annual"):
""" """
Sync all data for the company. Sync all data for the company.
Delegates to the universal client. Delegates to the universal client.
frequency: 'Annual' (default) or 'Quarter'.
""" """
period = "YEARLY"
if frequency == "Quarterly" or frequency == "Quarter":
period = "QUARTERLY"
elif frequency == "Semiannual" or frequency == "Semiannually":
period = "SEMI_ANNUALLY"
# Pass period to client
if 'period' in self.client.fetch_company.__code__.co_varnames:
self.client.fetch_company(self.market, symbol, progress_callback=progress_callback, force_currency=force_currency, period=period)
else:
# Fallback for old client code (should not happen if client updated first)
self.client.fetch_company(self.market, symbol, progress_callback=progress_callback, force_currency=force_currency) self.client.fetch_company(self.market, symbol, progress_callback=progress_callback, force_currency=force_currency)
def get_income_statement(self, symbol: str) -> pd.DataFrame: def get_income_statement(self, symbol: str) -> pd.DataFrame:
@ -143,3 +155,4 @@ class BloombergFetcher(DataFetcher):
def get_financial_ratios(self, symbol: str) -> pd.DataFrame: def get_financial_ratios(self, symbol: str) -> pd.DataFrame:
"""兼容性空方法""" """兼容性空方法"""
return pd.DataFrame() return pd.DataFrame()

View File

@ -48,6 +48,7 @@ class FetchDataRequest(BaseModel):
data_source: str data_source: str
force_refresh: bool = False force_refresh: bool = False
currency: Optional[str] = None currency: Optional[str] = None
frequency: Optional[str] = "Annual"
class FetchDataResponse(BaseModel): class FetchDataResponse(BaseModel):
update_id: int update_id: int

View File

@ -15,7 +15,8 @@ logger = logging.getLogger(__name__)
async def get_bloomberg_data( async def get_bloomberg_data(
company: Company, company: Company,
db: AsyncSession db: AsyncSession,
frequency: str = "Annual"
) -> List[Dict]: ) -> List[Dict]:
""" """
获取指定公司的 Bloomberg 财务数据 获取指定公司的 Bloomberg 财务数据
@ -23,11 +24,25 @@ async def get_bloomberg_data(
Args: Args:
company: 公司对象 company: 公司对象
db: 数据库会话 db: 数据库会话
frequency: 'Annual' or 'Quarterly'
Returns: Returns:
List[Dict]: 统一格式的财务数据列表 List[Dict]: 统一格式的财务数据列表
""" """
try: try:
# Determine table name
table_name = "stockcard"
if frequency == "Quarterly" or frequency == "Quarter":
table_name = "stockcard_quarter"
elif frequency == "Semiannual" or frequency == "Semiannually":
table_name = "stockcard_semiannual"
# Check if table exists (to avoid UndefinedTableError on first run)
check_table_sql = text("SELECT to_regclass(:table_name)")
table_exists = await db.execute(check_table_sql, {"table_name": f"public.{table_name}"})
if not table_exists.scalar():
return []
# 1. 查找对应的 Company_code # 1. 查找对应的 Company_code
# stockcard 中存储的是 "AAPL US Equity" 而 symbol 是 "AAPL" # stockcard 中存储的是 "AAPL US Equity" 而 symbol 是 "AAPL"
target_code = None target_code = None
@ -37,8 +52,9 @@ async def get_bloomberg_data(
possible_codes = [f"{company.symbol}{s}" for s in suffixes] possible_codes = [f"{company.symbol}{s}" for s in suffixes]
# 检查哪个存在 # 检查哪个存在
# Use dynamic table name in SQL (safe since we control table_name variable)
for code in possible_codes: for code in possible_codes:
check_sql = text("SELECT 1 FROM stockcard WHERE Company_code = :code LIMIT 1") check_sql = text(f"SELECT 1 FROM {table_name} WHERE Company_code = :code LIMIT 1")
exists = await db.execute(check_sql, {"code": code}) exists = await db.execute(check_sql, {"code": code})
if exists.scalar(): if exists.scalar():
target_code = code target_code = code
@ -46,22 +62,22 @@ async def get_bloomberg_data(
# 如果没找到,尝试模糊匹配 (作为兜底) # 如果没找到,尝试模糊匹配 (作为兜底)
if not target_code: if not target_code:
fuzzy_sql = text("SELECT Company_code FROM stockcard WHERE Company_code LIKE :symbol LIMIT 1") fuzzy_sql = text(f"SELECT Company_code FROM {table_name} WHERE Company_code LIKE :symbol LIMIT 1")
fuzzy_res = await db.execute(fuzzy_sql, {"symbol": f"%{company.symbol}%"}) fuzzy_res = await db.execute(fuzzy_sql, {"symbol": f"%{company.symbol}%"})
row = fuzzy_res.fetchone() row = fuzzy_res.fetchone()
if row: if row:
target_code = row[0] target_code = row[0]
if not target_code: if not target_code:
logger.warning(f"No Bloomberg data found for symbol: {company.symbol}") logger.warning(f"No Bloomberg data found for symbol: {company.symbol} in {table_name}")
return [] return []
# 2. 获取该公司的所有数据 # 2. 获取该公司的所有数据
# schema: indicator, value, value_date (作为报告期) # schema: indicator, value, value_date (作为报告期)
# Added update_date # Added update_date
query = text(""" query = text(f"""
SELECT indicator, value, value_date, currency, update_date SELECT indicator, value, value_date, currency, update_date
FROM stockcard FROM {table_name}
WHERE Company_code = :code WHERE Company_code = :code
""") """)
result = await db.execute(query, {"code": target_code}) result = await db.execute(query, {"code": target_code})
@ -115,5 +131,5 @@ async def get_bloomberg_data(
return full_list return full_list
except Exception as e: except Exception as e:
logger.error(f"Error fetching Bloomberg data from stockcard: {e}", exc_info=True) logger.error(f"Error fetching Bloomberg data from {table_name}: {e}", exc_info=True)
return [] return []

View File

@ -210,7 +210,8 @@ def fetch_financial_data_sync(
symbol: str, symbol: str,
data_source: str, data_source: str,
update_id: int, update_id: int,
currency: Optional[str] = None currency: Optional[str] = None,
frequency: Optional[str] = "Annual"
): ):
""" """
同步方式获取财务数据在后台任务中调用 同步方式获取财务数据在后台任务中调用
@ -219,7 +220,8 @@ def fetch_financial_data_sync(
""" """
try: try:
# 0. 初始化 # 0. 初始化
update_progress_sync(update_id, "正在初始化数据获取...", 0) display_freq = "季度" if frequency == "Quarterly" or frequency == "Quarter" else "年度"
update_progress_sync(update_id, f"正在初始化数据获取 ({display_freq})...", 0)
# 格式化股票代码 - CN市场需要添加.SH或.SZ后缀 # 格式化股票代码 - CN市场需要添加.SH或.SZ后缀
formatted_symbol = symbol formatted_symbol = symbol
@ -258,13 +260,17 @@ def fetch_financial_data_sync(
# 检查 sync_all_data 是否接受 progress_callback 参数 # 检查 sync_all_data 是否接受 progress_callback 参数
import inspect import inspect
sig = inspect.signature(fetcher.sync_all_data) sig = inspect.signature(fetcher.sync_all_data)
kwargs = {}
if 'progress_callback' in sig.parameters: if 'progress_callback' in sig.parameters:
kwargs['progress_callback'] = progress_callback
if 'force_currency' in sig.parameters: if 'force_currency' in sig.parameters:
fetcher.sync_all_data(formatted_symbol, progress_callback=progress_callback, force_currency=currency) kwargs['force_currency'] = currency
else: if 'frequency' in sig.parameters:
fetcher.sync_all_data(formatted_symbol, progress_callback=progress_callback) kwargs['frequency'] = frequency
else:
fetcher.sync_all_data(formatted_symbol) fetcher.sync_all_data(formatted_symbol, **kwargs)
else: else:
# 兼容旧代码,虽然有了 sync_all_data 后这部分应该不需要了 # 兼容旧代码,虽然有了 sync_all_data 后这部分应该不需要了
fetcher.get_income_statement(formatted_symbol) fetcher.get_income_statement(formatted_symbol)
@ -309,7 +315,8 @@ def fetch_financial_data_sync(
async def get_financial_data_from_db( async def get_financial_data_from_db(
company_id: int, company_id: int,
data_source: str, data_source: str,
db: AsyncSession db: AsyncSession,
frequency: str = "Annual"
) -> Dict: ) -> Dict:
""" """
从数据库读取财务数据 从数据库读取财务数据
@ -318,6 +325,7 @@ async def get_financial_data_from_db(
company_id: 公司ID company_id: 公司ID
data_source: 数据源 (iFinD, Bloomberg, Tushare) data_source: 数据源 (iFinD, Bloomberg, Tushare)
db: 数据库会话 db: 数据库会话
frequency: 'Annual' or 'Quarterly'
Returns: Returns:
包含所有财务数据的字典 包含所有财务数据的字典
@ -447,7 +455,7 @@ async def get_financial_data_from_db(
elif data_source == 'Bloomberg': elif data_source == 'Bloomberg':
try: try:
# 使用独立的 Bloomberg 服务读取数据 # 使用独立的 Bloomberg 服务读取数据
unified_data = await get_bloomberg_data(company, db) unified_data = await get_bloomberg_data(company, db, frequency=frequency)
response_data["unified_data"] = unified_data response_data["unified_data"] = unified_data
response_data["income_statement"] = [] response_data["income_statement"] = []

View File

@ -0,0 +1,51 @@
import os
import psycopg2
from dotenv import load_dotenv
load_dotenv()
def create_table():
db_host = os.getenv("DB_HOST", "192.168.3.195")
db_user = os.getenv("DB_USER", "value")
db_pass = os.getenv("DB_PASSWORD", "Value609!")
db_name = os.getenv("DB_NAME", "fa3")
db_port = os.getenv("DB_PORT", "5432")
try:
conn = psycopg2.connect(
host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
)
cur = conn.cursor()
print("Creating stockcard_quarter table...")
create_sql = """
CREATE TABLE IF NOT EXISTS stockcard_quarter (
id SERIAL PRIMARY KEY,
company_code TEXT,
indicator TEXT,
value TEXT,
currency TEXT,
value_date DATE,
update_date TIMESTAMP WITHOUT TIME ZONE,
source TEXT
);
"""
cur.execute(create_sql)
# Create indexes
print("Creating indexes...")
cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_quarter_company_code ON stockcard_quarter(company_code);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_quarter_value_date ON stockcard_quarter(value_date);")
conn.commit()
print("Done!")
cur.close()
conn.close()
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
create_table()

View File

@ -0,0 +1,51 @@
import os
import psycopg2
from dotenv import load_dotenv
load_dotenv()
def create_table():
db_host = os.getenv("DB_HOST", "192.168.3.195")
db_user = os.getenv("DB_USER", "value")
db_pass = os.getenv("DB_PASSWORD", "Value609!")
db_name = os.getenv("DB_NAME", "fa3")
db_port = os.getenv("DB_PORT", "5432")
try:
conn = psycopg2.connect(
host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
)
cur = conn.cursor()
print("Creating stockcard_semiannual table...")
create_sql = """
CREATE TABLE IF NOT EXISTS stockcard_semiannual (
id SERIAL PRIMARY KEY,
company_code TEXT,
indicator TEXT,
value TEXT,
currency TEXT,
value_date DATE,
update_date TIMESTAMP WITHOUT TIME ZONE,
source TEXT
);
"""
cur.execute(create_sql)
# Create indexes
print("Creating indexes...")
cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_semiannual_company_code ON stockcard_semiannual(company_code);")
cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_semiannual_value_date ON stockcard_semiannual(value_date);")
conn.commit()
print("Done!")
cur.close()
conn.close()
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
create_table()

View File

@ -325,6 +325,7 @@ function CompanyAnalysisView({
<Badge className="text-xs">{company.market}</Badge> <Badge className="text-xs">{company.market}</Badge>
</div> </div>
<div className="flex items-center gap-2 mr-4"> <div className="flex items-center gap-2 mr-4">
{dataSource !== 'Bloomberg' && (
<Button <Button
variant="outline" variant="outline"
size="sm" size="sm"
@ -335,6 +336,7 @@ function CompanyAnalysisView({
{(loading || fetching) ? <Loader2 className="h-4 w-4 animate-spin" /> : <RefreshCw className="h-4 w-4" />} {(loading || fetching) ? <Loader2 className="h-4 w-4 animate-spin" /> : <RefreshCw className="h-4 w-4" />}
{(loading || fetching) ? "更新中..." : "更新数据"} {(loading || fetching) ? "更新中..." : "更新数据"}
</Button> </Button>
)}
</div> </div>
<div className="flex items-center space-x-1 mr-6 border rounded-md p-1 bg-muted/20"> <div className="flex items-center space-x-1 mr-6 border rounded-md p-1 bg-muted/20">
@ -363,7 +365,7 @@ function CompanyAnalysisView({
</div> </div>
)} )}
{!fetching && !loading && <div className="text-xs text-muted-foreground flex items-center gap-1"> {!fetching && !loading && dataSource !== 'Bloomberg' && <div className="text-xs text-muted-foreground flex items-center gap-1">
<CheckCircle2 className="w-3 h-3 text-green-500" /> : {status?.last_update?.date ? formatTimestamp(status.last_update.date) : "无记录"} <CheckCircle2 className="w-3 h-3 text-green-500" /> : {status?.last_update?.date ? formatTimestamp(status.last_update.date) : "无记录"}
</div>} </div>}
</HeaderPortal> </HeaderPortal>
@ -384,6 +386,9 @@ function CompanyAnalysisView({
selectedCurrency={currency} selectedCurrency={currency}
userMarket={company.market} userMarket={company.market}
companyId={status.company_id} companyId={status.company_id}
companySymbol={company.symbol}
companyMarket={company.market}
companyName={company.company_name}
lastUpdate={status.last_update?.date} lastUpdate={status.last_update?.date}
/> />
) )

View File

@ -1,26 +1,43 @@
"use client" import { useEffect, useState, useMemo } from "react"
import { useEffect, useState } from "react"
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card" import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"
import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table" import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table"
import { Loader2, DollarSign, RefreshCw, ChevronRight, ChevronDown } from "lucide-react" import { Loader2, DollarSign, RefreshCw, ChevronRight, ChevronDown, CloudDownload } from "lucide-react"
import { Button } from "@/components/ui/button" import { Button } from "@/components/ui/button"
import { Tabs, TabsList, TabsTrigger } from "@/components/ui/tabs"
import { getFinancialData } from "@/lib/api" import { getFinancialData } from "@/lib/api"
import { formatNumber, formatLargeNumber, formatDate } from "@/lib/formatters" import { formatNumber, formatLargeNumber, formatDate } from "@/lib/formatters"
import type { FinancialDataResponse } from "@/lib/types" import type { FinancialDataResponse, SearchResult } from "@/lib/types"
import { useFinancialData } from "@/hooks/use-financial-data"
interface BloombergViewProps { interface BloombergViewProps {
companyId: number companyId: number
companySymbol?: string // Need symbol/market for hook
companyMarket?: string
companyName?: string
onBack?: () => void onBack?: () => void
selectedCurrency?: string selectedCurrency?: string
userMarket?: string userMarket?: string // This is likely same as companyMarket but passed from parent context
lastUpdate?: string lastUpdate?: string
} }
export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", userMarket, lastUpdate }: BloombergViewProps) { export function BloombergView({ companyId, companySymbol, companyMarket, companyName, onBack, selectedCurrency = "Auto", userMarket, lastUpdate }: BloombergViewProps) {
const [data, setData] = useState<FinancialDataResponse | null>(null) const [data, setData] = useState<FinancialDataResponse | null>(null)
const [loading, setLoading] = useState(true) const [loading, setLoading] = useState(true)
const [error, setError] = useState("") const [error, setError] = useState("")
const [frequency, setFrequency] = useState<'Annual' | 'Quarterly' | 'Semiannual'>('Annual')
// Construct mock Company SearchResult for hook
// Note: In a real app we might pass the full object props
const companyObj: SearchResult | null = useMemo(() => {
return companySymbol && companyMarket ? {
symbol: companySymbol,
market: companyMarket,
company_name: companyName || companySymbol
} : null
}, [companySymbol, companyMarket, companyName])
// Use Hook
const { fetching, fetchData, updateStatus } = useFinancialData(companyObj, "Bloomberg")
const loadData = async () => { const loadData = async () => {
if (!companyId) return if (!companyId) return
@ -28,8 +45,8 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us
setLoading(true) setLoading(true)
setError("") setError("")
try { try {
console.log("Fetching Bloomberg data for company:", companyId) console.log(`Fetching Bloomberg data for company: ${companyId} (${frequency})`)
const result = await getFinancialData(companyId, "Bloomberg") const result = await getFinancialData(companyId, "Bloomberg", frequency)
console.log("Bloomberg data result:", result) console.log("Bloomberg data result:", result)
setData(result) setData(result)
} catch (err: any) { } catch (err: any) {
@ -42,18 +59,32 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us
useEffect(() => { useEffect(() => {
loadData() loadData()
}, [companyId, lastUpdate]) }, [companyId, lastUpdate, frequency])
if (loading) { // Re-load data when fetch completes
useEffect(() => {
if (updateStatus?.status === 'completed') {
loadData()
}
}, [updateStatus?.status])
const handleSync = () => {
if (companyObj) {
// Force refresh, with auto currency (undefined), and selected frequency
fetchData(true, undefined, frequency)
}
}
if (loading && !data) {
return ( return (
<div className="flex flex-col items-center justify-center min-h-[400px] space-y-4"> <div className="flex flex-col items-center justify-center min-h-[400px] space-y-4">
<Loader2 className="h-8 w-8 animate-spin text-primary" /> <Loader2 className="h-8 w-8 animate-spin text-primary" />
<p className="text-muted-foreground"> Bloomberg ...</p> <p className="text-muted-foreground"> Bloomberg ({frequency === 'Annual' ? '年度' : frequency === 'Semiannual' ? '半年度' : '季度'})...</p>
</div> </div>
) )
} }
if (error) { if (error && !data) {
return ( return (
<Card className="border-destructive"> <Card className="border-destructive">
<CardContent className="pt-6"> <CardContent className="pt-6">
@ -66,10 +97,10 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us
) )
} }
if (!data) return null if (!data && !fetching) return null
// 如果后端提供了统一数据字段,直接使用 // 如果后端提供了统一数据字段,直接使用
const mergedData = data.unified_data || [] const mergedData = data?.unified_data || []
return ( return (
<div className="space-y-6"> <div className="space-y-6">
@ -84,16 +115,42 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us
<BasicInfoHeader data={mergedData} selectedCurrency={selectedCurrency} userMarket={userMarket} /> <BasicInfoHeader data={mergedData} selectedCurrency={selectedCurrency} userMarket={userMarket} />
</div> </div>
<div className="flex gap-2 ml-auto"> <div className="flex gap-2 ml-auto items-center">
<Button variant="outline" size="sm" onClick={loadData}> {fetching && (
<RefreshCw className="w-4 h-4 mr-2" /> <div className="flex items-center gap-2 mr-2 text-sm text-muted-foreground">
<Loader2 className="h-3 w-3 animate-spin" />
<span>
{updateStatus?.progress_message || "正在同步..."} ({updateStatus?.progress_percentage || 0}%)
</span>
</div>
)}
<Tabs value={frequency} onValueChange={(v) => setFrequency(v as 'Annual' | 'Quarterly' | 'Semiannual')} className="w-[270px]">
<TabsList className="grid w-full grid-cols-3">
<TabsTrigger value="Annual"></TabsTrigger>
<TabsTrigger value="Semiannual"></TabsTrigger>
<TabsTrigger value="Quarterly"></TabsTrigger>
</TabsList>
</Tabs>
<Button variant="outline" size="sm" onClick={handleSync} disabled={fetching}>
{fetching ? <Loader2 className="w-4 h-4 mr-2 animate-spin" /> : <CloudDownload className="w-4 h-4 mr-2" />}
</Button>
<Button variant="ghost" size="icon" onClick={loadData} title="刷新本地视图">
<RefreshCw className="w-4 h-4" />
</Button> </Button>
</div> </div>
</div> </div>
</div> </div>
<RawDataTable title="财务数据总表" data={mergedData} selectedCurrency={selectedCurrency} userMarket={userMarket} /> <RawDataTable
title={frequency === 'Annual' ? "财务数据总表 (年度)" : frequency === 'Semiannual' ? "财务数据总表 (半年度)" : "财务数据总表 (季度)"}
data={mergedData}
selectedCurrency={selectedCurrency}
userMarket={userMarket}
/>
</div> </div>
) )
} }

View File

@ -58,7 +58,7 @@ export function useFinancialData(company: SearchResult | null, dataSource: strin
}, [updateId, fetching, checkStatus]) }, [updateId, fetching, checkStatus])
// Trigger data fetch // Trigger data fetch
const fetchData = async (forceRefresh = false, currency?: string) => { const fetchData = async (forceRefresh = false, currency?: string, frequency: 'Annual' | 'Quarterly' | 'Semiannual' = 'Annual') => {
if (!company) return if (!company) return
setFetching(true) setFetching(true)
setError("") setError("")
@ -69,7 +69,8 @@ export function useFinancialData(company: SearchResult | null, dataSource: strin
company_name: company.company_name, company_name: company.company_name,
data_source: dataSource, data_source: dataSource,
force_refresh: forceRefresh, force_refresh: forceRefresh,
currency: currency currency: currency,
frequency: frequency
}) })
setUpdateId(response.update_id) setUpdateId(response.update_id)
} catch (err: any) { } catch (err: any) {

View File

@ -108,10 +108,11 @@ export async function getFetchStatus(updateId: number): Promise<DataUpdateRespon
*/ */
export async function getFinancialData( export async function getFinancialData(
companyId: number, companyId: number,
dataSource: string dataSource: string,
frequency: 'Annual' | 'Quarterly' | 'Semiannual' = 'Annual'
): Promise<FinancialDataResponse> { ): Promise<FinancialDataResponse> {
const res = await fetch( const res = await fetch(
`${API_BASE}/data/financial?company_id=${companyId}&data_source=${dataSource}` `${API_BASE}/data/financial?company_id=${companyId}&data_source=${dataSource}&frequency=${frequency}`
) )
if (!res.ok) throw new Error("Failed to get financial data") if (!res.ok) throw new Error("Failed to get financial data")
return res.json() return res.json()

View File

@ -59,6 +59,7 @@ export interface FetchDataRequest {
data_source: string data_source: string
force_refresh?: boolean force_refresh?: boolean
currency?: string currency?: string
frequency?: 'Annual' | 'Quarterly' | 'Semiannual'
} }
export interface FetchDataResponse { export interface FetchDataResponse {