import logging
import os
import time
from pathlib import Path
from typing import List, Union

import numpy as np
import pandas as pd
import psycopg2
from dotenv import load_dotenv

from .ifind_client import IFindClient

# Get logger
logger = logging.getLogger(__name__)

# Explicitly load .env from project root
ROOT_DIR = Path(__file__).resolve().parent.parent.parent.parent
load_dotenv(ROOT_DIR / ".env")


class IFindHKClient:
    """
    iFinD client specifically for the Hong Kong market (backend version).

    Uses 'THS' indicators and Chinese accounting standard mappings.
    Implements direct Postgres persistence (side-channel) for background
    task compatibility.
    """

    def __init__(self, api_key: str):
        self.cli = IFindClient(refresh_token=api_key)
        self.market = 'HK'
        self._basic_info_cache = {}
        self._active_years_cache = {}

    def _get_db_connection(self):
        """Create a database connection."""
        db_host = os.getenv("DB_HOST", "192.168.3.195")
        db_user = os.getenv("DB_USER", "value")
        db_pass = os.getenv("DB_PASSWORD", "Value609!")
        db_name = os.getenv("DB_NAME", "fa3")
        db_port = os.getenv("DB_PORT", "5432")

        try:
            conn = psycopg2.connect(
                host=db_host, user=db_user, password=db_pass, dbname=db_name, port=db_port
            )
            conn.set_client_encoding('UTF8')
            return conn
        except Exception as e:
            logger.error(f"DB Connection Error: {e}")
            return None
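
    # Connection settings come from the project-root .env loaded above; a
    # sketch of the expected keys (values here are placeholders, not real
    # deployment credentials):
    #   DB_HOST=localhost
    #   DB_USER=value
    #   DB_PASSWORD=********
    #   DB_NAME=fa3
    #   DB_PORT=5432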

    def _map_dtype_to_sql(self, dtype):
        """Map a pandas dtype to a PostgreSQL column type."""
        if pd.api.types.is_integer_dtype(dtype):
            return "BIGINT"
        elif pd.api.types.is_float_dtype(dtype):
            return "NUMERIC"
        elif pd.api.types.is_bool_dtype(dtype):
            return "BOOLEAN"
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            return "TIMESTAMP"
        else:
            return "TEXT"

    def _save_df_to_wide_table(self, table_name: str, df: pd.DataFrame, pk_cols: List[str]):
        """
        Save a DataFrame to a specific wide table using a direct psycopg2 connection.

        Creates the table from the DataFrame's columns if it does not exist,
        then performs an incremental UPSERT keyed on pk_cols.
        """
        if df is None or df.empty:
            return

        conn = self._get_db_connection()
        if not conn:
            return

        try:
            # 1. Clean data: NaN -> NULL
            df_clean = df.replace({np.nan: None})

            # Convert YYYYMMDD date columns to YYYY-MM-DD format if needed
            for col in df_clean.columns:
                if 'date' in col.lower() and df_clean[col].dtype == 'object':
                    try:
                        sample = df_clean[col].dropna().astype(str).iloc[0] if not df_clean[col].dropna().empty else ""
                        if len(sample) == 8 and sample.isdigit():
                            df_clean[col] = df_clean[col].astype(str).apply(
                                lambda x: f"{x[:4]}-{x[4:6]}-{x[6:]}" if x and len(str(x)) == 8 else x
                            )
                    except Exception:
                        pass

            columns = list(df_clean.columns)

            with conn.cursor() as cur:
                # 2. Create the table if it does not exist, typed from the DataFrame columns
                cur.execute("SELECT to_regclass(%s)", (f"public.{table_name}",))
                table_exists = cur.fetchone()[0] is not None

                if not table_exists:
                    logger.info(f"🆕 [Database] Creating table {table_name}...")
                    col_defs = ['"id" SERIAL PRIMARY KEY']
                    for col in columns:
                        sql_type = self._map_dtype_to_sql(df_clean[col].dtype)
                        col_defs.append(f'"{col}" {sql_type}')

                    col_defs.append('"update_date" TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
                    pk_str = ", ".join([f'"{c}"' for c in pk_cols])
                    constraint_name = f"uq_{table_name}"

                    create_sql = f"""
                        CREATE TABLE IF NOT EXISTS {table_name} (
                            {', '.join(col_defs)},
                            CONSTRAINT {constraint_name} UNIQUE ({pk_str})
                        );
                    """
                    cur.execute(create_sql)
                    conn.commit()

                # 3. Restrict the insert to columns that actually exist in the table
                cur.execute("""
                    SELECT column_name
                    FROM information_schema.columns
                    WHERE table_name = %s
                """, (table_name,))
                db_cols = {row[0] for row in cur.fetchall()}

                # JIT schema evolution: add an 'id' column if it is missing
                if 'id' not in db_cols:
                    try:
                        cur.execute(f'ALTER TABLE "{table_name}" ADD COLUMN "id" SERIAL')
                        conn.commit()
                        db_cols.add('id')
                    except Exception as e:
                        logger.error(f"Error adding id column to {table_name}: {e}")
                        conn.rollback()

                valid_cols = [c for c in columns if c in db_cols]
                if not valid_cols:
                    return

                # 4. Upsert
                cols_str = ", ".join([f'"{c}"' for c in valid_cols])
                vals_str = ", ".join(["%s"] * len(valid_cols))
                updates = [f'"{c}" = EXCLUDED."{c}"' for c in valid_cols if c not in pk_cols]
                if 'update_date' in db_cols:
                    updates.append('"update_date" = CURRENT_TIMESTAMP')
                update_clause = f"DO UPDATE SET {', '.join(updates)}" if updates else "DO NOTHING"
                pk_conflict = ", ".join([f'"{c}"' for c in pk_cols])

                insert_sql = f"""
                    INSERT INTO {table_name} ({cols_str})
                    VALUES ({vals_str})
                    ON CONFLICT ({pk_conflict})
                    {update_clause}
                """

                data_tuples = [tuple(x) for x in df_clean[valid_cols].to_numpy()]
                cur.executemany(insert_sql, data_tuples)
                conn.commit()
                # logger.info(f"✅ Saved {len(data_tuples)} rows to {table_name}")

        except Exception as e:
            logger.error(f"Error saving to {table_name}: {e}", exc_info=True)
            conn.rollback()
        finally:
            conn.close()
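
    # A minimal usage sketch for the upsert helper above (the table name and
    # frame are hypothetical). With pk_cols=["code", "end_date"], the generated
    # statement has the shape:
    #   INSERT INTO t ("code", "end_date", "revenue") VALUES (%s, %s, %s)
    #   ON CONFLICT ("code", "end_date")
    #   DO UPDATE SET "revenue" = EXCLUDED."revenue", "update_date" = CURRENT_TIMESTAMP
    #
    #   df = pd.DataFrame({"code": ["0700.HK"], "end_date": ["20231231"], "revenue": [1.0]})
    #   client._save_df_to_wide_table("t", df, ["code", "end_date"])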

    def _parse_ifind_tables(self, res: dict, params: dict = None) -> pd.DataFrame:
        """Flatten an iFinD 'tables' response into a DataFrame with an 'end_date' column."""
        if not res:
            return pd.DataFrame()

        error_code = res.get("errorcode", 0)
        if error_code != 0:
            logger.error(f"iFinD API Error: {res.get('errmsg')} (code: {error_code})")
            if params:
                logger.error(f"Failed Params: {params}")
            return pd.DataFrame()

        tables = res.get("tables", [])
        if not tables:
            return pd.DataFrame()

        table_info = tables[0]
        table_data = table_info.get("table", {})
        times = table_info.get("time", [])

        if not table_data:
            return pd.DataFrame()

        # Scalar values become single-element lists so DataFrame construction succeeds
        processed_table_data = {}
        for k, v in table_data.items():
            if not isinstance(v, list):
                processed_table_data[k] = [v]
            else:
                processed_table_data[k] = v

        df = pd.DataFrame(processed_table_data)
        if times and len(times) == len(df):
            df['end_date'] = [str(t).replace('-', '').replace('/', '').split(' ')[0] for t in times]
        elif times and len(df) == 1:
            df['end_date'] = str(times[0]).replace('-', '').replace('/', '').split(' ')[0]

        # Fall back to any date-like column if the response carried no usable 'time' array
        if 'end_date' not in df.columns:
            for col in ['time', 'date', 'trade_date', 'REPORT_DATE']:
                if col in df.columns:
                    df['end_date'] = df[col].astype(str).str.replace('-', '').str.replace('/', '').str.split(' ').str[0]
                    break

        return df
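
    # Shape of the response this parser expects (a sketch; the values are
    # illustrative, only the keys are taken from the parsing logic above):
    #   {
    #       "errorcode": 0,
    #       "errmsg": "",
    #       "tables": [
    #           {"table": {"total_oi": [100.0]}, "time": ["2023-12-31"]}
    #       ]
    #   }
    # which yields a one-row DataFrame with columns ['total_oi', 'end_date'],
    # where end_date is normalized to 'YYYYMMDD'.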

    def _filter_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Deduplicate by end_date and keep the latest record plus its year-over-year comparable."""
        if df.empty or 'end_date' not in df.columns:
            return df

        df = df.sort_values(by='end_date', ascending=False)
        df = df.drop_duplicates(subset=['end_date'], keep='first')

        if df.empty:
            return df

        latest_record = df.iloc[[0]]
        try:
            # Same date one year earlier: subtract 10000 from the YYYYMMDD integer
            latest_date_str = str(latest_record['end_date'].values[0])
            last_year_date_str = str(int(latest_date_str) - 10000)
            comparable_record = df[df['end_date'].astype(str) == last_year_date_str]
        except Exception:
            comparable_record = pd.DataFrame()

        if comparable_record.empty:
            dfs_to_concat = [latest_record, df]
        else:
            dfs_to_concat = [latest_record, comparable_record, df]

        combined = pd.concat(dfs_to_concat)
        combined = combined.drop_duplicates(subset=['end_date'])
        combined = combined.sort_values(by='end_date', ascending=False)
        return combined
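
    # Date arithmetic used above: for a latest end_date of '20231231',
    # int('20231231') - 10000 == 20221231, i.e. the same fiscal year-end one
    # year earlier, so the comparable row is the prior annual report.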

    def _fetch_basic_info(self, symbol: str, code: str) -> dict:
        if code in self._basic_info_cache:
            return self._basic_info_cache[code]

        params = {
            "codes": code,
            "indipara": [
                {"indicator": "corp_cn_name", "indiparams": []},
                {"indicator": "accounting_date", "indiparams": []},
                {"indicator": "ipo_date", "indiparams": []}
            ]
        }
        res = self.cli.post("basic_data_service", params)
        df = self._parse_ifind_tables(res, params)
        if not df.empty:
            df['code'] = code  # Inject ID
            # In the backend we skip file saving and rely on the DB
            # self._save_raw_data(df, symbol, "basic_info_raw")
            self._save_df_to_wide_table("ifind_hk_stock_basic", df, ["code"])

        info = {
            "name": "",
            "accounting_date": "1231",
            "ipo_date": ""
        }

        if not df.empty:
            row = df.iloc[0]
            info["name"] = str(row.get("corp_cn_name", ""))
            # Keep the raw reported value under 'acc_date'; 'accounting_date'
            # stays pinned to the calendar year-end used by the annual fetch logic
            info["acc_date"] = str(row.get("accounting_date", "1231"))
            info["accounting_date"] = "1231"
            info["ipo_date"] = str(row.get("ipo_date", "")).replace("-", "").replace("/", "")

        self._basic_info_cache[code] = info
        return info
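
    # Shape of the returned info dict (values are illustrative placeholders):
    #   {"name": "<corp_cn_name>", "accounting_date": "1231",
    #    "acc_date": "<raw value>", "ipo_date": "YYYYMMDD"}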

    def _fetch_financial_data_annual(self, symbol: str, code: str, indicator_configs: list) -> pd.DataFrame:
        """Probe for the most recent annual report that has data, then fetch five years of it."""
        current_year = int(time.strftime("%Y"))

        # Probe the first indicator at the last three year-ends to find the
        # latest annual report that actually contains data
        last_valid_year = None
        for offset in range(3):
            test_year = current_year - offset
            test_date = f"{test_year}1231"
            first_indicator = indicator_configs[0]
            params = {
                "codes": code,
                "indipara": [
                    {"indicator": first_indicator["indicator"], "indiparams": [test_date, first_indicator.get("type", "1"), "CNY"]}
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res)

            if not df.empty:
                valid_val = df.iloc[0, 0] if not df.empty and df.shape[1] > 0 else None
                if pd.notna(valid_val) and valid_val != 0:
                    last_valid_year = test_year
                    break

        if last_valid_year is None:
            last_valid_year = current_year

        # Fetch five annual reports, ending at the last valid year
        all_dfs = []
        for i in range(5):
            target_year = last_valid_year - i
            target_date = f"{target_year}1231"

            params = {
                "codes": code,
                "indipara": [
                    {"indicator": item["indicator"], "indiparams": [target_date, item.get("type", "1"), "CNY"]}
                    for item in indicator_configs
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res, params)

            if not df.empty:
                valid_cols = [c for c in df.columns if c not in ['end_date', 'date']]
                if not df[valid_cols].isnull().all().all():
                    df['end_date'] = target_date
                    all_dfs.append(df)

        if not all_dfs:
            return pd.DataFrame()

        # Cache the actual years found in financial statements
        try:
            years = sorted([int(str(d)[:4]) for d in pd.concat(all_dfs)['end_date'].unique() if str(d)[:4].isdigit()])
            if years:
                self._active_years_cache[code] = years
        except Exception:
            pass

        return pd.concat(all_dfs, ignore_index=True)
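
    # Probe example (illustrative): early in a year, before the latest annual
    # report is published, the probe tests e.g. 20241231 (empty), then
    # 20231231 (has data), so last_valid_year = 2023 and the loop fetches
    # 20231231, 20221231, 20211231, 20201231, 20191231.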

    def get_income_statement(self, symbol: str, code: str) -> pd.DataFrame:
        # iFinD 'THS' indicator keys for the HK income statement
        indicators = [
            {"indicator": "total_oi"},
            {"indicator": "prime_oi"},
            {"indicator": "other_oi"},
            {"indicator": "operating_cost"},
            {"indicator": "operating_expense"},
            {"indicator": "operating_fee"},
            {"indicator": "p_depreciation_and_amortization"},
            {"indicator": "gross_profit"},
            {"indicator": "sales_ad_and_ga"},
            {"indicator": "rad_cost"},
            {"indicator": "sales_fee"},
            {"indicator": "financial_expense"},
            {"indicator": "sales_income"},
            {"indicator": "sales_cost"},
            {"indicator": "other_income"},
            {"indicator": "manage_fee"},
            {"indicator": "deprec_and_amorti"},
            {"indicator": "total_other_opearting_expense"},
            {"indicator": "p_total_cost"},
            {"indicator": "operating_profit"},
            {"indicator": "total_gal"},
            {"indicator": "interest_income"},
            {"indicator": "interest_net_pay"},
            {"indicator": "interest_expense"},
            {"indicator": "income_from_asso_and_joint"},
            {"indicator": "other_gal_effct_profit_pre_tax"},
            {"indicator": "conti_op_before_tax"},
            {"indicator": "profit_before_noncurrent_items"},
            {"indicator": "profit_and_loss_of_noncurrent_items"},
            {"indicator": "profit_before_tax"},
            {"indicator": "income_tax"},
            {"indicator": "profit_after_tax"},
            {"indicator": "minoritygal"},
            {"indicator": "continue_operate_net_profit"},
            {"indicator": "noncontinue_operate_net_profit"},
            {"indicator": "other_special_items"},
            {"indicator": "ni_attr_to_cs"},
            {"indicator": "np_atms"},
            {"indicator": "preferred_divid_and_other_adjust"},
            {"indicator": "oci"},
            {"indicator": "total_oci"},
            {"indicator": "oci_from_parent"},
            {"indicator": "oci_from_minority"},
            {"indicator": "invest_property_fv_chg"},
            {"indicator": "operating_amt"},
            {"indicator": "oi_si"},
            {"indicator": "operating_premium_profit_si"},
            {"indicator": "to_toallied_corp_perf"},
            {"indicator": "to_joint_control_entity_perf"},
            {"indicator": "pre_tax_profit_si"},
            {"indicator": "after_tax_profit_si"},
            {"indicator": "profit_attrbt_to_nonholders"},
            {"indicator": "total_income_atncs"}
        ]

        df = self._fetch_financial_data_annual(symbol, code, indicators)
        if df.empty:
            return df
        # Backend assumption: no manual raw file saving.

        # Save wide table; primary key: code + end_date
        df['code'] = code
        df['symbol'] = symbol
        self._save_df_to_wide_table("ifind_hk_income_statement", df, ["code", "end_date"])

        rename_map = {
            'total_oi': 'revenue',
            'operating_amt': 'turnover',
            'gross_profit': 'gross_profit',
            'sales_ad_and_ga': 'sga_exp',
            'sales_fee': 'selling_marketing_exp',
            'manage_fee': 'ga_exp',
            'rad_cost': 'rd_exp',
            'income_tax': 'income_tax',
            'ni_attr_to_cs': 'net_income',
            'operating_profit': 'operating_profit',
            'depreciation': 'depreciation',
            'deprec_and_amorti': 'depreciation',
            'p_depreciation_and_amortization': 'depreciation'
        }

        # Several source columns map to 'depreciation'; the dedupe below keeps the first
        df_filtered = df.rename(columns=rename_map)
        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]

        if 'ebit' not in df_filtered.columns and 'operating_profit' in df_filtered.columns:
            df_filtered['ebit'] = df_filtered['operating_profit']

        for col in df_filtered.columns:
            if col not in ['date', 'end_date', 'code', 'symbol']:
                df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce')

        return self._filter_data(df_filtered)
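
    # After renaming, the normalized frame exposes (among others): revenue,
    # turnover, gross_profit, sga_exp, selling_marketing_exp, ga_exp, rd_exp,
    # income_tax, net_income, operating_profit, depreciation, and ebit
    # (copied from operating_profit when absent).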

    def get_balance_sheet(self, symbol: str, code: str) -> pd.DataFrame:
        indicators = [
            {"indicator": "cce"},
            {"indicator": "st_investment"},
            {"indicator": "total_cash"},
            {"indicator": "account_receivable"},
            {"indicator": "tradable_fnncl_asset"},
            {"indicator": "derivative_fnncl_assets"},
            {"indicator": "restriv_fund"},
            {"indicator": "other_short_term_investment"},
            {"indicator": "ar_nr"},
            {"indicator": "total_ar"},
            {"indicator": "or"},
            {"indicator": "inventory"},
            {"indicator": "flow_assets_dit"},
            {"indicator": "pre_payment"},
            {"indicator": "other_cunrrent_assets_si"},
            {"indicator": "other_ca"},
            {"indicator": "total_ca"},
            {"indicator": "receivables_from_allied_corp"},
            {"indicator": "current_assets_si"},
            {"indicator": "prepay_deposits_etc"},
            {"indicator": "receivables_from_jce"},
            {"indicator": "receivables_from_ac"},
            {"indicator": "recoverable_tax"},
            {"indicator": "total_fixed_assets"},
            {"indicator": "depreciation"},
            {"indicator": "equity_and_lt_invest"},
            {"indicator": "net_fixed_assets"},
            {"indicator": "invest_property"},
            {"indicator": "equity_investment"},
            {"indicator": "investment_in_associate"},
            {"indicator": "investment_in_joints"},
            {"indicator": "held_to_maturity_invest"},
            {"indicator": "goodwill_and_intangible_asset"},
            {"indicator": "intangible_assets"},
            {"indicator": "accum_amortized"},
            {"indicator": "noncurrent_assets_dit"},
            {"indicator": "other_noncurrent_assets_si"},
            {"indicator": "dt_assets"},
            {"indicator": "total_noncurrent_assets"},
            {"indicator": "total_assets"},
            {"indicator": "ac_equity"},
            {"indicator": "lease_prepay"},
            {"indicator": "noncurrent_assets_si"},
            {"indicator": "st_lt_current_loan"},
            {"indicator": "trade_financial_lia"},
            {"indicator": "derivative_financial_lia"},
            {"indicator": "ap_np"},
            {"indicator": "accounts_payable"},
            {"indicator": "advance_payment"},
            {"indicator": "st_debt"},
            {"indicator": "contra_liab"},
            {"indicator": "tax_payable"},
            {"indicator": "accrued_liab"},
            {"indicator": "flow_debt_deferred_income"},
            {"indicator": "other_cl"},
            {"indicator": "other_cunrrent_liab_si"},
            {"indicator": "total_cl"},
            {"indicator": "accrued_expenses_etc"},
            {"indicator": "money_payable_toac"},
            {"indicator": "joint_control_entity_payable"},
            {"indicator": "payable_to_associated_corp"},
            {"indicator": "lt_debt"},
            {"indicator": "long_term_loan"},
            {"indicator": "other_noncurrent_liabi"},
            {"indicator": "deferred_tax_liability"},
            {"indicator": "ncl_deferred_income"},
            {"indicator": "other_noncurrent_liab_si"},
            {"indicator": "noncurrent_liab_si"},
            {"indicator": "total_noncurrent_liab"},
            {"indicator": "total_liab"},
            {"indicator": "common_shares"},
            {"indicator": "capital_reserve"},
            {"indicator": "equity_premium"},
            {"indicator": "treasury_stock"},
            {"indicator": "accumgal"},
            {"indicator": "equity_atsopc_sbi"},
            {"indicator": "preferred_stock"},
            {"indicator": "perpetual_debt"},
            {"indicator": "reserve"},
            {"indicator": "other_reserves"},
            {"indicator": "retained_earnings"},
            {"indicator": "oci_bs"},
            {"indicator": "total_common_equity"},
            {"indicator": "equity_belong_to_parent"},
            {"indicator": "minority_interests"},
            {"indicator": "other_equity_si"},
            {"indicator": "total_equity"},
            {"indicator": "total_lib_and_equity"},
            {"indicator": "equity_si"},
            {"indicator": "equity_atncs"}
        ]

        df = self._fetch_financial_data_annual(symbol, code, indicators)
        if df.empty:
            return df

        df['code'] = code
        df['symbol'] = symbol
        self._save_df_to_wide_table("ifind_hk_balance_sheet", df, ["code", "end_date"])

        rename_map = {
            'cce': 'cash',
            'ar_nr': 'receivables',
            'inventory': 'inventory',
            'net_fixed_assets': 'fixed_assets',
            'equity_and_lt_invest': 'long_term_investments',
            'goodwill_and_intangible_asset': 'goodwill',
            'st_debt': 'short_term_debt',
            'st_lt_current_loan': 'short_term_borrowings',
            'ap_np': 'accounts_payable',
            'contra_liab': 'contract_liabilities',
            'advance_payment': 'advances_from_customers',
            'flow_debt_deferred_income': 'deferred_revenue',
            'lt_debt': 'long_term_debt',
            'long_term_loan': 'long_term_borrowings',
            'total_assets': 'total_assets',
            'equity_belong_to_parent': 'total_equity',
            'pre_payment': 'prepayment'
        }

        df_filtered = df.rename(columns=rename_map)
        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]

        if 'total_liabilities' not in df_filtered.columns or df_filtered['total_liabilities'].isnull().all():
            if 'total_liab' in df_filtered.columns:
                df_filtered['total_liabilities'] = df_filtered['total_liab']
            elif 'total_assets' in df_filtered.columns and 'total_equity' in df_filtered.columns:
                df_filtered['total_liabilities'] = df_filtered['total_assets'] - df_filtered['total_equity']

        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]

        for col in df_filtered.columns:
            if col not in ['date', 'end_date', 'code', 'symbol']:
                df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce')

        return self._filter_data(df_filtered)
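
    # Fallback identity used above: total_liabilities = total_assets - total_equity.
    # Note 'total_equity' is mapped from equity_belong_to_parent, so this derived
    # figure also absorbs minority interests when total_liab is unavailable.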

    def get_cash_flow(self, symbol: str, code: str) -> pd.DataFrame:
        indicators = [
            {"indicator": "ni"},
            {"indicator": "depreciation_and_amortization"},
            {"indicator": "operating_capital_change"},
            {"indicator": "ncf_from_oa"},
            {"indicator": "capital_cost"},
            {"indicator": "invest_buy"},
            {"indicator": "ncf_from_ia"},
            {"indicator": "increase_in_share_capital"},
            {"indicator": "decrease_in_share_capital"},
            {"indicator": "total_dividends_paid"},
            {"indicator": "ncf_from_fa"}
        ]

        df = self._fetch_financial_data_annual(symbol, code, indicators)
        if df.empty:
            return df

        df['code'] = code
        df['symbol'] = symbol
        self._save_df_to_wide_table("ifind_hk_cash_flow", df, ["code", "end_date"])

        rename_map = {
            'ncf_from_oa': 'ocf',
            'capital_cost': 'capex',
            'total_dividends_paid': 'dividends'
        }

        df_filtered = df.rename(columns=rename_map)
        df_filtered = df_filtered.loc[:, ~df_filtered.columns.duplicated()]
        for col in df_filtered.columns:
            if col not in ['date', 'end_date', 'code', 'symbol']:
                df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce')

        if 'capex' in df_filtered.columns:
            df_filtered['capex'] = df_filtered['capex'].abs()

        return self._filter_data(df_filtered)
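
    # Normalized cash-flow columns: ocf (ncf_from_oa), capex (capital_cost,
    # stored as a positive magnitude via abs()), dividends (total_dividends_paid).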

    def get_market_metrics(self, symbol: str, code: str) -> dict:
        basic_info = self._fetch_basic_info(symbol, code)

        metrics = {
            "name": basic_info.get("name", ""),
            "list_date": basic_info.get("ipo_date", ""),
            "accounting_date": basic_info.get("accounting_date", ""),
            "price": 0,
            "market_cap": 0,
            "pe": 0,
            "pb": 0,
            "dividend_yield": 0
        }

        # The backend cannot easily read raw files, so take report dates from
        # the active-years cache (populated by the income statement fetch
        # earlier in the flow) and fall back to the last five calendar
        # year-ends otherwise.
        if code in self._active_years_cache:
            # Cache years are stored ascending; iterate newest-first so that
            # index 0 below is the most recent report date
            years = sorted(self._active_years_cache[code], reverse=True)
            report_dates = [f"{y}1231" for y in years]
        else:
            # Fallback
            current_year = int(time.strftime("%Y"))
            report_dates = [f"{current_year - i}1231" for i in range(5)]

        # Fetch daily_basic data for each report date
        all_daily_basic = []
        for date_str in report_dates:
            if len(str(date_str)) == 8:
                formatted_date = f"{str(date_str)[:4]}-{str(date_str)[4:6]}-{str(date_str)[6:]}"
            else:
                formatted_date = str(date_str)

            params = {
                "codes": code,
                "functionpara": {"date_sequence": formatted_date},
                "startdate": "",
                "enddate": "",
                "indipara": [
                    {"indicator": "close_price", "indiparams": ["", "0", "BB"]},
                    {"indicator": "market_value", "indiparams": ["", "BB"]},
                    {"indicator": "pb_latest", "indiparams": ["", "100"]},
                    {"indicator": "dividend_yield_ttm_ex_sd", "indiparams": [""]},
                    {"indicator": "pe_ttm", "indiparams": ["", "100"]}
                ]
            }
            res = self.cli.post("date_sequence", params)
            df = self._parse_ifind_tables(res)

            if not df.empty:
                df['code'] = code
                df['end_date'] = str(date_str).replace('-', '')
                all_daily_basic.append(df)

        # Save to DB
        if all_daily_basic:
            combined_df = pd.concat(all_daily_basic, ignore_index=True)
            self._save_df_to_wide_table("ifind_hk_daily_basic", combined_df, ["code", "end_date"])

            # Use the first (most recent) data for the metrics dict
            latest_df = all_daily_basic[0]
            if not latest_df.empty:
                row = latest_df.iloc[0]
                metrics["price"] = float(row.get("close_price") or 0)
                metrics["market_cap"] = float(row.get("market_value") or 0)
                metrics["pe"] = float(row.get("pe_ttm") or 0)
                metrics["pb"] = float(row.get("pb_latest") or 0)
                metrics["dividend_yield"] = float(row.get("dividend_yield_ttm_ex_sd") or 0)

        return metrics
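
    # Sketch of the returned metrics dict (values are illustrative placeholders):
    #   {"name": "...", "list_date": "YYYYMMDD", "accounting_date": "1231",
    #    "price": 123.4, "market_cap": 5.6e9, "pe": 12.3, "pb": 1.8,
    #    "dividend_yield": 2.5}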

    def get_dividends(self, symbol: str, code: str) -> pd.DataFrame:
        if code in self._active_years_cache:
            years_to_fetch = sorted(self._active_years_cache[code], reverse=True)
        else:
            current_year = int(time.strftime("%Y"))
            years_to_fetch = [current_year - i for i in range(5)]

        results = []

        for year_int in years_to_fetch:
            year_str = str(year_int)
            params = {
                "codes": code,
                "indipara": [
                    {"indicator": "annual_cum_dividend", "indiparams": [year_str, "CNY"]}
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res)

            if not df.empty and 'annual_cum_dividend' in df.columns:
                df['code'] = code
                end_date = f"{year_str}1231"
                df['end_date'] = end_date
                self._save_df_to_wide_table("ifind_hk_dividend", df, ["code", "end_date"])

                val = df['annual_cum_dividend'].iloc[0]
                if pd.notna(val) and val != 0:
                    results.append({
                        'date_str': end_date,
                        'dividends': float(val)
                    })

        if not results:
            return pd.DataFrame()

        df_div = pd.DataFrame(results)
        return df_div
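
    # Example output (illustrative values):
    #      date_str  dividends
    #   0  20231231       1.20
    #   1  20221231       1.00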

    def get_repurchases(self, symbol: str, code: str) -> pd.DataFrame:
        if code in self._active_years_cache:
            years_to_fetch = sorted(self._active_years_cache[code], reverse=True)
        else:
            current_year = int(time.strftime("%Y"))
            years_to_fetch = [current_year - i for i in range(5)]

        results = []

        for target_year in years_to_fetch:
            start_date = f"{target_year - 1}-12-31"
            end_date = f"{target_year}-12-31"

            params = {
                "codes": code,
                "indipara": [
                    {"indicator": "repurchase_volume_interval", "indiparams": [start_date, end_date]},
                    {"indicator": "repurchase_amount_interval", "indiparams": [start_date, end_date]},
                    {"indicator": "repurchase_low_price", "indiparams": [start_date, end_date]},
                    {"indicator": "repurchase_high_price", "indiparams": [start_date, end_date]}
                ]
            }
            res = self.cli.post("basic_data_service", params)
            df = self._parse_ifind_tables(res)

            if not df.empty:
                valid_row = False
                for col in df.columns:
                    val = df[col].iloc[0]
                    if pd.notna(val) and val != 0:
                        valid_row = True
                        break

                if valid_row:
                    df['code'] = code
                    df['end_date'] = end_date.replace('-', '')
                    self._save_df_to_wide_table("ifind_hk_repurchase", df, ["code", "end_date"])