import logging
import os
import time
from pathlib import Path
from typing import List, Optional, Union

import numpy as np
import pandas as pd
import psycopg2
from dotenv import load_dotenv

from .ifind_client import IFindClient

# Get logger
logger = logging.getLogger(__name__)

# Explicitly load .env from project root
ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent
load_dotenv(ROOT_DIR / ".env")


class IFindHKClient:
    """
    iFinD Client specifically for Hong Kong Market (Backend Version).
    Uses 'THS' indicators and Chinese accounting standard mappings.
    Implements direct Postgres persistence (Side-Channel) for background task compatibility.
    """

    # Columns that identify a row and must never be coerced to numbers.
    _NON_NUMERIC_COLS = ('date', 'end_date', 'code', 'symbol')

    def __init__(self, api_key: str):
        """Create the underlying iFinD client and the per-instance caches.

        Args:
            api_key: passed to ``IFindClient`` as its ``refresh_token``.
        """
        self.cli = IFindClient(refresh_token=api_key)
        self.market = 'HK'
        # code -> basic-info dict, filled by _fetch_basic_info
        self._basic_info_cache = {}
        # code -> ascending list of years, filled by _fetch_financial_data_annual
        self._active_years_cache = {}

    def _get_db_connection(self):
        """Create a database connection.

        Connection settings are read from the DB_HOST / DB_USER / DB_PASSWORD /
        DB_NAME / DB_PORT environment variables.

        Returns:
            An open psycopg2 connection with UTF8 client encoding, or ``None``
            when connecting fails (the error is logged, not raised).
        """
        # SECURITY(review): the fallback values below embed a host and a
        # password in source control. They are kept so existing deployments
        # keep working; prefer supplying DB_* through the environment/.env
        # and removing these literals.
        db_host = os.getenv("DB_HOST", "192.168.3.195")
        db_user = os.getenv("DB_USER", "value")
        db_pass = os.getenv("DB_PASSWORD", "Value609!")
        db_name = os.getenv("DB_NAME", "fa3")
        db_port = os.getenv("DB_PORT", "5432")
        try:
            conn = psycopg2.connect(
                host=db_host,
                user=db_user,
                password=db_pass,
                dbname=db_name,
                port=db_port,
            )
            conn.set_client_encoding('UTF8')
            return conn
        except Exception as e:
            logger.error(f"DB Connection Error: {e}")
            return None

    def _map_dtype_to_sql(self, dtype):
        """Map pandas dtype to PostgreSQL type (TEXT for anything unrecognised)."""
        if pd.api.types.is_integer_dtype(dtype):
            return "BIGINT"
        elif pd.api.types.is_float_dtype(dtype):
            return "NUMERIC"
        elif pd.api.types.is_bool_dtype(dtype):
            return "BOOLEAN"
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            return "TIMESTAMP"
        else:
            return "TEXT"

    @staticmethod
    def _format_compact_date(value):
        """Turn an 8-character value such as ``20231231`` into ``2023-12-31``.

        Nulls are returned as ``None`` so they reach the database as NULL
        instead of the literal string ``"None"``; other values are returned
        as their string form.
        """
        if value is None or (not isinstance(value, str) and pd.isna(value)):
            return None
        text = str(value)
        if len(text) == 8:
            return f"{text[:4]}-{text[4:6]}-{text[6:]}"
        return text

    @staticmethod
    def _normalize_date_token(value) -> str:
        """Strip '-' and '/' from a date-like value and drop any time part."""
        return str(value).replace('-', '').replace('/', '').split(' ')[0]

    @staticmethod
    def _to_float(value) -> float:
        """Convert *value* to float, mapping None / NaN / unparsable to 0.0."""
        if value is None:
            return 0.0
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        # NaN is the only float that is not equal to itself.
        return 0.0 if number != number else number

    def _coerce_numeric_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Coerce every non-identifier column of *df* to numeric, in place.

        Values that cannot be parsed become NaN (``errors='coerce'``).
        """
        for col in df.columns:
            if col not in self._NON_NUMERIC_COLS:
                df[col] = pd.to_numeric(df[col], errors='coerce')
        return df

    def _save_df_to_wide_table(self, table_name: str, df: pd.DataFrame, pk_cols: List[str]):
        """
        Save DataFrame to a specific wide table using direct psycopg2 connection.
        Creates table if not exists using DF columns.
        Performs incremental UPSERT.

        Args:
            table_name: target table (looked up in the ``public`` schema).
            df: rows to persist; ``None`` or an empty frame is a no-op.
            pk_cols: columns forming the UNIQUE constraint / ON CONFLICT target.

        Errors are logged and rolled back, never raised; the connection is
        always closed.
        """
        if df is None or df.empty:
            return

        conn = self._get_db_connection()
        if not conn:
            return

        try:
            # 1. Clean Data
            df_clean = df.replace({np.nan: None})

            # Convert date columns to YYYY-MM-DD format if needed
            for col in df_clean.columns:
                try:
                    if 'date' not in str(col).lower() or df_clean[col].dtype != 'object':
                        continue
                    non_null = df_clean[col].dropna()
                    sample = str(non_null.iloc[0]) if not non_null.empty else ""
                    if len(sample) == 8 and sample.isdigit():
                        # Null-preserving: astype(str) would turn None into
                        # the string "None" and store it in the table.
                        df_clean[col] = df_clean[col].apply(self._format_compact_date)
                except Exception as e:
                    # Best effort: an unconvertible column is stored as-is.
                    logger.debug(f"Skipping date normalisation for column {col}: {e}")

            columns = list(df_clean.columns)

            with conn.cursor() as cur:
                # 2. Check/Create Table
                cur.execute("SELECT to_regclass(%s)", (f"public.{table_name}",))
                table_exists = cur.fetchone()[0] is not None

                # Just use dataframe columns for initial creation
                if not table_exists:
                    logger.info(f"🆕 [Database] Creating table {table_name}...")
                    col_defs = ['"id" SERIAL PRIMARY KEY']
                    for col in columns:
                        sql_type = self._map_dtype_to_sql(df_clean[col].dtype)
                        col_defs.append(f'"{col}" {sql_type}')
                    col_defs.append('"update_date" TIMESTAMP DEFAULT CURRENT_TIMESTAMP')

                    pk_str = ", ".join([f'"{c}"' for c in pk_cols])
                    constraint_name = f"uq_{table_name}"
                    create_sql = f"""
                        CREATE TABLE IF NOT EXISTS {table_name} (
                            {', '.join(col_defs)},
                            CONSTRAINT {constraint_name} UNIQUE ({pk_str})
                        );
                    """
                    cur.execute(create_sql)
                    conn.commit()

                # 3. Get existing columns to avoid insertion errors
                cur.execute("""
                    SELECT column_name
                    FROM information_schema.columns
                    WHERE table_name = %s
                """, (table_name,))
                db_cols = {row[0] for row in cur.fetchall()}

                # Check if 'id' column exists, if not add it (JIT schema evolution)
                if 'id' not in db_cols:
                    try:
                        cur.execute(f'ALTER TABLE "{table_name}" ADD COLUMN "id" SERIAL')
                        conn.commit()
                        db_cols.add('id')
                    except Exception as e:
                        logger.error(f"Error adding id column to {table_name}: {e}")
                        conn.rollback()

                # Only insert columns the table actually has.
                valid_cols = [c for c in columns if c in db_cols]
                if not valid_cols:
                    return

                # 4. Upsert
                cols_str = ", ".join([f'"{c}"' for c in valid_cols])
                vals_str = ", ".join(["%s"] * len(valid_cols))
                updates = [f'"{c}" = EXCLUDED."{c}"' for c in valid_cols if c not in pk_cols]
                if 'update_date' in db_cols:
                    updates.append('"update_date" = CURRENT_TIMESTAMP')
                update_clause = f"DO UPDATE SET {', '.join(updates)}" if updates else "DO NOTHING"
                pk_conflict = ", ".join([f'"{c}"' for c in pk_cols])

                insert_sql = f"""
                    INSERT INTO {table_name} ({cols_str})
                    VALUES ({vals_str})
                    ON CONFLICT ({pk_conflict})
                    {update_clause}
                """

                data_tuples = [tuple(x) for x in df_clean[valid_cols].to_numpy()]
                cur.executemany(insert_sql, data_tuples)
                conn.commit()
                # logger.info(f"✅ Saved {len(data_tuples)} rows to {table_name}")
        except Exception as e:
            logger.error(f"Error saving to {table_name}: {e}", exc_info=True)
            conn.rollback()
        finally:
            conn.close()

    def _parse_ifind_tables(self, res: dict, params: Optional[dict] = None) -> pd.DataFrame:
        """Convert an iFinD response dict into a DataFrame.

        Only the first entry of ``res["tables"]`` is used. Scalar values in its
        ``table`` mapping are wrapped into one-element lists. An ``end_date``
        column (digits only, no time part) is derived from the entry's ``time``
        list, or failing that from the first of the ``time`` / ``date`` /
        ``trade_date`` / ``REPORT_DATE`` columns present.

        Args:
            res: response as returned by ``self.cli.post``.
            params: request parameters, only logged when the API reports an error.

        Returns:
            The parsed frame, or an empty DataFrame for a falsy response, a
            non-zero ``errorcode``, or missing table data.
        """
        if not res:
            return pd.DataFrame()

        error_code = res.get("errorcode", 0)
        if error_code != 0:
            logger.error(f"iFinD API Error: {res.get('errmsg')} (code: {error_code})")
            if params:
                logger.error(f"Failed Params: {params}")
            return pd.DataFrame()

        tables = res.get("tables", [])
        if not tables:
            return pd.DataFrame()

        table_info = tables[0]
        table_data = table_info.get("table", {})
        times = table_info.get("time", [])
        if not table_data:
            return pd.DataFrame()

        # DataFrame construction needs list-valued columns.
        processed_table_data = {
            k: (v if isinstance(v, list) else [v]) for k, v in table_data.items()
        }
        df = pd.DataFrame(processed_table_data)

        if times and len(times) == len(df):
            df['end_date'] = [self._normalize_date_token(t) for t in times]
        elif times and len(df) == 1:
            df['end_date'] = self._normalize_date_token(times[0])

        if 'end_date' not in df.columns:
            for col in ['time', 'date', 'trade_date', 'REPORT_DATE']:
                if col in df.columns:
                    df['end_date'] = (
                        df[col].astype(str)
                        .str.replace('-', '', regex=False)
                        .str.replace('/', '', regex=False)
                        .str.split(' ').str[0]
                    )
                    break
        return df

    def _filter_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return *df* with one row per ``end_date``, newest first.

        Frames that are empty or lack an ``end_date`` column are returned
        unchanged.
        """
        if df.empty or 'end_date' not in df.columns:
            return df

        df = df.sort_values(by='end_date', ascending=False)
        df = df.drop_duplicates(subset=['end_date'], keep='first')
        if df.empty:
            return df

        latest_record = df.iloc[[0]]
        try:
            latest_date_str = str(latest_record['end_date'].values[0])
            # YYYYMMDD - 10000 is the same calendar day one year earlier.
            last_year_date_str = str(int(latest_date_str) - 10000)
            comparable_record = df[df['end_date'].astype(str) == last_year_date_str]
        except (ValueError, TypeError):
            # end_date is not a plain integer-like string; no comparable row.
            comparable_record = pd.DataFrame()

        if comparable_record.empty:
            dfs_to_concat = [latest_record, df]
        else:
            dfs_to_concat = [latest_record, comparable_record, df]

        combined = pd.concat(dfs_to_concat)
        combined = combined.drop_duplicates(subset=['end_date'])
        combined = combined.sort_values(by='end_date', ascending=False)
        return combined

    def _fetch_basic_info(self, symbol: str, code: str) -> dict:
        """Fetch (and cache per *code*) name, accounting date and IPO date.

        The raw response row is also upserted into ``ifind_hk_stock_basic``.

        Returns:
            Dict with ``name``, ``accounting_date`` and ``ipo_date``; when the
            API returned a row it additionally carries ``acc_date`` (the value
            reported by the API).
        """
        if code in self._basic_info_cache:
            return self._basic_info_cache[code]

        params = {
            "codes": code,
            "indipara": [
                {"indicator": "corp_cn_name", "indiparams": []},
                {"indicator": "accounting_date", "indiparams": []},
                {"indicator": "ipo_date", "indiparams": []}
            ]
        }
        res = self.cli.post("basic_data_service", params)
        df = self._parse_ifind_tables(res, params)

        if not df.empty:
            df['code'] = code  # Inject ID
            # In backend, we skip file saving and rely on DB
            self._save_df_to_wide_table("ifind_hk_stock_basic", df, ["code"])

        info = {
            "name": "",
            "accounting_date": "1231",
            "ipo_date": ""
        }
        if not df.empty:
            row = df.iloc[0]
            info["name"] = str(row.get("corp_cn_name", ""))
            info["acc_date"] = str(row.get("accounting_date", "1231"))
            # The reported value is kept under "acc_date"; "accounting_date"
            # itself is deliberately pinned to the calendar year end.
            info["accounting_date"] = "1231"
            info["ipo_date"] = str(row.get("ipo_date", "")).replace("-", "").replace("/", "")

        self._basic_info_cache[code] = info
        return info

    def _fetch_financial_data_annual(self, symbol: str, code: str, indicator_configs: list) -> pd.DataFrame:
        """Fetch up to five year-end (``YYYY1231``) snapshots of the indicators.

        The first indicator is probed for the current year and the two before
        it to find the latest year with a non-null, non-zero value; the five
        years ending there are then requested. Years whose indicator columns
        are all null are dropped. The years actually found are stored
        (ascending) in ``self._active_years_cache[code]``.

        Args:
            symbol: unused here; kept for a signature parallel to the callers.
            code: iFinD security code.
            indicator_configs: dicts with ``indicator`` and optional ``type``
                (defaults to ``"1"``).

        Returns:
            Concatenated frame with an ``end_date`` column, or an empty
            DataFrame when nothing was requested or returned.
        """
        if not indicator_configs:
            # Nothing to request; previously this raised IndexError.
            return pd.DataFrame()

        current_year = int(time.strftime("%Y"))
        first_indicator = indicator_configs[0]
        last_valid_year = None

        for offset in range(3):
            test_year = current_year - offset
            test_date = f"{test_year}1231"
            params = {
                "codes": code,
                "indipara": [
                    {"indicator": first_indicator["indicator"],
                     "indiparams": [test_date, first_indicator.get("type", "1"), "CNY"]}
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res)
            if not df.empty:
                valid_val = df.iloc[0, 0] if df.shape[1] > 0 else None
                if pd.notna(valid_val) and valid_val != 0:
                    last_valid_year = test_year
                    break

        if last_valid_year is None:
            last_valid_year = current_year

        all_dfs = []
        for i in range(5):
            target_year = last_valid_year - i
            target_date = f"{target_year}1231"
            params = {
                "codes": code,
                "indipara": [
                    {"indicator": item["indicator"],
                     "indiparams": [target_date, item.get("type", "1"), "CNY"]}
                    for item in indicator_configs
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res, params)
            if not df.empty:
                valid_cols = [c for c in df.columns if c not in ['end_date', 'date']]
                if not df[valid_cols].isnull().all().all():
                    df['end_date'] = target_date
                    all_dfs.append(df)

        if not all_dfs:
            return pd.DataFrame()

        combined = pd.concat(all_dfs, ignore_index=True)

        # Cache the actual years found in financial statements
        try:
            years = sorted(
                int(str(d)[:4]) for d in combined['end_date'].unique()
                if str(d)[:4].isdigit()
            )
            if years:
                self._active_years_cache[code] = years
        except (ValueError, TypeError):
            # Caching is an optimisation only; callers fall back to defaults.
            pass

        return combined

    def get_income_statement(self, symbol: str, code: str) -> pd.DataFrame:
        """Fetch, persist and normalise the annual income statement.

        Raw rows are upserted into ``ifind_hk_income_statement`` keyed by
        (code, end_date). The returned frame has selected columns renamed,
        an ``ebit`` column copied from ``operating_profit`` when absent, all
        non-identifier columns coerced to numeric, and one row per ``end_date``
        (newest first). An empty frame is returned when nothing was fetched.
        """
        indicator_names = (
            "total_oi", "prime_oi", "other_oi", "operating_cost",
            "operating_expense", "operating_fee",
            "p_depreciation_and_amortization", "gross_profit",
            "sales_ad_and_ga", "rad_cost", "sales_fee", "financial_expense",
            "sales_income", "sales_cost", "other_income", "manage_fee",
            "deprec_and_amorti", "total_other_opearting_expense",
            "p_total_cost", "operating_profit", "total_gal",
            "interest_income", "interest_net_pay", "interest_expense",
            "income_from_asso_and_joint", "other_gal_effct_profit_pre_tax",
            "conti_op_before_tax", "profit_before_noncurrent_items",
            "profit_and_loss_of_noncurrent_items", "profit_before_tax",
            "income_tax", "profit_after_tax", "minoritygal",
            "continue_operate_net_profit", "noncontinue_operate_net_profit",
            "other_special_items", "ni_attr_to_cs", "np_atms",
            "preferred_divid_and_other_adjust", "oci", "total_oci",
            "oci_from_parent", "oci_from_minority", "invest_property_fv_chg",
            "operating_amt", "oi_si", "operating_premium_profit_si",
            "to_toallied_corp_perf", "to_joint_control_entity_perf",
            "pre_tax_profit_si", "after_tax_profit_si",
            "profit_attrbt_to_nonholders", "total_income_atncs",
        )
        indicators = [{"indicator": name} for name in indicator_names]

        df = self._fetch_financial_data_annual(symbol, code, indicators)
        if df.empty:
            return df

        # Backend assumption: no manual raw file saving.
        # Save Wide Table
        df['code'] = code  # Ensure Primary Key part 1
        df['symbol'] = symbol
        # Primary Key: code + end_date
        self._save_df_to_wide_table("ifind_hk_income_statement", df, ["code", "end_date"])

        rename_map = {
            'total_oi': 'revenue',
            'operating_amt': 'turnover',
            'gross_profit': 'gross_profit',
            'sales_ad_and_ga': 'sga_exp',
            'sales_fee': 'selling_marketing_exp',
            'manage_fee': 'ga_exp',
            'rad_cost': 'rd_exp',
            'income_tax': 'income_tax',
            'ni_attr_to_cs': 'net_income',
            'operating_profit': 'operating_profit',
            'depreciation': 'depreciation',
            'deprec_and_amorti': 'depreciation',
            'p_depreciation_and_amortization': 'depreciation'
        }
        df_filtered = df.rename(columns=rename_map)
        # Several sources map to the same target; keep the first occurrence.
        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]

        if 'ebit' not in df_filtered.columns and 'operating_profit' in df_filtered.columns:
            df_filtered['ebit'] = df_filtered['operating_profit']

        self._coerce_numeric_columns(df_filtered)
        return self._filter_data(df_filtered)

    def get_balance_sheet(self, symbol: str, code: str) -> pd.DataFrame:
        """Fetch, persist and normalise the annual balance sheet.

        Raw rows are upserted into ``ifind_hk_balance_sheet`` keyed by
        (code, end_date). The returned frame has selected columns renamed,
        ``total_liabilities`` filled from ``total_liab`` (or, failing that,
        ``total_assets - total_equity``) when missing or entirely null, all
        non-identifier columns coerced to numeric, and one row per
        ``end_date`` (newest first).
        """
        indicator_names = (
            "cce", "st_investment", "total_cash", "account_receivable",
            "tradable_fnncl_asset", "derivative_fnncl_assets", "restriv_fund",
            "other_short_term_investment", "ar_nr", "total_ar", "or",
            "inventory", "flow_assets_dit", "pre_payment",
            "other_cunrrent_assets_si", "other_ca", "total_ca",
            "receivables_from_allied_corp", "current_assets_si",
            "prepay_deposits_etc", "receivables_from_jce",
            "receivables_from_ac", "recoverable_tax", "total_fixed_assets",
            "depreciation", "equity_and_lt_invest", "net_fixed_assets",
            "invest_property", "equity_investment", "investment_in_associate",
            "investment_in_joints", "held_to_maturity_invest",
            "goodwill_and_intangible_asset", "intangible_assets",
            "accum_amortized", "noncurrent_assets_dit",
            "other_noncurrent_assets_si", "dt_assets",
            "total_noncurrent_assets", "total_assets", "ac_equity",
            "lease_prepay", "noncurrent_assets_si", "st_lt_current_loan",
            "trade_financial_lia", "derivative_financial_lia", "ap_np",
            "accounts_payable", "advance_payment", "st_debt", "contra_liab",
            "tax_payable", "accrued_liab", "flow_debt_deferred_income",
            "other_cl", "other_cunrrent_liab_si", "total_cl",
            "accrued_expenses_etc", "money_payable_toac",
            "joint_control_entity_payable", "payable_to_associated_corp",
            "lt_debt", "long_term_loan", "other_noncurrent_liabi",
            "deferred_tax_liability", "ncl_deferred_income",
            "other_noncurrent_liab_si", "noncurrent_liab_si",
            "total_noncurrent_liab", "total_liab", "common_shares",
            "capital_reserve", "equity_premium", "treasury_stock", "accumgal",
            "equity_atsopc_sbi", "preferred_stock", "perpetual_debt",
            "reserve", "other_reserves", "retained_earnings", "oci_bs",
            "total_common_equity", "equity_belong_to_parent",
            "minority_interests", "other_equity_si", "total_equity",
            "total_lib_and_equity", "equity_si", "equity_atncs",
        )
        indicators = [{"indicator": name} for name in indicator_names]

        df = self._fetch_financial_data_annual(symbol, code, indicators)
        if df.empty:
            return df

        df['code'] = code
        df['symbol'] = symbol
        self._save_df_to_wide_table("ifind_hk_balance_sheet", df, ["code", "end_date"])

        rename_map = {
            'cce': 'cash',
            'ar_nr': 'receivables',
            'inventory': 'inventory',
            'net_fixed_assets': 'fixed_assets',
            'equity_and_lt_invest': 'long_term_investments',
            'goodwill_and_intangible_asset': 'goodwill',
            'st_debt': 'short_term_debt',
            'st_lt_current_loan': 'short_term_borrowings',
            'ap_np': 'accounts_payable',
            'contra_liab': 'contract_liabilities',
            'advance_payment': 'advances_from_customers',
            'flow_debt_deferred_income': 'deferred_revenue',
            'lt_debt': 'long_term_debt',
            'long_term_loan': 'long_term_borrowings',
            'total_assets': 'total_assets',
            'equity_belong_to_parent': 'total_equity',
            'pre_payment': 'prepayment'
        }
        df_filtered = df.rename(columns=rename_map)
        # Renaming can collide with fetched columns; keep the first occurrence.
        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]

        if 'total_liabilities' not in df_filtered.columns or df_filtered['total_liabilities'].isnull().all():
            if 'total_liab' in df_filtered.columns:
                df_filtered['total_liabilities'] = df_filtered['total_liab']
            elif 'total_assets' in df_filtered.columns and 'total_equity' in df_filtered.columns:
                df_filtered['total_liabilities'] = df_filtered['total_assets'] - df_filtered['total_equity']

        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]

        self._coerce_numeric_columns(df_filtered)
        return self._filter_data(df_filtered)

    def get_cash_flow(self, symbol: str, code: str) -> pd.DataFrame:
        """Fetch, persist and normalise the annual cash-flow statement.

        Raw rows are upserted into ``ifind_hk_cash_flow`` keyed by
        (code, end_date). The returned frame has ``ocf`` / ``capex`` /
        ``dividends`` renamed from their iFinD names, non-identifier columns
        coerced to numeric, ``capex`` made non-negative, and one row per
        ``end_date`` (newest first).
        """
        indicator_names = (
            "ni", "depreciation_and_amortization", "operating_capital_change",
            "ncf_from_oa", "capital_cost", "invest_buy", "ncf_from_ia",
            "increase_in_share_capital", "decrease_in_share_capital",
            "total_dividends_paid", "ncf_from_fa",
        )
        indicators = [{"indicator": name} for name in indicator_names]

        df = self._fetch_financial_data_annual(symbol, code, indicators)
        if df.empty:
            return df

        df['code'] = code
        df['symbol'] = symbol
        self._save_df_to_wide_table("ifind_hk_cash_flow", df, ["code", "end_date"])

        rename_map = {
            'ncf_from_oa': 'ocf',
            'capital_cost': 'capex',
            'total_dividends_paid': 'dividends'
        }
        df_filtered = df.rename(columns=rename_map)
        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]

        self._coerce_numeric_columns(df_filtered)

        if 'capex' in df_filtered.columns:
            df_filtered['capex'] = df_filtered['capex'].abs()

        return self._filter_data(df_filtered)

    def get_market_metrics(self, symbol: str, code: str) -> dict:
        """Return name, listing date and valuation metrics for *code*.

        Price, market value, PE, PB and dividend yield are requested for each
        known report date (from ``_active_years_cache`` when populated,
        otherwise the last five calendar year ends), upserted into
        ``ifind_hk_daily_basic``, and the most recent non-empty response
        supplies the values in the returned dict. Metrics default to 0.
        """
        basic_info = self._fetch_basic_info(symbol, code)
        metrics = {
            "name": basic_info.get("name", ""),
            "list_date": basic_info.get("ipo_date", ""),
            "accounting_date": basic_info.get("accounting_date", ""),
            "price": 0,
            "market_cap": 0,
            "pe": 0,
            "pb": 0,
            "dividend_yield": 0
        }

        # Try to get dates from active_years_cache first
        # (populated by get_income_statement call in flow).
        if code in self._active_years_cache:
            # The cache is stored ascending; iterate newest-first so that the
            # first collected frame really is the most recent one.
            years = sorted(self._active_years_cache[code], reverse=True)
            report_dates = [f"{y}1231" for y in years]
        else:
            # Fallback
            current_year = int(time.strftime("%Y"))
            report_dates = [f"{current_year - i}1231" for i in range(5)]

        # Fetch daily_basic data for each report date
        all_daily_basic = []
        for date_str in report_dates:
            date_text = str(date_str)
            if len(date_text) == 8:
                formatted_date = f"{date_text[:4]}-{date_text[4:6]}-{date_text[6:]}"
            else:
                formatted_date = date_text

            params = {
                "codes": code,
                "functionpara": {"date_sequence": formatted_date},
                "startdate": "",
                "enddate": "",
                "indipara": [
                    {"indicator": "close_price", "indiparams": ["", "0", "BB"]},
                    {"indicator": "market_value", "indiparams": ["", "BB"]},
                    {"indicator": "pb_latest", "indiparams": ["", "100"]},
                    {"indicator": "dividend_yield_ttm_ex_sd", "indiparams": [""]},
                    {"indicator": "pe_ttm", "indiparams": ["", "100"]}
                ]
            }
            res = self.cli.post("date_sequence", params)
            df = self._parse_ifind_tables(res)
            if not df.empty:
                df['code'] = code
                df['end_date'] = date_text.replace('-', '')
                all_daily_basic.append(df)

        # Save to DB
        if all_daily_basic:
            combined_df = pd.concat(all_daily_basic, ignore_index=True)
            self._save_df_to_wide_table("ifind_hk_daily_basic", combined_df, ["code", "end_date"])

            # Use the first (most recent) data for metrics dict
            latest_df = all_daily_basic[0]
            if not latest_df.empty:
                row = latest_df.iloc[0]
                metrics["price"] = self._to_float(row.get("close_price"))
                metrics["market_cap"] = self._to_float(row.get("market_value"))
                metrics["pe"] = self._to_float(row.get("pe_ttm"))
                metrics["pb"] = self._to_float(row.get("pb_latest"))
                metrics["dividend_yield"] = self._to_float(row.get("dividend_yield_ttm_ex_sd"))

        return metrics

    def get_dividends(self, symbol: str, code: str) -> pd.DataFrame:
        """Fetch the annual cumulative dividend for each known year.

        Years come from ``_active_years_cache`` (newest first) or default to
        the last five calendar years. Each non-empty response is upserted into
        ``ifind_hk_dividend``.

        Returns:
            Frame with ``date_str`` (``YYYY1231``) and ``dividends`` for years
            with a non-null, non-zero value; an empty DataFrame otherwise.
        """
        if code in self._active_years_cache:
            years_to_fetch = sorted(self._active_years_cache[code], reverse=True)
        else:
            current_year = int(time.strftime("%Y"))
            years_to_fetch = [current_year - i for i in range(5)]

        results = []
        for year_int in years_to_fetch:
            year_str = str(year_int)
            params = {
                "codes": code,
                "indipara": [
                    {"indicator": "annual_cum_dividend", "indiparams": [year_str, "CNY"]}
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res)
            if not df.empty and 'annual_cum_dividend' in df.columns:
                df['code'] = code
                end_date = f"{year_str}1231"
                df['end_date'] = end_date
                self._save_df_to_wide_table("ifind_hk_dividend", df, ["code", "end_date"])

                val = df['annual_cum_dividend'].iloc[0]
                if pd.notna(val) and val != 0:
                    results.append({
                        'date_str': end_date,
                        'dividends': float(val)
                    })

        if not results:
            return pd.DataFrame()

        df_div = pd.DataFrame(results)
        return df_div

    def get_repurchases(self, symbol: str, code: str) -> pd.DataFrame:
        if code in self._active_years_cache:
            years_to_fetch = sorted(self._active_years_cache[code], reverse=True)
        else:
            current_year = int(time.strftime("%Y"))
            years_to_fetch = [current_year - i for i in range(5)]

        results = []
        for target_year in years_to_fetch:
            start_date = f"{target_year - 1}-12-31"
            end_date = f"{target_year}-12-31"
            params = {
                "codes": code,
                "indipara": [
                    {"indicator": "repurchase_volume_interval", "indiparams": [start_date, end_date]},
                    {"indicator": "repurchase_amount_interval", "indiparams": [start_date, end_date]},
                    {"indicator": "repurchase_low_price", "indiparams": [start_date, end_date]},
                    {"indicator": "repurchase_high_price", "indiparams": [start_date, end_date]}
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res)
            if not df.empty:
                valid_row = False
                for col in df.columns:
                    val = df[col].iloc[0]
                    if pd.notna(val) and val != 0:
                        valid_row = True
                        break
                if valid_row:
                    df['code'] = code
                    df['end_date'] = end_date.replace('-', '')
                    self._save_df_to_wide_table("ifind_hk_repurchase", df, ["code", "end_date"])