#!/usr/bin/env python3 """ 测试股东数数据处理逻辑 """ import asyncio import sys import os import json from datetime import datetime, timedelta # 添加项目根目录到Python路径 sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'backend')) from tushare_legacy_client import TushareLegacyClient as TushareClient async def test_holder_num_processing(): """测试股东数数据处理逻辑""" print("🧪 测试股东数数据处理逻辑...") print("=" * 50) # 从环境变量或配置文件读取 token base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) config_path = os.path.join(base_dir, 'config', 'config.json') token = os.environ.get('TUSHARE_TOKEN') if not token and os.path.exists(config_path): with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) token = config.get('data_sources', {}).get('tushare', {}).get('api_key') if not token: print("❌ 未找到 Tushare token") return ts_code = '000001.SZ' years = 5 async with TushareClient(token=token) as client: # 模拟后端处理逻辑 end_date = datetime.now().strftime('%Y%m%d') start_date = (datetime.now() - timedelta(days=years * 365)).strftime('%Y%m%d') print(f"📊 查询股票: {ts_code}") print(f"📅 日期范围: {start_date} 到 {end_date}") data_rows = await client.query( api_name='stk_holdernumber', params={'ts_code': ts_code, 'start_date': start_date, 'end_date': end_date, 'limit': 5000} ) print(f'\n✅ 获取到 {len(data_rows)} 条原始数据') if data_rows: print('\n原始数据示例(前3条):') for i, row in enumerate(data_rows[:3]): print(f" 第{i+1}条: {json.dumps(row, indent=4, ensure_ascii=False)}") # 模拟后端处理逻辑 series = {} tmp = {} date_field = 'end_date' print('\n📝 开始处理数据...') for row in data_rows: date_val = row.get(date_field) if not date_val: print(f" ⚠️ 跳过无日期字段的行: {row}") continue year = str(date_val)[:4] month = int(str(date_val)[4:6]) if len(str(date_val)) >= 6 else None existing = tmp.get(year) if existing is None or str(row.get(date_field)) > str(existing.get(date_field)): tmp[year] = row tmp[year]['_month'] = month print(f'\n✅ 处理后共有 {len(tmp)} 个年份的数据') print('按年份分组的数据:') for year, row in sorted(tmp.items(), key=lambda x: x[0], reverse=True): print(f" {year}: holder_num={row.get('holder_num')}, end_date={row.get('end_date')}") # 提取 holder_num 字段 key = 'holder_num' for year, row in tmp.items(): month = row.get('_month') value = row.get(key) arr = series.setdefault(key, []) arr.append({'year': year, 'value': value, 'month': month}) print('\n📊 提取后的 series 数据:') print(json.dumps(series, indent=2, ensure_ascii=False)) # 排序(模拟后端逻辑) for key, arr in series.items(): uniq = {item['year']: item for item in arr} arr_sorted_desc = sorted(uniq.values(), key=lambda x: x['year'], reverse=True) arr_limited = arr_sorted_desc[:years] arr_sorted = sorted(arr_limited, key=lambda x: x['year']) # ascending series[key] = arr_sorted print('\n✅ 最终排序后的数据(按年份升序):') print(json.dumps(series, indent=2, ensure_ascii=False)) # 验证年份格式 print('\n🔍 验证年份格式:') for item in series.get('holder_num', []): year_str = item.get('year') print(f" 年份: '{year_str}' (类型: {type(year_str).__name__}, 长度: {len(str(year_str))})") if __name__ == "__main__": asyncio.run(test_holder_num_processing())