FA3-Datafetch/src/fetchers/jp_fetcher.py

516 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import os
import time
from .base import DataFetcher
from .ifind_client import IFindClient
from storage.file_io import DataStorage
class JpFetcher(DataFetcher):
def __init__(self, api_key: str):
    """Create the iFinD client, the storage backend and the basic-info cache.

    Args:
        api_key: The iFinD refresh token. It is forwarded to the base class
            and used as ``refresh_token`` for ``IFindClient``.
    """
    super().__init__(api_key)
    self.cli = IFindClient(refresh_token=api_key)
    self.storage = DataStorage()
    # Per-instance cache filled by _fetch_basic_info, keyed by iFinD code.
    self._basic_info_cache = dict()
def _get_ifind_code(self, symbol: str) -> str:
"""保持逻辑一致性,如果是纯数字则补齐后缀 .T否则直接传"""
if symbol.isdigit():
return f"{symbol}.T"
return symbol
def _fetch_basic_info(self, symbol: str) -> dict:
"""获取公司的基本信息:中文名称、会计年结日、上市日期"""
code = self._get_ifind_code(symbol)
if code in self._basic_info_cache:
return self._basic_info_cache[code]
params = {
"codes": code,
"indipara": [
{"indicator": "corp_cn_name", "indiparams": []},
{"indicator": "accounting_date", "indiparams": []},
{"indicator": "ipo_date", "indiparams": []}
]
}
# print(f"iFinD API Request: endpoint=basic_data_service, params={params}")
res = self.cli.post("basic_data_service", params)
df = self._parse_ifind_tables(res)
if not df.empty:
self._save_raw_data(df, symbol, "basic_info_raw")
info = {
"name": "",
"accounting_date": "1231", # 默认 12-31
"ipo_date": ""
}
if not df.empty:
row = df.iloc[0]
info["name"] = str(row.get("corp_cn_name", ""))
# accounting_date 通常返回类似 "03-31" 或 "1231"
acc_date = str(row.get("accounting_date", "1231")).replace("-", "").replace("/", "")
# 好像是ifind的API有问题明明财报是0331但如果去读20240331就是空数据
# if acc_date:
# info["accounting_date"] = acc_date
info["ipo_date"] = str(row.get("ipo_date", "")).replace("-", "").replace("/", "")
self._basic_info_cache[code] = info
return info
def _save_raw_data(self, data: any, symbol: str, name: str):
if data is None:
return
# 如果是字典API 响应),直接保存
if isinstance(data, dict):
df = pd.DataFrame([data]) # 包装成单行 DF 或简单处理
else:
df = data
self.storage.save_data(df, 'JP', symbol, f"raw_{name}")
def _parse_ifind_tables(self, res: dict) -> pd.DataFrame:
"""通用解析 iFinD 返回结果的 tables 结构为 DataFrame"""
if not res:
return pd.DataFrame()
if res.get("errorcode") != 0:
print(f"iFinD API Error: {res.get('errmsg')} (code: {res.get('errorcode')})")
return pd.DataFrame()
tables = res.get("tables", [])
if not tables:
print("iFinD API Warning: No tables found in response.")
return pd.DataFrame()
# 提取第一个 table
table_info = tables[0]
table_data = table_info.get("table", {})
times = table_info.get("time", [])
if not table_data:
return pd.DataFrame()
# Ensure all values are lists to avoid pd.DataFrame ValueError with scalars
processed_table_data = {}
for k, v in table_data.items():
if not isinstance(v, list):
processed_table_data[k] = [v]
else:
processed_table_data[k] = v
df = pd.DataFrame(processed_table_data)
if times and len(times) == len(df):
df['end_date'] = [str(t).replace('-', '').replace('/', '').split(' ')[0] for t in times]
elif times and len(df) == 1:
df['end_date'] = str(times[0]).replace('-', '').replace('/', '').split(' ')[0]
# If still no end_date, look for it in columns
if 'end_date' not in df.columns:
for col in ['time', 'date', 'trade_date', 'REPORT_DATE']:
if col in df.columns:
df['end_date'] = df[col].astype(str).str.replace('-', '').str.replace('/', '').str.split(' ').str[0]
break
return df
def _filter_data(self, df: pd.DataFrame) -> pd.DataFrame:
if df.empty or 'end_date' not in df.columns:
return df
df = df.sort_values(by='end_date', ascending=False)
df = df.drop_duplicates(subset=['end_date'], keep='first')
if df.empty:
return df
latest_record = df.iloc[[0]]
try:
latest_date_str = str(latest_record['end_date'].values[0])
# Handle YoY logic: YYYYMMDD -> (YYYY-1)MMDD
last_year_date_str = str(int(latest_date_str) - 10000)
comparable_record = df[df['end_date'].astype(str) == last_year_date_str]
except:
comparable_record = pd.DataFrame()
# 对齐 CN 逻辑,日本公司虽然多是 0331 截止
is_annual = df['end_date'].astype(str).str.endswith('0331') | df['end_date'].astype(str).str.endswith('1231')
annual_records = df[is_annual]
combined = pd.concat([latest_record, comparable_record, annual_records])
combined = combined.drop_duplicates(subset=['end_date'])
combined = combined.sort_values(by='end_date', ascending=False)
return combined
def _fetch_financial_data_annual(self, symbol: str, indicator_configs: list) -> pd.DataFrame:
"""通用获取历年会计年结日的财务数据 (CNY 结算)"""
code = self._get_ifind_code(symbol)
basic_info = self._fetch_basic_info(symbol)
acc_date = basic_info.get("accounting_date", "1231")
current_year = int(time.strftime("%Y"))
# 1. First, determine the most recent valid year by trying backwards from current year
last_valid_year = None
# Try up to 3 years back to find the latest available report
for offset in range(3):
test_year = current_year - offset
test_date = f"{test_year}{acc_date}"
# Use the first indicator to test availability
first_indicator = indicator_configs[0]
params = {
"codes": code,
"indipara": [
{"indicator": first_indicator["indicator"], "indiparams": [test_date, first_indicator.get("type", "1"), "CNY"]}
]
}
res = self.cli.post("basic_data_service", params)
df = self._parse_ifind_tables(res)
if not df.empty:
# Check for non-null values
valid_val = df.iloc[0, 0] if not df.empty and df.shape[1] > 0 else None
if pd.notna(valid_val) and valid_val != 0:
last_valid_year = test_year
break
if last_valid_year is None:
last_valid_year = current_year
all_dfs = []
# 2. Fetch 5 years starting from the last valid year
for i in range(5):
target_year = last_valid_year - i
target_date = f"{target_year}{acc_date}"
params = {
"codes": code,
"indipara": [
{"indicator": item["indicator"], "indiparams": [target_date, item.get("type", "1"), "CNY"]}
for item in indicator_configs
]
}
# print(f"iFinD API Request: endpoint=basic_data_service, params={params}")
res = self.cli.post("basic_data_service", params)
df = self._parse_ifind_tables(res)
if not df.empty:
# 强制设置 end_date 以防 API 返回不一致
df['end_date'] = target_date
all_dfs.append(df)
if not all_dfs:
return pd.DataFrame()
# Remove empty or Check for all-NA columns DataFrames (Fixing FutureWarning)
all_dfs = [d for d in all_dfs if not d.empty and not d.isna().all().all()]
if not all_dfs:
return pd.DataFrame()
return pd.concat(all_dfs, ignore_index=True)
def get_income_statement(self, symbol: str) -> pd.DataFrame:
    """Fetch annual income-statement indicators and normalise them.

    The ``*_oas`` indicators are fetched via ``_fetch_financial_data_annual``,
    saved raw as ``income_statement_raw``, renamed to short column names,
    coerced to numbers (unparseable values become NaN) and passed through
    ``_filter_data``. An empty fetch result is returned as-is.
    """
    # iFinD indicator -> output column; the order is also the request order.
    column_names = {
        'revenue_oas': 'revenue',
        'gross_profit_oas': 'gross_profit',
        'sga_expenses_oas': 'sga_exp',
        'selling_marketing_expenses_oas': 'selling_marketing_exp',
        'ga_expenses_oas': 'ga_exp',
        'rd_expenses_oas': 'rd_exp',
        'income_tax_expense_oas': 'income_tax',
        'net_income_attri_to_common_sh_oas': 'net_income',
        'operating_income_oas': 'operating_profit',
    }
    requested = [{"indicator": source_name} for source_name in column_names]

    raw = self._fetch_financial_data_annual(symbol, requested)
    if raw.empty:
        return raw
    self._save_raw_data(raw, symbol, "income_statement_raw")

    statement = raw.rename(columns=column_names)
    # Everything except the date columns is converted to numeric.
    for column in statement.columns:
        if column in ('date', 'end_date'):
            continue
        statement[column] = pd.to_numeric(statement[column], errors='coerce')
    return self._filter_data(statement)
def get_balance_sheet(self, symbol: str) -> pd.DataFrame:
    """Fetch annual balance-sheet indicators and normalise them.

    The ``*_oas`` indicators are fetched via ``_fetch_financial_data_annual``,
    saved raw as ``balance_sheet_raw``, renamed to short column names and
    coerced to numbers (unparseable values become NaN). When no usable
    ``total_liabilities`` column exists it is derived as
    ``total_assets - total_equity``. The result is passed through
    ``_filter_data``. An empty fetch result is returned as-is.
    """
    # iFinD indicator -> output column; the order is also the request order.
    rename_map = {
        'cash_equi_short_term_inve_oas': 'cash',
        'accou_and_notes_recei_oas': 'receivables',
        'inventories_oas': 'inventory',
        'ppe_net_oas': 'fixed_assets',
        'long_term_inv_and_receiv_oas': 'long_term_investments',
        'goodwill_and_intasset_oas': 'goodwill',
        'short_term_debt_oas': 'short_term_debt',
        'short_term_borrowings_oas': 'short_term_borrowings',
        'account_and_note_payable_oas': 'accounts_payable',
        'contra_liabilities_current_oas': 'contract_liabilities',
        'advance_from_cust_current_oas': 'advances_from_customers',
        'defer_revenue_current_oas': 'deferred_revenue',
        'long_term_debt_oas': 'long_term_debt',
        'long_term_borrowings_oas': 'long_term_borrowings',
        'total_assets_oas': 'total_assets',
        'equity_attri_to_companyowner_oas': 'total_equity',
        'prepaid_expenses_current_oas': 'prepayment',
    }
    indicators = [{"indicator": name} for name in rename_map]

    df = self._fetch_financial_data_annual(symbol, indicators)
    if df.empty:
        return df
    self._save_raw_data(df, symbol, "balance_sheet_raw")

    df_filtered = df.rename(columns=rename_map)

    # Coerce to numeric BEFORE deriving anything: subtracting raw string or
    # None cells would raise TypeError.
    for col in df_filtered.columns:
        if col not in ('date', 'end_date'):
            df_filtered[col] = pd.to_numeric(df_filtered[col], errors='coerce')

    # No total-liabilities indicator is requested, so derive it as
    # assets minus equity whenever the column is absent or entirely null.
    if 'total_liabilities' not in df_filtered.columns or df_filtered['total_liabilities'].isnull().all():
        if 'total_assets' in df_filtered.columns and 'total_equity' in df_filtered.columns:
            df_filtered['total_liabilities'] = df_filtered['total_assets'] - df_filtered['total_equity']

    return self._filter_data(df_filtered)
def get_cash_flow(self, symbol: str) -> pd.DataFrame:
    """Fetch annual cash-flow indicators and normalise them.

    The ``*_oas`` indicators are fetched via ``_fetch_financial_data_annual``,
    saved raw as ``cash_flow_raw``, renamed to ``ocf``, ``capex`` and
    ``dividends``, coerced to numbers (unparseable values become NaN), and
    ``capex`` is made non-negative. The result is passed through
    ``_filter_data``. An empty fetch result is returned as-is.
    """
    # iFinD indicator -> output column; the order is also the request order.
    column_names = {
        'net_cash_flows_from_oa_oas': 'ocf',
        'purchase_of_ppe_and_ia_oas': 'capex',
        'dividends_paid_oas': 'dividends',
    }
    requested = [{"indicator": source_name} for source_name in column_names]

    raw = self._fetch_financial_data_annual(symbol, requested)
    if raw.empty:
        return raw
    self._save_raw_data(raw, symbol, "cash_flow_raw")

    flows = raw.rename(columns=column_names)
    for column in flows.columns:
        if column in ('date', 'end_date'):
            continue
        flows[column] = pd.to_numeric(flows[column], errors='coerce')

    # capex is stored as an absolute amount regardless of the source sign.
    if 'capex' in flows.columns:
        flows['capex'] = flows['capex'].abs()
    return self._filter_data(flows)
def get_market_metrics(self, symbol: str) -> dict:
    """Return static company information taken from ``_fetch_basic_info``.

    Returns:
        A dict with ``name`` (the basic-info ``name``) and ``list_date``
        (the basic-info ``ipo_date``); each defaults to an empty string.
    """
    info = self._fetch_basic_info(symbol)
    company_name = info.get("name", "")
    listing_date = info.get("ipo_date", "")
    return {"name": company_name, "list_date": listing_date}
def get_historical_metrics(self, symbol: str, dates: list) -> pd.DataFrame:
    """Fetch price and market value (in CNY) for each requested date.

    One ``date_sequence`` request is issued per date, with that date as
    both start and end. The combined result is also saved raw as
    ``historical_metrics_raw``.

    Args:
        symbol: Ticker symbol; converted with ``_get_ifind_code``.
        dates: Dates as ``YYYYMMDD``, ``YYYY-MM-DD`` or ``YYYY/MM/DD``
            values; a trailing time part after a space is ignored.

    Returns:
        One row per date with the columns ``date_str``, ``PE``, ``PB``,
        ``MarketCap`` and ``Price``. ``PE`` and ``PB`` are always 0.0.
        ``MarketCap`` and ``Price`` are 0.0 when the value is missing or
        not numeric. An empty DataFrame is returned when ``dates`` is empty.
    """
    code = self._get_ifind_code(symbol)
    if not dates:
        return pd.DataFrame()

    def _to_number(value) -> float:
        # ``float(value or 0.0)`` let NaN through (NaN is truthy) and
        # raised on non-numeric text; both now fall back to 0.0.
        try:
            number = float(value)
        except (TypeError, ValueError):
            return 0.0
        return 0.0 if number != number else number

    results = []
    # Dates are fetched one at a time rather than as a single range.
    for d in dates:
        # Compact the date the same way _parse_ifind_tables does, so that
        # values like "2024-03-31 00:00:00" also work.
        d_str = str(d).replace('-', '').replace('/', '').split(' ')[0]
        fmt_d = f"{d_str[:4]}-{d_str[4:6]}-{d_str[6:]}" if len(d_str) == 8 else d_str
        params = {
            "codes": code,
            "startdate": fmt_d,
            "enddate": fmt_d,
            "functionpara": {"Interval": "D", "Days": "Alldays", "Fill": "Previous"},
            "indipara": [
                {"indicator": "pre_close", "indiparams": ["", "0", "CNY"]},
                {"indicator": "market_value", "indiparams": ["", "CNY"]}
            ]
        }
        res = self.cli.post("date_sequence", params)
        df_seq = self._parse_ifind_tables(res)

        metrics = {'date_str': d_str, 'PE': 0.0, 'PB': 0.0, 'MarketCap': 0.0, 'Price': 0.0}
        if not df_seq.empty:
            # Use the latest record that is not later than the requested date.
            if 'end_date' in df_seq.columns:
                match = df_seq[df_seq['end_date'].astype(str) <= d_str].tail(1)
            else:
                match = df_seq.tail(1)
            if not match.empty:
                # NOTE(review): Price is read from ``pre_close`` — confirm
                # this is intended rather than the day's close.
                if 'pre_close' in match.columns:
                    metrics['Price'] = _to_number(match['pre_close'].iloc[0])
                if 'market_value' in match.columns:
                    metrics['MarketCap'] = _to_number(match['market_value'].iloc[0])
        results.append(metrics)

    df_hist = pd.DataFrame(results)
    self._save_raw_data(df_hist, symbol, "historical_metrics_raw")
    return df_hist
def get_dividends(self, symbol: str) -> pd.DataFrame:
    """Fetch the annual cumulative dividend for each of the last five years.

    One ``basic_data_service`` request (indicator ``annual_cum_dividend``,
    in CNY) is issued per year, starting with the current year. Years with
    a missing or zero value are skipped. A non-empty result is also saved
    raw as ``dividends_raw``.

    Returns:
        Rows with ``date_str`` (year + accounting date) and ``dividends``,
        or an empty DataFrame when no year had a value.
    """
    code = self._get_ifind_code(symbol)
    acc_date = self._fetch_basic_info(symbol).get("accounting_date", "1231")
    this_year = int(time.strftime("%Y"))

    records = []
    for year in range(this_year, this_year - 5, -1):
        year_str = str(year)
        payload = {
            "codes": code,
            "indipara": [
                {"indicator": "annual_cum_dividend", "indiparams": [year_str, "CNY"]}
            ]
        }
        table = self._parse_ifind_tables(self.cli.post("basic_data_service", payload))
        if table.empty or 'annual_cum_dividend' not in table.columns:
            continue
        amount = table['annual_cum_dividend'].iloc[0]
        if pd.isna(amount) or amount == 0:
            continue
        records.append({
            'date_str': f"{year_str}{acc_date}",
            'dividends': float(amount)
        })

    if not records:
        return pd.DataFrame()
    df_div = pd.DataFrame(records)
    self._save_raw_data(df_div, symbol, "dividends_raw")
    return df_div
def get_repurchases(self, symbol: str) -> pd.DataFrame:
    """Fetch the ``repur_num_new`` value for each of the last five years.

    For every year, starting with the current one, a ``basic_data_service``
    request covers the window from the accounting date of the previous year
    to the accounting date of that year (both as ``YYYY-MM-DD``). Years
    with a missing or zero value are skipped. A non-empty result is also
    saved raw as ``repurchases_raw``.

    Returns:
        Rows with ``date_str`` (year + accounting date) and
        ``repurchases``, or an empty DataFrame when no year had a value.
    """
    code = self._get_ifind_code(symbol)
    acc_date = self._fetch_basic_info(symbol).get("accounting_date", "1231")
    # Accounting date is MMDD; the request wants MM-DD.
    month_day = f"{acc_date[:2]}-{acc_date[2:]}"
    this_year = int(time.strftime("%Y"))

    records = []
    for year in range(this_year, this_year - 5, -1):
        window_start = f"{year - 1}-{month_day}"
        window_end = f"{year}-{month_day}"
        payload = {
            "codes": code,
            "indipara": [
                {"indicator": "repur_num_new", "indiparams": [window_start, window_end, "1"]}
            ]
        }
        table = self._parse_ifind_tables(self.cli.post("basic_data_service", payload))
        if table.empty or 'repur_num_new' not in table.columns:
            continue
        amount = table['repur_num_new'].iloc[0]
        if pd.isna(amount) or amount == 0:
            continue
        records.append({
            'date_str': f"{year}{acc_date}",
            'repurchases': float(amount)
        })

    if not records:
        return pd.DataFrame()
    df_repur = pd.DataFrame(records)
    self._save_raw_data(df_repur, symbol, "repurchases_raw")
    return df_repur
def get_employee_count(self, symbol: str) -> pd.DataFrame:
    """Fetch the ``staff_num`` value for each of the last five years.

    For every year, starting with the current one, a ``basic_data_service``
    request is issued for that year's accounting date (``YYYY-MM-DD``).
    Years with a missing or zero value are skipped. A non-empty result is
    also saved raw as ``employee_count_raw``.

    Returns:
        Rows with ``date_str`` (year + accounting date) and
        ``employee_count``, or an empty DataFrame when no year had a value.
    """
    code = self._get_ifind_code(symbol)
    acc_date = self._fetch_basic_info(symbol).get("accounting_date", "1231")
    # Accounting date is MMDD.
    month, day = acc_date[:2], acc_date[2:]
    this_year = int(time.strftime("%Y"))

    records = []
    for year in range(this_year, this_year - 5, -1):
        payload = {
            "codes": code,
            "indipara": [
                {"indicator": "staff_num", "indiparams": [f"{year}-{month}-{day}"]}
            ]
        }
        table = self._parse_ifind_tables(self.cli.post("basic_data_service", payload))
        if table.empty or 'staff_num' not in table.columns:
            continue
        headcount = table['staff_num'].iloc[0]
        if pd.isna(headcount) or headcount == 0:
            continue
        records.append({
            'date_str': f"{year}{acc_date}",
            'employee_count': float(headcount)
        })

    if not records:
        return pd.DataFrame()
    df_emp = pd.DataFrame(records)
    self._save_raw_data(df_emp, symbol, "employee_count_raw")
    return df_emp