""" 测试脚本:检查是否能获取 300750.SZ 的 tax_to_ebt 数据 """ import asyncio import sys import os import json # 添加 backend 目录到 Python 路径 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "backend")) from app.services.tushare_client import TushareClient async def test_tax_to_ebt(): # 读取配置获取 token config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.json") with open(config_path, "r", encoding="utf-8") as f: config = json.load(f) token = config.get("data_sources", {}).get("tushare", {}).get("api_key") if not token: print("错误:未找到 Tushare token") return client = TushareClient(token=token) ts_code = "300750.SZ" try: print(f"正在查询 {ts_code} 的财务指标数据...") # 先尝试不指定 fields,获取所有字段 print("\n=== 测试1: 不指定 fields 参数 ===") data = await client.query( api_name="fina_indicator", params={"ts_code": ts_code, "limit": 10} ) # 再尝试明确指定 fields,包含 tax_to_ebt print("\n=== 测试2: 明确指定 fields 参数(包含 tax_to_ebt) ===") data_with_fields = await client.query( api_name="fina_indicator", params={"ts_code": ts_code, "limit": 10}, fields="ts_code,ann_date,end_date,tax_to_ebt,roe,roa" ) print(f"\n获取到 {len(data)} 条记录") if data: # 检查第一条记录的字段 first_record = data[0] print(f"\n第一条记录的字段:") print(f" ts_code: {first_record.get('ts_code')}") print(f" end_date: {first_record.get('end_date')}") print(f" ann_date: {first_record.get('ann_date')}") # 检查是否有 tax_to_ebt 字段 if 'tax_to_ebt' in first_record: tax_value = first_record.get('tax_to_ebt') print(f"\n✅ 找到 tax_to_ebt 字段!") print(f" tax_to_ebt 值: {tax_value}") print(f" tax_to_ebt 类型: {type(tax_value)}") else: print(f"\n❌ 未找到 tax_to_ebt 字段") print(f"可用字段列表: {list(first_record.keys())[:20]}...") # 只显示前20个字段 # 打印所有包含 tax 的字段 tax_fields = [k for k in first_record.keys() if 'tax' in k.lower()] if tax_fields: print(f"\n包含 'tax' 的字段:") for field in tax_fields: print(f" {field}: {first_record.get(field)}") # 显示最近几条记录的 tax_to_ebt 值 print(f"\n最近几条记录的 tax_to_ebt 值(测试1):") for i, record in enumerate(data[:5]): end_date = record.get('end_date', 'N/A') tax_value = record.get('tax_to_ebt', 'N/A') print(f" {i+1}. {end_date}: tax_to_ebt = {tax_value}") else: print("❌ 未获取到任何数据(测试1)") # 测试2:检查明确指定 fields 的结果 if data_with_fields: print(f"\n测试2获取到 {len(data_with_fields)} 条记录") first_record2 = data_with_fields[0] if 'tax_to_ebt' in first_record2: print(f"✅ 测试2找到 tax_to_ebt 字段!") print(f" tax_to_ebt 值: {first_record2.get('tax_to_ebt')}") else: print(f"❌ 测试2也未找到 tax_to_ebt 字段") print(f"可用字段: {list(first_record2.keys())}") print(f"\n最近几条记录的 tax_to_ebt 值(测试2):") for i, record in enumerate(data_with_fields[:5]): end_date = record.get('end_date', 'N/A') tax_value = record.get('tax_to_ebt', 'N/A') print(f" {i+1}. {end_date}: tax_to_ebt = {tax_value}") else: print("❌ 未获取到任何数据(测试2)") except Exception as e: print(f"❌ 查询出错: {e}") import traceback traceback.print_exc() finally: await client.aclose() if __name__ == "__main__": asyncio.run(test_tax_to_ebt())