111 lines
4.3 KiB
Python
111 lines
4.3 KiB
Python
"""
|
||
测试脚本:检查是否能获取 300750.SZ 的 tax_to_ebt 数据
|
||
"""
|
||
import asyncio
|
||
import sys
|
||
import os
|
||
import json
|
||
|
||
# 添加 backend 目录到 Python 路径
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "backend"))
|
||
|
||
from app.services.tushare_client import TushareClient
|
||
|
||
async def test_tax_to_ebt():
|
||
# 读取配置获取 token
|
||
config_path = os.path.join(os.path.dirname(__file__), "..", "config", "config.json")
|
||
with open(config_path, "r", encoding="utf-8") as f:
|
||
config = json.load(f)
|
||
|
||
token = config.get("data_sources", {}).get("tushare", {}).get("api_key")
|
||
if not token:
|
||
print("错误:未找到 Tushare token")
|
||
return
|
||
|
||
client = TushareClient(token=token)
|
||
ts_code = "300750.SZ"
|
||
|
||
try:
|
||
print(f"正在查询 {ts_code} 的财务指标数据...")
|
||
|
||
# 先尝试不指定 fields,获取所有字段
|
||
print("\n=== 测试1: 不指定 fields 参数 ===")
|
||
data = await client.query(
|
||
api_name="fina_indicator",
|
||
params={"ts_code": ts_code, "limit": 10}
|
||
)
|
||
|
||
# 再尝试明确指定 fields,包含 tax_to_ebt
|
||
print("\n=== 测试2: 明确指定 fields 参数(包含 tax_to_ebt) ===")
|
||
data_with_fields = await client.query(
|
||
api_name="fina_indicator",
|
||
params={"ts_code": ts_code, "limit": 10},
|
||
fields="ts_code,ann_date,end_date,tax_to_ebt,roe,roa"
|
||
)
|
||
|
||
print(f"\n获取到 {len(data)} 条记录")
|
||
|
||
if data:
|
||
# 检查第一条记录的字段
|
||
first_record = data[0]
|
||
print(f"\n第一条记录的字段:")
|
||
print(f" ts_code: {first_record.get('ts_code')}")
|
||
print(f" end_date: {first_record.get('end_date')}")
|
||
print(f" ann_date: {first_record.get('ann_date')}")
|
||
|
||
# 检查是否有 tax_to_ebt 字段
|
||
if 'tax_to_ebt' in first_record:
|
||
tax_value = first_record.get('tax_to_ebt')
|
||
print(f"\n✅ 找到 tax_to_ebt 字段!")
|
||
print(f" tax_to_ebt 值: {tax_value}")
|
||
print(f" tax_to_ebt 类型: {type(tax_value)}")
|
||
else:
|
||
print(f"\n❌ 未找到 tax_to_ebt 字段")
|
||
print(f"可用字段列表: {list(first_record.keys())[:20]}...") # 只显示前20个字段
|
||
|
||
# 打印所有包含 tax 的字段
|
||
tax_fields = [k for k in first_record.keys() if 'tax' in k.lower()]
|
||
if tax_fields:
|
||
print(f"\n包含 'tax' 的字段:")
|
||
for field in tax_fields:
|
||
print(f" {field}: {first_record.get(field)}")
|
||
|
||
# 显示最近几条记录的 tax_to_ebt 值
|
||
print(f"\n最近几条记录的 tax_to_ebt 值(测试1):")
|
||
for i, record in enumerate(data[:5]):
|
||
end_date = record.get('end_date', 'N/A')
|
||
tax_value = record.get('tax_to_ebt', 'N/A')
|
||
print(f" {i+1}. {end_date}: tax_to_ebt = {tax_value}")
|
||
else:
|
||
print("❌ 未获取到任何数据(测试1)")
|
||
|
||
# 测试2:检查明确指定 fields 的结果
|
||
if data_with_fields:
|
||
print(f"\n测试2获取到 {len(data_with_fields)} 条记录")
|
||
first_record2 = data_with_fields[0]
|
||
if 'tax_to_ebt' in first_record2:
|
||
print(f"✅ 测试2找到 tax_to_ebt 字段!")
|
||
print(f" tax_to_ebt 值: {first_record2.get('tax_to_ebt')}")
|
||
else:
|
||
print(f"❌ 测试2也未找到 tax_to_ebt 字段")
|
||
print(f"可用字段: {list(first_record2.keys())}")
|
||
|
||
print(f"\n最近几条记录的 tax_to_ebt 值(测试2):")
|
||
for i, record in enumerate(data_with_fields[:5]):
|
||
end_date = record.get('end_date', 'N/A')
|
||
tax_value = record.get('tax_to_ebt', 'N/A')
|
||
print(f" {i+1}. {end_date}: tax_to_ebt = {tax_value}")
|
||
else:
|
||
print("❌ 未获取到任何数据(测试2)")
|
||
|
||
except Exception as e:
|
||
print(f"❌ 查询出错: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
finally:
|
||
await client.aclose()
|
||
|
||
if __name__ == "__main__":
|
||
asyncio.run(test_tax_to_ebt())
|
||
|