Fundamental_Analysis/archive/python/backend/app/data_providers/ifind.py
Lv, Qi a6cca48fed chore(cleanup): remove redundant data-distance-service stub tests
- Covered by data-persistence-service tests (db/api).
- No references or compose entries.
2025-11-16 20:52:26 +08:00

132 lines
5.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from .base import BaseDataProvider
from typing import Any, Dict, List, Optional
import pandas as pd
from datetime import datetime
# 假设 iFinDPy 库已安装在环境中
# 重要提示: 用户需要根据官方文档手动安装 iFinDPy
try:
from iFinDPy import THS_iFinDLogin, THS_BD, THS_HQ
except ImportError:
print("错误: iFinDPy 模块未找到。请确保已按照同花顺官方指引完成安装。")
# 定义虚拟函数以避免在未安装时程序崩溃
def THS_iFinDLogin(*args, **kwargs): return -1
def THS_BD(*args, **kwargs): return pd.DataFrame()
def THS_HQ(*args, **kwargs): return pd.DataFrame()
class TonghsProvider(BaseDataProvider):
_is_logged_in = False
def __init__(self, token: Optional[str] = None):
# 使用从 iFinD 用户中心获取的 Refresh Token 进行登录
if not TonghsProvider._is_logged_in:
if not token:
raise ValueError("同花顺 iFinDPy Refresh Token 未在配置中提供。")
# 调用登录函数,直接传入 token
# 注意: 具体的关键字参数名可能需要根据 iFinDPy 的实际文档确认,这里假设为 'token' 或直接作为第一个参数
login_result = THS_iFinDLogin(token)
if login_result == 0:
print("同花顺 iFinDPy 登录成功。")
TonghsProvider._is_logged_in = True
else:
print(f"同花顺 iFinDPy 登录失败,错误码: {login_result}")
raise ConnectionError("无法登录到同花顺 iFinDPy 服务,请检查您的 Refresh Token 是否正确。")
async def get_stock_basic(self, stock_code: str) -> Optional[Dict[str, Any]]:
try:
# TODO: 请用户确认用于获取公司基本信息的指标 (indicators)
indicators = "ths_stock_short_name_stock;ths_listed_market_stock;ths_industry_stock;ths_ipo_date_stock"
data = THS_BD(stock_code, indicators, "")
if data.empty:
return None
# --- 数据归一化 ---
# iFinDPy 返回的数据通常是 DataFrame我们需要将其转换为字典
info = data.iloc[0].to_dict()
return {
"ts_code": stock_code,
"name": info.get("ths_stock_short_name_stock"),
"area": info.get("ths_listed_market_stock"),
"industry": info.get("ths_industry_stock"),
"list_date": info.get("ths_ipo_date_stock"),
}
except Exception as e:
print(f"同花顺 iFinDPy get_stock_basic 执行失败, 股票代码 {stock_code}: {e}")
return None
async def get_daily_price(self, stock_code: str, start_date: str, end_date: str) -> List[Dict[str, Any]]:
try:
# TODO: 请用户确认用于获取日线行情的指标
indicators = "open;high;low;close;volume"
# iFinDPy 的日期格式通常是 YYYY-MM-DD
date_range = f"{start_date};{end_date}"
data = THS_HQ(stock_code, indicators, date_range)
if data.empty:
return []
# --- 数据归一化 ---
data = data.reset_index()
data.rename(columns={
"time": "trade_date",
"open": "open",
"high": "high",
"low": "low",
"close": "close",
"volume": "vol"
}, inplace=True)
return data.to_dict('records')
except Exception as e:
print(f"同花顺 iFinDPy get_daily_price 执行失败, 股票代码 {stock_code}: {e}")
return []
async def get_financial_statements(self, stock_code: str, report_dates: List[str]) -> List[Dict[str, Any]]:
try:
# TODO: 请用户确认获取财务报表的指标
# 这可能需要多次调用 THS_BD 并合并结果
# 示例:一次性获取多个报告期的数据
# 将 report_dates 转换为 iFinDPy 接受的格式,例如 "2022-12-31;2021-12-31"
dates_param = ";".join(report_dates)
# 需要的指标
income_indicators = "ths_np_stock" # 净利润
bs_indicators = "ths_total_assets_stock;ths_total_liab_stock" # 总资产;总负债
revenue_indicators = "ths_revenue_stock" # 营业收入
# 获取数据
income_data = THS_BD(stock_code, income_indicators, f"reportDate={dates_param}")
bs_data = THS_BD(stock_code, bs_indicators, f"reportDate={dates_param}")
revenue_data = THS_BD(stock_code, revenue_indicators, f"reportDate={dates_param}")
# 合并数据
financials_df = pd.concat([income_data, bs_data, revenue_data], axis=1)
financials_df = financials_df.loc[:,~financials_df.columns.duplicated()]
financials_df = financials_df.reset_index().rename(columns={"index": "end_date"})
# --- 数据归一化 ---
financials_df.rename(columns={
"ths_revenue_stock": "revenue",
"ths_np_stock": "net_income",
"ths_total_assets_stock": "total_assets",
"ths_total_liab_stock": "total_liabilities",
}, inplace=True)
financials_df["ts_code"] = stock_code
return financials_df.to_dict('records')
except Exception as e:
print(f"同花顺 iFinDPy get_financial_statements 执行失败, 股票代码 {stock_code}: {e}")
return []
async def get_financial_statement(self, stock_code: str, report_date: str) -> Optional[Dict[str, Any]]:
results = await self.get_financial_statements(stock_code, [report_date])
return results[0] if results else None