Fundamental_Analysis/scripts/test-holder-processing.py
xucheng ff7dc0c95a feat(backend): introduce DataManager and multi-provider; analysis orchestration; streaming endpoints; remove legacy tushare_client; enhance logging
feat(frontend): integrate Prisma and reports API/pages

chore(config): add data_sources.yaml; update analysis-config.json

docs: add 2025-11-03 dev log; update user guide

scripts: enhance dev.sh; add tushare_legacy_client

deps: update backend and frontend dependencies
2025-11-03 21:48:08 +08:00

116 lines
4.2 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
测试股东数数据处理逻辑
"""
import asyncio
import sys
import os
import json
from datetime import datetime, timedelta
# Add the backend directory (../backend relative to this script) to Python's module search path
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'backend'))
from tushare_legacy_client import TushareLegacyClient as TushareClient
def _load_tushare_token():
    """Return the Tushare token, or a falsy value when none is configured.

    The ``TUSHARE_TOKEN`` environment variable takes precedence. Otherwise the
    token is read from ``data_sources.tushare.api_key`` in
    ``config/config.json``, located one directory above this script.
    """
    token = os.environ.get('TUSHARE_TOKEN')
    if token:
        return token

    base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    config_path = os.path.join(base_dir, 'config', 'config.json')
    if not os.path.exists(config_path):
        return None

    with open(config_path, 'r', encoding='utf-8') as f:
        config = json.load(f)
    return config.get('data_sources', {}).get('tushare', {}).get('api_key')


def _latest_row_per_year(rows, date_field):
    """Group rows by year and keep the row with the greatest date in each year.

    Args:
        rows: iterable of dict-like rows.
        date_field: name of the field holding the date; its string form is
            sliced as year = chars 0-3 and month = chars 4-5.

    Returns:
        Dict mapping the 4-character year string to a *copy* of the winning
        row, extended with a ``'_month'`` entry (int, or None when the date
        string is shorter than 6 characters). Keys appear in first-seen order.
    """
    latest = {}
    for row in rows:
        date_val = row.get(date_field)
        if not date_val:
            print(f" ⚠️ 跳过无日期字段的行: {row}")
            continue

        date_str = str(date_val)
        year = date_str[:4]
        current = latest.get(year)
        # Strict '>' keeps the first row seen when two rows share a date.
        if current is None or date_str > str(current.get(date_field)):
            month = int(date_str[4:6]) if len(date_str) >= 6 else None
            # Store a copy so the caller's raw rows are not polluted with '_month'.
            latest[year] = {**row, '_month': month}
    return latest


def _extract_series(latest_by_year, key):
    """Build ``{key: [{'year', 'value', 'month'}, ...]}`` from per-year rows.

    Returns an empty dict when ``latest_by_year`` is empty.
    """
    series = {}
    for year, row in latest_by_year.items():
        series.setdefault(key, []).append(
            {'year': year, 'value': row.get(key), 'month': row.get('_month')}
        )
    return series


def _keep_recent_years(series, years):
    """Return a new series dict trimmed to the most recent ``years`` entries.

    For each key, entries are de-duplicated by year (last one wins), the
    ``years`` newest are kept, and the result is sorted by year ascending.
    """
    trimmed = {}
    for key, entries in series.items():
        by_year = {item['year']: item for item in entries}
        newest_first = sorted(by_year.values(), key=lambda item: item['year'], reverse=True)
        trimmed[key] = sorted(newest_first[:years], key=lambda item: item['year'])
    return trimmed


async def test_holder_num_processing() -> None:
    """Fetch shareholder-count rows from Tushare and replay the processing steps.

    Queries the ``stk_holdernumber`` API for ``000001.SZ`` over roughly the
    last five years, then reduces the rows to one ``holder_num`` value per
    year, printing every intermediate result so it can be inspected by eye.

    Returns early (after printing a message) when no Tushare token is found.
    The client is only held open for the query itself; all processing runs on
    the returned rows afterwards.
    """
    print("🧪 测试股东数数据处理逻辑...")
    print("=" * 50)

    token = _load_tushare_token()
    if not token:
        print("❌ 未找到 Tushare token")
        return

    ts_code = '000001.SZ'
    years = 5
    date_field = 'end_date'
    key = 'holder_num'

    # Take a single timestamp so both bounds derive from the same instant.
    now = datetime.now()
    end_date = now.strftime('%Y%m%d')
    # Approximate window: 365 days per year, leap days are ignored.
    start_date = (now - timedelta(days=years * 365)).strftime('%Y%m%d')

    async with TushareClient(token=token) as client:
        print(f"📊 查询股票: {ts_code}")
        # Separator added: the two dates used to be printed back to back.
        print(f"📅 日期范围: {start_date} - {end_date}")
        data_rows = await client.query(
            api_name='stk_holdernumber',
            params={'ts_code': ts_code, 'start_date': start_date, 'end_date': end_date, 'limit': 5000}
        )

    # Normalise an empty/None result so len() and iteration below cannot fail.
    data_rows = data_rows or []

    print(f'\n✅ 获取到 {len(data_rows)} 条原始数据')
    if data_rows:
        print('\n原始数据示例前3条:')
        for i, row in enumerate(data_rows[:3]):
            print(f"{i+1}条: {json.dumps(row, indent=4, ensure_ascii=False)}")

    # Step 1: keep only the latest report of each year.
    print('\n📝 开始处理数据...')
    latest_by_year = _latest_row_per_year(data_rows, date_field)

    print(f'\n✅ 处理后共有 {len(latest_by_year)} 个年份的数据')
    print('按年份分组的数据:')
    for year, row in sorted(latest_by_year.items(), key=lambda x: x[0], reverse=True):
        print(f" {year}: holder_num={row.get('holder_num')}, end_date={row.get('end_date')}")

    # Step 2: pull out the holder_num field.
    series = _extract_series(latest_by_year, key)
    print('\n📊 提取后的 series 数据:')
    print(json.dumps(series, indent=2, ensure_ascii=False))

    # Step 3: limit to the most recent years and sort ascending.
    series = _keep_recent_years(series, years)
    print('\n✅ 最终排序后的数据(按年份升序):')
    print(json.dumps(series, indent=2, ensure_ascii=False))

    # Step 4: show the type and length of each year value for a format check.
    print('\n🔍 验证年份格式:')
    for item in series.get('holder_num', []):
        year_str = item.get('year')
        print(f" 年份: '{year_str}' (类型: {type(year_str).__name__}, 长度: {len(str(year_str))})")
if __name__ == "__main__":
    # Script entry point: create the coroutine, then run it to completion
    # on a fresh event loop.
    holder_check = test_holder_num_processing()
    asyncio.run(holder_check)