import pandas as pd import time import os import psycopg2 import numpy as np from typing import List, Union from dotenv import load_dotenv from pathlib import Path import logging from .ifind_client import IFindClient # Get logger logger = logging.getLogger(__name__) # Explicitly load .env from project root ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent load_dotenv(ROOT_DIR / ".env") class IFindIntClient: """ Generic iFinD Client for International Markets (JP, VN, US) - Backend Version. Uses 'OAS' (Original Accounting Standards) standardized indicators. Implements generic wide-table persistence (Side-Channel) for background task compatibility. """ def __init__(self, api_key: str, market: str): self.cli = IFindClient(refresh_token=api_key) self.market = market self._basic_info_cache = {} self._active_years_cache = {} def _get_db_connection(self): """Create a database connection.""" db_host = os.getenv("DB_HOST", "192.168.3.195") db_user = os.getenv("DB_USER", "value") db_pass = os.getenv("DB_PASSWORD", "Value609!") db_name = os.getenv("DB_NAME", "fa3") db_port = os.getenv("DB_PORT", "5432") try: conn = psycopg2.connect( host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port ) conn.set_client_encoding('UTF8') return conn except Exception as e: logger.error(f"DB Connection Error: {e}") return None def _map_dtype_to_sql(self, dtype): """Map pandas dtype to PostgreSQL type.""" if pd.api.types.is_integer_dtype(dtype): return "BIGINT" elif pd.api.types.is_float_dtype(dtype): return "NUMERIC" elif pd.api.types.is_bool_dtype(dtype): return "BOOLEAN" elif pd.api.types.is_datetime64_any_dtype(dtype): return "TIMESTAMP" else: return "TEXT" def _save_df_to_wide_table(self, table_name: str, df: pd.DataFrame, pk_cols: List[str]): """ Save DataFrame to a specific wide table using direct psycopg2 connection. Creates table if not exists using DF columns. Performs incremental UPSERT. """ if df is None or df.empty: return conn = self._get_db_connection() if not conn: return try: # 1. Clean Data df_clean = df.replace({np.nan: None}) # Convert date columns for col in df_clean.columns: if 'date' in col.lower() and df_clean[col].dtype == 'object': try: sample = df_clean[col].dropna().astype(str).iloc[0] if not df_clean[col].dropna().empty else "" if len(sample) == 8 and sample.isdigit(): df_clean[col] = df_clean[col].astype(str).apply( lambda x: f"{x[:4]}-{x[4:6]}-{x[6:]}" if x and len(str(x))==8 else x ) except: pass columns = list(df_clean.columns) with conn.cursor() as cur: # 2. Check/Create Table cur.execute("SELECT to_regclass(%s)", (f"public.{table_name}",)) table_exists = cur.fetchone()[0] is not None if not table_exists: logger.info(f"🆕 [Database] Creating table {table_name}...") col_defs = ['"id" SERIAL PRIMARY KEY'] for col in columns: sql_type = self._map_dtype_to_sql(df_clean[col].dtype) col_defs.append(f'"{col}" {sql_type}') col_defs.append('"update_date" TIMESTAMP DEFAULT CURRENT_TIMESTAMP') pk_str = ", ".join([f'"{c}"' for c in pk_cols]) constraint_name = f"uq_{table_name}" create_sql = f""" CREATE TABLE IF NOT EXISTS {table_name} ( {', '.join(col_defs)}, CONSTRAINT {constraint_name} UNIQUE ({pk_str}) ); """ cur.execute(create_sql) conn.commit() # 3. Get existing columns definition cur.execute(""" SELECT column_name FROM information_schema.columns WHERE table_name = %s """, (table_name,)) db_cols = {row[0] for row in cur.fetchall()} # Check if 'id' column exists, if not add it if 'id' not in db_cols: try: cur.execute(f'ALTER TABLE "{table_name}" ADD COLUMN "id" SERIAL') conn.commit() db_cols.add('id') except Exception as e: logger.error(f"Error adding id column to {table_name}: {e}") conn.rollback() valid_cols = [c for c in columns if c in db_cols] if not valid_cols: return # 4. Upsert cols_str = ", ".join([f'"{c}"' for c in valid_cols]) vals_str = ", ".join(["%s"] * len(valid_cols)) updates = [f'"{c}" = EXCLUDED."{c}"' for c in valid_cols if c not in pk_cols] if 'update_date' in db_cols: updates.append('"update_date" = CURRENT_TIMESTAMP') update_clause = f"DO UPDATE SET {', '.join(updates)}" if updates else "DO NOTHING" pk_conflict = ", ".join([f'"{c}"' for c in pk_cols]) insert_sql = f""" INSERT INTO {table_name} ({cols_str}) VALUES ({vals_str}) ON CONFLICT ({pk_conflict}) {update_clause} """ data_tuples = [tuple(x) for x in df_clean[valid_cols].to_numpy()] cur.executemany(insert_sql, data_tuples) conn.commit() except Exception as e: logger.error(f"Error saving to {table_name}: {e}", exc_info=True) conn.rollback() finally: conn.close() def _parse_ifind_tables(self, res: dict) -> pd.DataFrame: if not res: return pd.DataFrame() if res.get("errorcode") != 0: logger.error(f"iFinD API Error: {res.get('errmsg')} (code: {res.get('errorcode')})") return pd.DataFrame() tables = res.get("tables", []) if not tables: return pd.DataFrame() table_info = tables[0] table_data = table_info.get("table", {}) times = table_info.get("time", []) if not table_data: return pd.DataFrame() processed_table_data = {} for k, v in table_data.items(): if not isinstance(v, list): processed_table_data[k] = [v] else: processed_table_data[k] = v df = pd.DataFrame(processed_table_data) if times and len(times) == len(df): df['end_date'] = [str(t).replace('-', '').replace('/', '').split(' ')[0] for t in times] elif times and len(df) == 1: df['end_date'] = str(times[0]).replace('-', '').replace('/', '').split(' ')[0] if 'end_date' not in df.columns: for col in ['time', 'date', 'trade_date', 'REPORT_DATE']: if col in df.columns: df['end_date'] = df[col].astype(str).str.replace('-', '').str.replace('/', '').str.split(' ').str[0] break return df def _filter_data(self, df: pd.DataFrame) -> pd.DataFrame: if df.empty or 'end_date' not in df.columns: return df df = df.sort_values(by='end_date', ascending=False) df = df.drop_duplicates(subset=['end_date'], keep='first') if df.empty: return df latest_record = df.iloc[[0]] try: latest_date_str = str(latest_record['end_date'].values[0]) last_year_date_str = str(int(latest_date_str) - 10000) comparable_record = df[df['end_date'].astype(str) == last_year_date_str] except: comparable_record = pd.DataFrame() is_annual = df['end_date'].astype(str).str.endswith('1231') | df['end_date'].astype(str).str.endswith('0331') annual_records = df[is_annual] combined = pd.concat([latest_record, comparable_record, annual_records]) combined = combined.drop_duplicates(subset=['end_date']) combined = combined.sort_values(by='end_date', ascending=False) return combined def _fetch_basic_info(self, symbol: str, code: str) -> dict: if code in self._basic_info_cache: return self._basic_info_cache[code] params = { "codes": code, "indipara": [ {"indicator": "corp_cn_name", "indiparams": []}, {"indicator": "accounting_date", "indiparams": []}, {"indicator": "ipo_date", "indiparams": []} ] } res = self.cli.post("basic_data_service", params) df = self._parse_ifind_tables(res) if not df.empty: df['code'] = code # Inject ID # self._save_raw_data(df, symbol, "basic_info_raw") # Usually generic international doesn't save stock_basic table, but we can if needed. # Keeping consistent with original logic which only saved raw file. pass info = { "name": "", "accounting_date": "1231", "ipo_date": "" } if not df.empty: row = df.iloc[0] info["name"] = str(row.get("corp_cn_name", "")) acc_date = str(row.get("accounting_date", "1231")).replace("-", "").replace("/", "") if acc_date: info["accounting_date"] = acc_date info["ipo_date"] = str(row.get("ipo_date", "")).replace("-", "").replace("/", "") self._basic_info_cache[code] = info return info def _fetch_financial_data_annual(self, symbol: str, code: str, indicator_configs: list) -> pd.DataFrame: basic_info = self._fetch_basic_info(symbol, code) acc_date = basic_info.get("accounting_date", "1231") current_year = int(time.strftime("%Y")) last_valid_year = None # 1. Determine most recent valid year for offset in range(3): test_year = current_year - offset test_date = f"{test_year}{acc_date}" first_indicator = indicator_configs[0] params = { "codes": code, "indipara": [ {"indicator": first_indicator["indicator"], "indiparams": [test_date, first_indicator.get("type", "1"), "CNY"]} ] } res = self.cli.post("basic_data_service", params) df = self._parse_ifind_tables(res) if not df.empty: valid_val = df.iloc[0, 0] if not df.empty and df.shape[1] > 0 else None if pd.notna(valid_val) and valid_val != 0: last_valid_year = test_year break if last_valid_year is None: last_valid_year = current_year all_dfs = [] for i in range(5): target_year = last_valid_year - i target_date = f"{target_year}{acc_date}" params = { "codes": code, "indipara": [ {"indicator": item["indicator"], "indiparams": [target_date, item.get("type", "1"), "CNY"]} for item in indicator_configs ] } res = self.cli.post("basic_data_service", params) df = self._parse_ifind_tables(res) if not df.empty: df['end_date'] = target_date all_dfs.append(df) # Filter and concat all_dfs = [d for d in all_dfs if not d.empty and not d.isna().all().all()] if not all_dfs: return pd.DataFrame() # Cache the actual years found in financial statements try: years = sorted([int(str(d)[:4]) for d in pd.concat(all_dfs)['end_date'].unique() if str(d)[:4].isdigit()]) if years: self._active_years_cache[code] = years except Exception as e: logger.error(f"Error caching years: {e}") return pd.concat(all_dfs, ignore_index=True) def get_income_statement(self, symbol: str, code: str) -> pd.DataFrame: indicators = [ {"indicator": "revenue_oas"}, {"indicator": "gross_profit_oas"}, {"indicator": "sga_expenses_oas"}, {"indicator": "selling_marketing_expenses_oas"}, {"indicator": "ga_expenses_oas"}, {"indicator": "rd_expenses_oas"}, {"indicator": "income_tax_expense_oas"}, {"indicator": "net_income_attri_to_common_sh_oas"}, {"indicator": "operating_income_oas"} ] df = self._fetch_financial_data_annual(symbol, code, indicators) if df.empty: return df # Backend assumption: no file save # DB Save df['code'] = code df['market'] = self.market # Inject Market df['symbol'] = symbol self._save_df_to_wide_table("ifind_int_income_statement", df, ["code", "market", "end_date"]) rename_map = { 'revenue_oas': 'revenue', 'gross_profit_oas': 'gross_profit', 'sga_expenses_oas': 'sga_exp', 'selling_marketing_expenses_oas': 'selling_marketing_exp', 'ga_expenses_oas': 'ga_exp', 'rd_expenses_oas': 'rd_exp', 'income_tax_expense_oas': 'income_tax', 'net_income_attri_to_common_sh_oas': 'net_income', 'operating_income_oas': 'operating_profit' } df_filtered = df.rename(columns=rename_map) df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()] for col in df_filtered.columns: if col not in ['date', 'end_date', 'code', 'symbol', 'market']: df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce') return self._filter_data(df_filtered) def get_balance_sheet(self, symbol: str, code: str) -> pd.DataFrame: indicators = [ {"indicator": "cash_equi_short_term_inve_oas"}, {"indicator": "accou_and_notes_recei_oas"}, {"indicator": "inventories_oas"}, {"indicator": "ppe_net_oas"}, {"indicator": "long_term_inv_and_receiv_oas"}, {"indicator": "goodwill_and_intasset_oas"}, {"indicator": "short_term_debt_oas"}, {"indicator": "short_term_borrowings_oas"}, {"indicator": "account_and_note_payable_oas"}, {"indicator": "contra_liabilities_current_oas"}, {"indicator": "advance_from_cust_current_oas"}, {"indicator": "defer_revenue_current_oas"}, {"indicator": "long_term_debt_oas"}, {"indicator": "long_term_borrowings_oas"}, {"indicator": "total_assets_oas"}, {"indicator": "equity_attri_to_companyowner_oas"}, {"indicator": "prepaid_expenses_current_oas"} ] df = self._fetch_financial_data_annual(symbol, code, indicators) if df.empty: return df df['code'] = code df['market'] = self.market df['symbol'] = symbol self._save_df_to_wide_table("ifind_int_balance_sheet", df, ["code", "market", "end_date"]) rename_map = { 'cash_equi_short_term_inve_oas': 'cash', 'accou_and_notes_recei_oas': 'receivables', 'inventories_oas': 'inventory', 'ppe_net_oas': 'fixed_assets', 'long_term_inv_and_receiv_oas': 'long_term_investments', 'goodwill_and_intasset_oas': 'goodwill', 'short_term_debt_oas': 'short_term_debt', 'short_term_borrowings_oas': 'short_term_borrowings', 'account_and_note_payable_oas': 'accounts_payable', 'contra_liabilities_current_oas': 'contract_liabilities', 'advance_from_cust_current_oas': 'advances_from_customers', 'defer_revenue_current_oas': 'deferred_revenue', 'long_term_debt_oas': 'long_term_debt', 'long_term_borrowings_oas': 'long_term_borrowings', 'total_assets_oas': 'total_assets', 'equity_attri_to_companyowner_oas': 'total_equity', 'prepaid_expenses_current_oas': 'prepayment' } df_filtered = df.rename(columns=rename_map) df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()] if 'total_liabilities' not in df_filtered.columns or df_filtered['total_liabilities'].isnull().all(): if 'total_assets' in df_filtered.columns and 'total_equity' in df_filtered.columns: df_filtered['total_liabilities'] = df_filtered['total_assets'] - df_filtered['total_equity'] for col in df_filtered.columns: if col not in ['date', 'end_date', 'code', 'symbol', 'market']: df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce') return self._filter_data(df_filtered) def get_cash_flow(self, symbol: str, code: str) -> pd.DataFrame: indicators = [ {"indicator": "net_cash_flows_from_oa_oas"}, {"indicator": "purchase_of_ppe_and_ia_oas"}, {"indicator": "dividends_paid_oas"} ] df = self._fetch_financial_data_annual(symbol, code, indicators) if df.empty: return df df['code'] = code df['market'] = self.market df['symbol'] = symbol self._save_df_to_wide_table("ifind_int_cash_flow", df, ["code", "market", "end_date"]) rename_map = { 'net_cash_flows_from_oa_oas': 'ocf', 'purchase_of_ppe_and_ia_oas': 'capex', 'dividends_paid_oas': 'dividends' } df_filtered = df.rename(columns=rename_map) df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()] for col in df_filtered.columns: if col not in ['date', 'end_date', 'code', 'symbol', 'market']: df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce') if 'capex' in df_filtered.columns: df_filtered['capex'] = df_filtered['capex'].abs() return self._filter_data(df_filtered) def get_market_metrics(self, symbol: str, code: str) -> dict: basic_info = self._fetch_basic_info(symbol, code) metrics = { "name": basic_info.get("name", ""), "list_date": basic_info.get("ipo_date", ""), "price": 0, "market_cap": 0, "pe": 0, "pb": 0, "dividend_yield": 0 } # In backend, rely on DB query or fallbacks report_dates = [] if code in self._active_years_cache: years = self._active_years_cache[code] report_dates = [f"{y}1231" for y in years] else: current_year = int(time.strftime("%Y")) report_dates = [f"{current_year - i}1231" for i in range(5)] # Fetch daily_basic data for each report date all_daily_basic = [] for date_str in report_dates: if len(str(date_str)) == 8: formatted_date = f"{str(date_str)[:4]}-{str(date_str)[4:6]}-{str(date_str)[6:]}" else: formatted_date = str(date_str) params = { "codes": code, "functionpara": {"date_sequence": formatted_date}, "startdate": "", "enddate": "", "indipara": [ {"indicator": "close_price", "indiparams": ["", "0", "BB"]}, {"indicator": "market_value", "indiparams": ["", "BB"]}, {"indicator": "pb_latest", "indiparams": ["", "100"]}, {"indicator": "dividend_yield_ttm_ex_sd", "indiparams": [""]}, {"indicator": "pe_ttm", "indiparams": ["", "100"]} ] } res = self.cli.post("date_sequence", params) df = self._parse_ifind_tables(res) if not df.empty: df['code'] = code df['market'] = self.market df['end_date'] = str(date_str).replace('-', '') all_daily_basic.append(df) # Save to DB if all_daily_basic: combined_df = pd.concat(all_daily_basic, ignore_index=True) self._save_df_to_wide_table("ifind_int_daily_basic", combined_df, ["code", "market", "end_date"]) # Use the first (most recent) data for metrics dict latest_df = all_daily_basic[0] if not latest_df.empty: row = latest_df.iloc[0] metrics["price"] = float(row.get("close_price") or 0) metrics["market_cap"] = float(row.get("market_value") or 0) metrics["pe"] = float(row.get("pe_ttm") or 0) metrics["pb"] = float(row.get("pb_latest") or 0) metrics["dividend_yield"] = float(row.get("dividend_yield_ttm_ex_sd") or 0) return metrics def get_dividends(self, symbol: str, code: str) -> pd.DataFrame: basic_info = self._fetch_basic_info(symbol, code) acc_date = basic_info.get("accounting_date", "1231") if code in self._active_years_cache: years_to_fetch = sorted(self._active_years_cache[code], reverse=True) else: current_year = int(time.strftime("%Y")) years_to_fetch = [current_year - i for i in range(5)] results = [] for year in years_to_fetch: year_str = str(year) params = { "codes": code, "indipara": [ {"indicator": "annual_cum_dividend", "indiparams": [year_str, "CNY"]} ] } res = self.cli.post("basic_data_service", params) df = self._parse_ifind_tables(res) if not df.empty and 'annual_cum_dividend' in df.columns: end_date = f"{year_str}{acc_date}" df['code'] = code df['market'] = self.market df['end_date'] = end_date self._save_df_to_wide_table("ifind_int_dividend", df, ["code", "market", "end_date"]) val = df['annual_cum_dividend'].iloc[0] if pd.notna(val) and val != 0: results.append({ 'date_str': end_date, 'dividends': float(val) }) if not results: return pd.DataFrame() df_div = pd.DataFrame(results) return df_div def get_repurchases(self, symbol: str, code: str) -> pd.DataFrame: basic_info = self._fetch_basic_info(symbol, code) acc_date = basic_info.get("accounting_date", "1231") mm = acc_date[:2] dd = acc_date[2:] fmt_mm_dd = f"{mm}-{dd}" if code in self._active_years_cache: years_to_fetch = sorted(self._active_years_cache[code], reverse=True) else: current_year = int(time.strftime("%Y")) years_to_fetch = [current_year - i for i in range(5)] results = [] for target_year in years_to_fetch: start_date = f"{target_year - 1}-{fmt_mm_dd}" end_date = f"{target_year}-{fmt_mm_dd}" params = { "codes": code, "indipara": [ {"indicator": "repur_num_new", "indiparams": [start_date, end_date, "1"]} ] } res = self.cli.post("basic_data_service", params) df = self._parse_ifind_tables(res) if not df.empty and 'repur_num_new' in df.columns: target_date = f"{target_year}{acc_date}" df['code'] = code df['market'] = self.market df['end_date'] = target_date self._save_df_to_wide_table("ifind_int_repurchase", df, ["code", "market", "end_date"]) val = df['repur_num_new'].iloc[0] if pd.notna(val) and val != 0: results.append({ 'date_str': target_date, 'repurchases': float(val) }) if not results: return pd.DataFrame() df_repur = pd.DataFrame(results) return df_repur def get_employee_count(self, symbol: str, code: str) -> pd.DataFrame: basic_info = self._fetch_basic_info(symbol, code) acc_date = basic_info.get("accounting_date", "1231") mm = acc_date[:2] dd = acc_date[2:] if code in self._active_years_cache: years_to_fetch = sorted(self._active_years_cache[code], reverse=True) else: current_year = int(time.strftime("%Y")) years_to_fetch = [current_year - i for i in range(5)] results = [] for target_year in years_to_fetch: target_date = f"{target_year}-{mm}-{dd}" params = { "codes": code, "indipara": [ {"indicator": "staff_num", "indiparams": [target_date]} ] } res = self.cli.post("basic_data_service", params) df = self._parse_ifind_tables(res) if not df.empty and 'staff_num' in df.columns: df['code'] = code df['market'] = self.market df['end_date'] = f"{target_year}{acc_date}" self._save_df_to_wide_table("ifind_int_employee", df, ["code", "market", "end_date"]) val = df['staff_num'].iloc[0] if pd.notna(val) and val != 0: results.append({ 'date_str': f"{target_year}{acc_date}", 'employee_count': float(val) }) if not results: return pd.DataFrame() df_emp = pd.DataFrame(results) return df_emp def get_financial_ratios(self, symbol: str, code: str) -> pd.DataFrame: current_year = int(time.strftime("%Y")) basic_info = self._fetch_basic_info(symbol, code) acc_date = basic_info.get("accounting_date", "1231") last_valid_year = None for offset in range(3): test_year = current_year - offset test_date = f"{test_year}{acc_date}" params = { "codes": code, "indipara": [{"indicator": "roe", "indiparams": [test_date]}] } res = self.cli.post("basic_data_service", params) df = self._parse_ifind_tables(res) if not df.empty: val = df.iloc[0, 0] if not df.empty and df.shape[1] > 0 else None if pd.notna(val) and val != 0: last_valid_year = test_year break if last_valid_year is None: last_valid_year = current_year all_dfs = [] # Use cached years from financial statements if code in self._active_years_cache: years_to_fetch = sorted(self._active_years_cache[code], reverse=True) else: # Fallback to last_valid_year approach years_to_fetch = [last_valid_year - i for i in range(5)] for target_year in years_to_fetch: date_str = f"{target_year}{acc_date}" year_str = str(target_year) indipara = [] for key in ["salary_pp", "revenue_pp", "profit_pp"]: indipara.append({"indicator": key, "indiparams": [year_str, "100"]}) ratio_keys = [ "roe", "roa", "roic", "sales_fee_to_or", "manage_fee_to_revenue", "rad_expense_to_total_income", "operating_revenue_yoy", "np_atsopc_yoy", "ibdebt_ratio_asset_base", "inventory_turnover_days", "receivable_turnover_days", "accounts_payable_turnover_days", "fixed_asset_turnover_ratio", "total_capital_turnover" ] for key in ratio_keys: indipara.append({"indicator": key, "indiparams": [date_str]}) params = { "codes": code, "indipara": indipara } res = self.cli.post("basic_data_service", params) # Use 'params' to debug if needed, but logging is inside _parse in client base df = self._parse_ifind_tables(res) if not df.empty: if 'end_date' not in df.columns: df['end_date'] = date_str df = df.dropna(axis=1, how='all') all_dfs.append(df) if not all_dfs: return pd.DataFrame() combined = pd.concat(all_dfs, ignore_index=True) # Ratio table save combined['code'] = code combined['market'] = self.market self._save_df_to_wide_table("ifind_int_financial_ratios", combined, ["code", "market", "end_date"]) return combined