From 5bce35d5048dca6eb6e2aad05d34e1a9ea14d9b9 Mon Sep 17 00:00:00 2001 From: xucheng Date: Thu, 22 Jan 2026 20:37:23 +0800 Subject: [PATCH] feat: add support for quarterly and semiannual financial data frequencies --- backend/app/api/data_routes.py | 11 +- backend/app/clients/bloomberg_client.py | 145 +++++++++++-------- backend/app/fetchers/bloomberg_fetcher.py | 17 ++- backend/app/schemas.py | 1 + backend/app/services/bloomberg_service.py | 30 +++- backend/app/services/data_fetcher_service.py | 28 ++-- backend/scripts/create_quarterly_table.py | 51 +++++++ backend/scripts/create_semiannual_table.py | 51 +++++++ frontend/src/app/page.tsx | 27 ++-- frontend/src/components/bloomberg-view.tsx | 97 ++++++++++--- frontend/src/hooks/use-financial-data.ts | 5 +- frontend/src/lib/api.ts | 5 +- frontend/src/lib/types.ts | 1 + 13 files changed, 355 insertions(+), 114 deletions(-) create mode 100644 backend/scripts/create_quarterly_table.py create mode 100644 backend/scripts/create_semiannual_table.py diff --git a/backend/app/api/data_routes.py b/backend/app/api/data_routes.py index 1c55693..7615881 100644 --- a/backend/app/api/data_routes.py +++ b/backend/app/api/data_routes.py @@ -81,7 +81,8 @@ async def fetch_data( symbol=request.symbol, data_source=request.data_source, update_id=data_update.id, - currency=request.currency + currency=request.currency, + frequency=request.frequency ) return FetchDataResponse( @@ -98,7 +99,8 @@ def fetch_data_background( symbol: str, data_source: str, update_id: int, - currency: str = None + currency: str = None, + frequency: str = "Annual" ): """后台数据获取任务 - 完全同步执行,避免event loop冲突""" import sys @@ -120,7 +122,8 @@ def fetch_data_background( symbol=symbol, data_source=data_source, update_id=update_id, - currency=currency + currency=currency, + frequency=frequency ) # 更新数据更新记录 - 使用psycopg2同步连接 @@ -257,6 +260,7 @@ async def get_fetch_status( async def get_financial_data( company_id: int, data_source: str, + frequency: str = "Annual", db: AsyncSession = Depends(get_db) ): """ @@ -268,6 +272,7 @@ async def get_financial_data( data = await data_fetcher_service.get_financial_data_from_db( company_id=company_id, data_source=data_source, + frequency=frequency, db=db ) diff --git a/backend/app/clients/bloomberg_client.py b/backend/app/clients/bloomberg_client.py index 0f944f8..a45a65a 100644 --- a/backend/app/clients/bloomberg_client.py +++ b/backend/app/clients/bloomberg_client.py @@ -277,12 +277,17 @@ class BloombergClient: logger.error(f"DB Connection Error: {e}") return None - def _ensure_schema(self, conn): + def _ensure_schema(self, conn, table_name="stockcard"): """Ensure the necessary tables exist.""" with conn.cursor() as cur: # Create table if not exists (Inferred schema from legacy use) - cur.execute(""" - CREATE TABLE IF NOT EXISTS stockcard ( + # Use format string for table name as it cannot be parameterized directly in DDL + # Validate table_name to prevent SQL injection (simple whitelist or strict formatting) + if table_name not in ["stockcard", "stockcard_quarter", "stockcard_semiannual"]: + raise ValueError(f"Invalid table name: {table_name}") + + cur.execute(f""" + CREATE TABLE IF NOT EXISTS {table_name} ( id SERIAL PRIMARY KEY, Company_code TEXT, update_date DATE, @@ -296,31 +301,29 @@ class BloombergClient: # Ensure column exists if table was already created try: - cur.execute("ALTER TABLE stockcard ADD COLUMN IF NOT EXISTS source TEXT;") + cur.execute(f"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS source TEXT;") except Exception: conn.rollback() # Should not happen with IF NOT EXISTS but good practice # Migrate update_date to TIMESTAMP try: - cur.execute("ALTER TABLE stockcard ALTER COLUMN update_date TYPE TIMESTAMP USING update_date::timestamp;") + cur.execute(f"ALTER TABLE {table_name} ALTER COLUMN update_date TYPE TIMESTAMP USING update_date::timestamp;") except Exception as e: # Likely already converted or failed, log but don't crash - logger.info(f"Schema migration note (update_date): {e}") + logger.info(f"Schema migration note (update_date) for {table_name}: {e}") conn.rollback() conn.commit() - def save_data(self, data_list): + def save_data(self, data_list, table_name="stockcard"): """Insert a list of data dictionaries into the database.""" if not data_list: return - - conn = self._get_db_connection() if not conn: return - self._ensure_schema(conn) + self._ensure_schema(conn, table_name) try: with conn.cursor() as cur: @@ -337,28 +340,31 @@ class BloombergClient: 'bloomberg' )) - query = """ - INSERT INTO stockcard (Company_code, update_date, currency, indicator, value, value_date, source) + query = f""" + INSERT INTO {table_name} (Company_code, update_date, currency, indicator, value, value_date, source) VALUES (%s, %s, %s, %s, %s, %s, %s) """ cur.executemany(query, args_list) conn.commit() - logger.info(f"✅ Saved {len(data_list)} records to database.") + logger.info(f"✅ Saved {len(data_list)} records to database table '{table_name}'.") except Exception as e: - logger.error(f"Error saving to stockcard: {e}") + logger.error(f"Error saving to {table_name}: {e}") conn.rollback() finally: conn.close() - def run_cleanup(self): + def run_cleanup(self, table_name="stockcard"): """Run deduplication and view refresh logic.""" conn = self._get_db_connection() if not conn: return + if table_name not in ["stockcard", "stockcard_quarter", "stockcard_semiannual"]: + return + try: with conn.cursor() as cur: # 1. Deduplication - cur.execute(''' + cur.execute(f''' WITH DuplicateRows AS ( SELECT id, @@ -367,9 +373,9 @@ class BloombergClient: ORDER BY update_date DESC, id DESC ) as rn FROM - stockcard + {table_name} ) - DELETE FROM stockcard + DELETE FROM {table_name} WHERE id IN ( SELECT id FROM DuplicateRows @@ -377,41 +383,46 @@ class BloombergClient: ); ''') - # 2. Materialized View Refresh - cur.execute(''' - CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS - SELECT - s.Company_code, - (SELECT cn_sub.value - FROM public.stockcard AS cn_sub - WHERE cn_sub.Company_code = s.Company_code - AND cn_sub.indicator = 'company_name' - ORDER BY cn_sub.value ASC - LIMIT 1) AS company_name - FROM - (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s - ORDER BY - s.Company_code; - ''') - # Try refresh - try: - cur.execute("REFRESH MATERIALIZED VIEW public.unique_company_codes;") - except: - pass + # 2. Materialized View Refresh (Only for main table for now, or unified?) + # For now, unique_company_codes likely relies on the main table for list of companies. + # We can skip updating the view if it's just meant for company discovery, + # assuming stockcard (annual) covers all companies. + if table_name == "stockcard": + cur.execute(''' + CREATE MATERIALIZED VIEW IF NOT EXISTS public.unique_company_codes AS + SELECT + s.Company_code, + (SELECT cn_sub.value + FROM public.stockcard AS cn_sub + WHERE cn_sub.Company_code = s.Company_code + AND cn_sub.indicator = 'company_name' + ORDER BY cn_sub.value ASC + LIMIT 1) AS company_name + FROM + (SELECT DISTINCT Company_code FROM public.stockcard WHERE Company_code IS NOT NULL) s + ORDER BY + s.Company_code; + ''') + # Try refresh + try: + cur.execute("REFRESH MATERIALIZED VIEW public.unique_company_codes;") + except: + pass conn.commit() - logger.info("✅ Cleanup and View Refresh completed.") + logger.info(f"✅ Cleanup completed for {table_name}.") except Exception as e: - logger.error(f"❌ Cleanup failed: {e}") + logger.error(f"❌ Cleanup failed for {table_name}: {e}") conn.rollback() finally: conn.close() # --- Core Fetching Logic --- - def fetch_company(self, market, symbol, progress_callback=None, force_currency=None): + def fetch_company(self, market, symbol, progress_callback=None, force_currency=None, period="YEARLY"): """ Main entry point to fetch data for a single company. + period: 'YEARLY' or 'QUARTERLY' """ # Determine Bloomberg Ticker format # If symbol already has Equity, use it. Else append. @@ -440,8 +451,15 @@ class BloombergClient: today_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - logger.info(f"🚀 Starting fetch for: {company_code}") - if progress_callback: progress_callback("Starting Bloomberg session...", 0) + # Determine Table Name + table_name = "stockcard" + if period == "QUARTERLY": + table_name = "stockcard_quarter" + elif period == "SEMI_ANNUALLY": + table_name = "stockcard_semiannual" + + logger.info(f"🚀 Starting fetch for: {company_code} (Period: {period}, Table: {table_name})") + if progress_callback: progress_callback(f"Starting Bloomberg session ({period})...", 0) # 0. Prep Remote Environment init_code = """ @@ -482,28 +500,32 @@ if 'bquery' not in globals(): logger.info("Fetching Basic Data...") if progress_callback: progress_callback("Fetching Company Basic Info...", 10) try: + # Basic data is always same regardless of period (it's metadata) + # We save it to the target table to ensure FK/Consistency if needed, + # or just save to main table? + # For simplicity and self-containment, save to target table too. basic_data = self._fetch_basic_remote(company_code, currency, query_ticker=query_ticker) logger.info(f"DEBUG: basic_data before save: {basic_data}") - self.save_data(basic_data) + self.save_data(basic_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching basic data: {e}") # 3. Fetch Currency Data logger.info("Fetching Currency Data...") - if progress_callback: progress_callback(f"正在获取货币指标 ({currency})...", 30) + if progress_callback: progress_callback(f"正在获取货币指标 ({period})...", 30) curr_data = [] try: - curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency", query_ticker=query_ticker) - self.save_data(curr_data) + curr_data = self._fetch_series_remote(company_code, currency, CURRENCY_CONFIG, "currency", query_ticker=query_ticker, period=period) + self.save_data(curr_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching currency series: {e}") # 4. Fetch Non-Currency Data logger.info("Fetching Non-Currency Data...") - if progress_callback: progress_callback("正在获取非货币指标...", 50) + if progress_callback: progress_callback(f"正在获取非货币指标 ({period})...", 50) try: - non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency", query_ticker=query_ticker) - self.save_data(non_curr_data) + non_curr_data = self._fetch_series_remote(company_code, currency, NON_CURRENCY_CONFIG, "non_currency", query_ticker=query_ticker, period=period) + self.save_data(non_curr_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching non-currency series: {e}") @@ -527,7 +549,7 @@ if 'bquery' not in globals(): logger.info(f"Found {len(revenue_dates)} revenue reporting dates. Fetching aligned price data...") try: price_data = self._fetch_price_by_dates_remote(company_code, currency, revenue_dates, query_ticker=query_ticker) - self.save_data(price_data) + self.save_data(price_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching aligned price data: {e}") else: @@ -540,13 +562,13 @@ if 'bquery' not in globals(): try: price_data = self._fetch_price_by_dates_remote(company_code, currency, fallback_dates, query_ticker=query_ticker) - self.save_data(price_data) + self.save_data(price_data, table_name=table_name) except Exception as e: logger.error(f"Error fetching fallback price data: {e}") # 5. Cleanup if progress_callback: progress_callback("Finalizing data...", 90) - self.run_cleanup() + self.run_cleanup(table_name=table_name) logger.info(f"✅ Completed processing for {company_code}") if progress_callback: progress_callback("Bloomberg data sync complete", 100) @@ -639,7 +661,7 @@ get_basic() """ return self._execute_and_parse(code) - def _fetch_series_remote(self, company_code, currency, config_dict, result_type, query_ticker=None): + def _fetch_series_remote(self, company_code, currency, config_dict, result_type, query_ticker=None, period="YEARLY"): """Generates code to fetch series data using BDH""" target_ticker = query_ticker if query_ticker else company_code @@ -648,7 +670,16 @@ get_basic() # Calculate start date end_date = datetime.now() - start_date = end_date - timedelta(days=period_years*365) + + if period == "QUARTERLY": + # Recent 12 quarters = 3 years = ~1095 days. + # Adding small buffer to 1100 days to be safe. + start_date = end_date - timedelta(days=1100) + elif period == "SEMI_ANNUALLY": + # Recent 12 periods = 6 years = ~2200 days. + start_date = end_date - timedelta(days=2200) + else: + start_date = end_date - timedelta(days=period_years*365) # Format dates for BDH (YYYYMMDD) start_date_str = start_date.strftime('%Y%m%d') @@ -657,7 +688,7 @@ get_basic() # BDH Options bdh_options = { "periodicityAdjustment": "FISCAL", - "periodicitySelection": "YEARLY", + "periodicitySelection": period, # YEARLY or QUARTERLY "currency": currency, # "nonTradingDayFillOption": "NON_TRADING_WEEKDAYS", # "nonTradingDayFillMethod": "PREVIOUS_VALUE" diff --git a/backend/app/fetchers/bloomberg_fetcher.py b/backend/app/fetchers/bloomberg_fetcher.py index a67232b..6138523 100644 --- a/backend/app/fetchers/bloomberg_fetcher.py +++ b/backend/app/fetchers/bloomberg_fetcher.py @@ -105,12 +105,24 @@ class BloombergFetcher(DataFetcher): except Exception as e: print(f"Bloomberg fetch failed (ignoring, checking DB): {e}") - def sync_all_data(self, symbol: str, progress_callback=None, force_currency=None): + def sync_all_data(self, symbol: str, progress_callback=None, force_currency=None, frequency="Annual"): """ Sync all data for the company. Delegates to the universal client. + frequency: 'Annual' (default) or 'Quarter'. """ - self.client.fetch_company(self.market, symbol, progress_callback=progress_callback, force_currency=force_currency) + period = "YEARLY" + if frequency == "Quarterly" or frequency == "Quarter": + period = "QUARTERLY" + elif frequency == "Semiannual" or frequency == "Semiannually": + period = "SEMI_ANNUALLY" + + # Pass period to client + if 'period' in self.client.fetch_company.__code__.co_varnames: + self.client.fetch_company(self.market, symbol, progress_callback=progress_callback, force_currency=force_currency, period=period) + else: + # Fallback for old client code (should not happen if client updated first) + self.client.fetch_company(self.market, symbol, progress_callback=progress_callback, force_currency=force_currency) def get_income_statement(self, symbol: str) -> pd.DataFrame: """兼容性空方法""" @@ -143,3 +155,4 @@ class BloombergFetcher(DataFetcher): def get_financial_ratios(self, symbol: str) -> pd.DataFrame: """兼容性空方法""" return pd.DataFrame() + diff --git a/backend/app/schemas.py b/backend/app/schemas.py index bd2d5d7..adba651 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -48,6 +48,7 @@ class FetchDataRequest(BaseModel): data_source: str force_refresh: bool = False currency: Optional[str] = None + frequency: Optional[str] = "Annual" class FetchDataResponse(BaseModel): update_id: int diff --git a/backend/app/services/bloomberg_service.py b/backend/app/services/bloomberg_service.py index 5db75c7..a5b6f0c 100644 --- a/backend/app/services/bloomberg_service.py +++ b/backend/app/services/bloomberg_service.py @@ -15,7 +15,8 @@ logger = logging.getLogger(__name__) async def get_bloomberg_data( company: Company, - db: AsyncSession + db: AsyncSession, + frequency: str = "Annual" ) -> List[Dict]: """ 获取指定公司的 Bloomberg 财务数据 @@ -23,11 +24,25 @@ async def get_bloomberg_data( Args: company: 公司对象 db: 数据库会话 + frequency: 'Annual' or 'Quarterly' Returns: List[Dict]: 统一格式的财务数据列表 """ try: + # Determine table name + table_name = "stockcard" + if frequency == "Quarterly" or frequency == "Quarter": + table_name = "stockcard_quarter" + elif frequency == "Semiannual" or frequency == "Semiannually": + table_name = "stockcard_semiannual" + + # Check if table exists (to avoid UndefinedTableError on first run) + check_table_sql = text("SELECT to_regclass(:table_name)") + table_exists = await db.execute(check_table_sql, {"table_name": f"public.{table_name}"}) + if not table_exists.scalar(): + return [] + # 1. 查找对应的 Company_code # stockcard 中存储的是 "AAPL US Equity" 而 symbol 是 "AAPL" target_code = None @@ -37,8 +52,9 @@ async def get_bloomberg_data( possible_codes = [f"{company.symbol}{s}" for s in suffixes] # 检查哪个存在 + # Use dynamic table name in SQL (safe since we control table_name variable) for code in possible_codes: - check_sql = text("SELECT 1 FROM stockcard WHERE Company_code = :code LIMIT 1") + check_sql = text(f"SELECT 1 FROM {table_name} WHERE Company_code = :code LIMIT 1") exists = await db.execute(check_sql, {"code": code}) if exists.scalar(): target_code = code @@ -46,22 +62,22 @@ async def get_bloomberg_data( # 如果没找到,尝试模糊匹配 (作为兜底) if not target_code: - fuzzy_sql = text("SELECT Company_code FROM stockcard WHERE Company_code LIKE :symbol LIMIT 1") + fuzzy_sql = text(f"SELECT Company_code FROM {table_name} WHERE Company_code LIKE :symbol LIMIT 1") fuzzy_res = await db.execute(fuzzy_sql, {"symbol": f"%{company.symbol}%"}) row = fuzzy_res.fetchone() if row: target_code = row[0] if not target_code: - logger.warning(f"No Bloomberg data found for symbol: {company.symbol}") + logger.warning(f"No Bloomberg data found for symbol: {company.symbol} in {table_name}") return [] # 2. 获取该公司的所有数据 # schema: indicator, value, value_date (作为报告期) # Added update_date - query = text(""" + query = text(f""" SELECT indicator, value, value_date, currency, update_date - FROM stockcard + FROM {table_name} WHERE Company_code = :code """) result = await db.execute(query, {"code": target_code}) @@ -115,5 +131,5 @@ async def get_bloomberg_data( return full_list except Exception as e: - logger.error(f"Error fetching Bloomberg data from stockcard: {e}", exc_info=True) + logger.error(f"Error fetching Bloomberg data from {table_name}: {e}", exc_info=True) return [] diff --git a/backend/app/services/data_fetcher_service.py b/backend/app/services/data_fetcher_service.py index a11ac3a..0776ae9 100644 --- a/backend/app/services/data_fetcher_service.py +++ b/backend/app/services/data_fetcher_service.py @@ -210,7 +210,8 @@ def fetch_financial_data_sync( symbol: str, data_source: str, update_id: int, - currency: Optional[str] = None + currency: Optional[str] = None, + frequency: Optional[str] = "Annual" ): """ 同步方式获取财务数据(在后台任务中调用) @@ -219,7 +220,8 @@ def fetch_financial_data_sync( """ try: # 0. 初始化 - update_progress_sync(update_id, "正在初始化数据获取...", 0) + display_freq = "季度" if frequency == "Quarterly" or frequency == "Quarter" else "年度" + update_progress_sync(update_id, f"正在初始化数据获取 ({display_freq})...", 0) # 格式化股票代码 - CN市场需要添加.SH或.SZ后缀 formatted_symbol = symbol @@ -258,13 +260,17 @@ def fetch_financial_data_sync( # 检查 sync_all_data 是否接受 progress_callback 参数 import inspect sig = inspect.signature(fetcher.sync_all_data) + + kwargs = {} if 'progress_callback' in sig.parameters: - if 'force_currency' in sig.parameters: - fetcher.sync_all_data(formatted_symbol, progress_callback=progress_callback, force_currency=currency) - else: - fetcher.sync_all_data(formatted_symbol, progress_callback=progress_callback) - else: - fetcher.sync_all_data(formatted_symbol) + kwargs['progress_callback'] = progress_callback + if 'force_currency' in sig.parameters: + kwargs['force_currency'] = currency + if 'frequency' in sig.parameters: + kwargs['frequency'] = frequency + + fetcher.sync_all_data(formatted_symbol, **kwargs) + else: # 兼容旧代码,虽然有了 sync_all_data 后这部分应该不需要了 fetcher.get_income_statement(formatted_symbol) @@ -309,7 +315,8 @@ def fetch_financial_data_sync( async def get_financial_data_from_db( company_id: int, data_source: str, - db: AsyncSession + db: AsyncSession, + frequency: str = "Annual" ) -> Dict: """ 从数据库读取财务数据 @@ -318,6 +325,7 @@ async def get_financial_data_from_db( company_id: 公司ID data_source: 数据源 (iFinD, Bloomberg, Tushare) db: 数据库会话 + frequency: 'Annual' or 'Quarterly' Returns: 包含所有财务数据的字典 @@ -447,7 +455,7 @@ async def get_financial_data_from_db( elif data_source == 'Bloomberg': try: # 使用独立的 Bloomberg 服务读取数据 - unified_data = await get_bloomberg_data(company, db) + unified_data = await get_bloomberg_data(company, db, frequency=frequency) response_data["unified_data"] = unified_data response_data["income_statement"] = [] diff --git a/backend/scripts/create_quarterly_table.py b/backend/scripts/create_quarterly_table.py new file mode 100644 index 0000000..6d67a62 --- /dev/null +++ b/backend/scripts/create_quarterly_table.py @@ -0,0 +1,51 @@ +import os +import psycopg2 +from dotenv import load_dotenv + +load_dotenv() + +def create_table(): + db_host = os.getenv("DB_HOST", "192.168.3.195") + db_user = os.getenv("DB_USER", "value") + db_pass = os.getenv("DB_PASSWORD", "Value609!") + db_name = os.getenv("DB_NAME", "fa3") + db_port = os.getenv("DB_PORT", "5432") + + try: + conn = psycopg2.connect( + host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port + ) + cur = conn.cursor() + + print("Creating stockcard_quarter table...") + + create_sql = """ + CREATE TABLE IF NOT EXISTS stockcard_quarter ( + id SERIAL PRIMARY KEY, + company_code TEXT, + indicator TEXT, + value TEXT, + currency TEXT, + value_date DATE, + update_date TIMESTAMP WITHOUT TIME ZONE, + source TEXT + ); + """ + cur.execute(create_sql) + + # Create indexes + print("Creating indexes...") + cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_quarter_company_code ON stockcard_quarter(company_code);") + cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_quarter_value_date ON stockcard_quarter(value_date);") + + conn.commit() + print("Done!") + + cur.close() + conn.close() + + except Exception as e: + print(f"Error: {e}") + +if __name__ == "__main__": + create_table() diff --git a/backend/scripts/create_semiannual_table.py b/backend/scripts/create_semiannual_table.py new file mode 100644 index 0000000..87dc8d3 --- /dev/null +++ b/backend/scripts/create_semiannual_table.py @@ -0,0 +1,51 @@ +import os +import psycopg2 +from dotenv import load_dotenv + +load_dotenv() + +def create_table(): + db_host = os.getenv("DB_HOST", "192.168.3.195") + db_user = os.getenv("DB_USER", "value") + db_pass = os.getenv("DB_PASSWORD", "Value609!") + db_name = os.getenv("DB_NAME", "fa3") + db_port = os.getenv("DB_PORT", "5432") + + try: + conn = psycopg2.connect( + host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port + ) + cur = conn.cursor() + + print("Creating stockcard_semiannual table...") + + create_sql = """ + CREATE TABLE IF NOT EXISTS stockcard_semiannual ( + id SERIAL PRIMARY KEY, + company_code TEXT, + indicator TEXT, + value TEXT, + currency TEXT, + value_date DATE, + update_date TIMESTAMP WITHOUT TIME ZONE, + source TEXT + ); + """ + cur.execute(create_sql) + + # Create indexes + print("Creating indexes...") + cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_semiannual_company_code ON stockcard_semiannual(company_code);") + cur.execute("CREATE INDEX IF NOT EXISTS idx_stockcard_semiannual_value_date ON stockcard_semiannual(value_date);") + + conn.commit() + print("Done!") + + cur.close() + conn.close() + + except Exception as e: + print(f"Error: {e}") + +if __name__ == "__main__": + create_table() diff --git a/frontend/src/app/page.tsx b/frontend/src/app/page.tsx index 1796853..192a557 100644 --- a/frontend/src/app/page.tsx +++ b/frontend/src/app/page.tsx @@ -325,16 +325,18 @@ function CompanyAnalysisView({ {company.market}
- + {dataSource !== 'Bloomberg' && ( + + )}
@@ -363,7 +365,7 @@ function CompanyAnalysisView({
)} - {!fetching && !loading &&
+ {!fetching && !loading && dataSource !== 'Bloomberg' &&
已更新: {status?.last_update?.date ? formatTimestamp(status.last_update.date) : "无记录"}
} @@ -384,6 +386,9 @@ function CompanyAnalysisView({ selectedCurrency={currency} userMarket={company.market} companyId={status.company_id} + companySymbol={company.symbol} + companyMarket={company.market} + companyName={company.company_name} lastUpdate={status.last_update?.date} /> ) diff --git a/frontend/src/components/bloomberg-view.tsx b/frontend/src/components/bloomberg-view.tsx index da17588..0c05109 100644 --- a/frontend/src/components/bloomberg-view.tsx +++ b/frontend/src/components/bloomberg-view.tsx @@ -1,26 +1,43 @@ -"use client" - -import { useEffect, useState } from "react" +import { useEffect, useState, useMemo } from "react" import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card" import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table" -import { Loader2, DollarSign, RefreshCw, ChevronRight, ChevronDown } from "lucide-react" +import { Loader2, DollarSign, RefreshCw, ChevronRight, ChevronDown, CloudDownload } from "lucide-react" import { Button } from "@/components/ui/button" +import { Tabs, TabsList, TabsTrigger } from "@/components/ui/tabs" import { getFinancialData } from "@/lib/api" import { formatNumber, formatLargeNumber, formatDate } from "@/lib/formatters" -import type { FinancialDataResponse } from "@/lib/types" +import type { FinancialDataResponse, SearchResult } from "@/lib/types" +import { useFinancialData } from "@/hooks/use-financial-data" interface BloombergViewProps { companyId: number + companySymbol?: string // Need symbol/market for hook + companyMarket?: string + companyName?: string onBack?: () => void selectedCurrency?: string - userMarket?: string + userMarket?: string // This is likely same as companyMarket but passed from parent context lastUpdate?: string } -export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", userMarket, lastUpdate }: BloombergViewProps) { +export function BloombergView({ companyId, companySymbol, companyMarket, companyName, onBack, selectedCurrency = "Auto", userMarket, lastUpdate }: BloombergViewProps) { const [data, setData] = useState(null) const [loading, setLoading] = useState(true) const [error, setError] = useState("") + const [frequency, setFrequency] = useState<'Annual' | 'Quarterly' | 'Semiannual'>('Annual') + + // Construct mock Company SearchResult for hook + // Note: In a real app we might pass the full object props + const companyObj: SearchResult | null = useMemo(() => { + return companySymbol && companyMarket ? { + symbol: companySymbol, + market: companyMarket, + company_name: companyName || companySymbol + } : null + }, [companySymbol, companyMarket, companyName]) + + // Use Hook + const { fetching, fetchData, updateStatus } = useFinancialData(companyObj, "Bloomberg") const loadData = async () => { if (!companyId) return @@ -28,8 +45,8 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us setLoading(true) setError("") try { - console.log("Fetching Bloomberg data for company:", companyId) - const result = await getFinancialData(companyId, "Bloomberg") + console.log(`Fetching Bloomberg data for company: ${companyId} (${frequency})`) + const result = await getFinancialData(companyId, "Bloomberg", frequency) console.log("Bloomberg data result:", result) setData(result) } catch (err: any) { @@ -42,18 +59,32 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us useEffect(() => { loadData() - }, [companyId, lastUpdate]) + }, [companyId, lastUpdate, frequency]) - if (loading) { + // Re-load data when fetch completes + useEffect(() => { + if (updateStatus?.status === 'completed') { + loadData() + } + }, [updateStatus?.status]) + + const handleSync = () => { + if (companyObj) { + // Force refresh, with auto currency (undefined), and selected frequency + fetchData(true, undefined, frequency) + } + } + + if (loading && !data) { return (
-

正在加载 Bloomberg 原始数据...

+

正在加载 Bloomberg 原始数据 ({frequency === 'Annual' ? '年度' : frequency === 'Semiannual' ? '半年度' : '季度'})...

) } - if (error) { + if (error && !data) { return ( @@ -66,10 +97,10 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us ) } - if (!data) return null + if (!data && !fetching) return null // 如果后端提供了统一数据字段,直接使用 - const mergedData = data.unified_data || [] + const mergedData = data?.unified_data || [] return (
@@ -84,16 +115,42 @@ export function BloombergView({ companyId, onBack, selectedCurrency = "Auto", us
-
- + +
- + ) } diff --git a/frontend/src/hooks/use-financial-data.ts b/frontend/src/hooks/use-financial-data.ts index 1f772c0..e5a2978 100644 --- a/frontend/src/hooks/use-financial-data.ts +++ b/frontend/src/hooks/use-financial-data.ts @@ -58,7 +58,7 @@ export function useFinancialData(company: SearchResult | null, dataSource: strin }, [updateId, fetching, checkStatus]) // Trigger data fetch - const fetchData = async (forceRefresh = false, currency?: string) => { + const fetchData = async (forceRefresh = false, currency?: string, frequency: 'Annual' | 'Quarterly' | 'Semiannual' = 'Annual') => { if (!company) return setFetching(true) setError("") @@ -69,7 +69,8 @@ export function useFinancialData(company: SearchResult | null, dataSource: strin company_name: company.company_name, data_source: dataSource, force_refresh: forceRefresh, - currency: currency + currency: currency, + frequency: frequency }) setUpdateId(response.update_id) } catch (err: any) { diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 9552964..c3b14a7 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -108,10 +108,11 @@ export async function getFetchStatus(updateId: number): Promise { const res = await fetch( - `${API_BASE}/data/financial?company_id=${companyId}&data_source=${dataSource}` + `${API_BASE}/data/financial?company_id=${companyId}&data_source=${dataSource}&frequency=${frequency}` ) if (!res.ok) throw new Error("Failed to get financial data") return res.json() diff --git a/frontend/src/lib/types.ts b/frontend/src/lib/types.ts index c37311e..8ac3bb2 100644 --- a/frontend/src/lib/types.ts +++ b/frontend/src/lib/types.ts @@ -59,6 +59,7 @@ export interface FetchDataRequest { data_source: string force_refresh?: boolean currency?: string + frequency?: 'Annual' | 'Quarterly' | 'Semiannual' } export interface FetchDataResponse {