import os
import sys
import argparse
import subprocess
import shutil
import yaml
import re
import time
from datetime import datetime
from dotenv import load_dotenv
from google import genai
from google.genai import types
import markdown
def load_config():
load_dotenv(override=True)
api_key = os.getenv("GEMINI_API_KEY") or os.getenv("OPENAI_API_KEY")
model_name = os.getenv("LLM_MODEL", "gemini-2.0-flash-exp")
return api_key, model_name
def run_main_script(market, symbol):
print(f"Running main.py for {market} {symbol}...")
try:
# Assuming main.py is in current directory
# Using sys.executable to ensure the same python environment
cmd = [sys.executable, "main.py", market, symbol]
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError as e:
print(f"Error running main.py: {e}")
sys.exit(1)
def get_company_info(market, symbol):
# Path: data/MARKET/SYMBOL/report.md
base_dir = os.path.join("data", market)
symbol_dir = os.path.join(base_dir, symbol)
# Try exact match first
if not os.path.exists(symbol_dir):
# Try to find a directory that starts with the symbol
candidates = [d for d in os.listdir(base_dir) if d.startswith(symbol) and os.path.isdir(os.path.join(base_dir, d))]
if candidates:
# Use the first match (e.g., 688334.SH)
symbol_dir = os.path.join(base_dir, candidates[0])
print(f"Redirecting to found directory: {candidates[0]}")
report_path = os.path.join(symbol_dir, "report.md")
if not os.path.exists(report_path):
print(f"Error: {report_path} not found.")
sys.exit(1)
company_name = symbol # Fallback
try:
with open(report_path, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.splitlines()
if not lines:
return symbol
# Attempt 1: Header Regex "# Name (Code) - Financial Report"
header_match = re.search(r'^#\s+(.+?)\s+\(', lines[0])
if header_match:
company_name = header_match.group(1).strip()
# Attempt 2: Table | Code | Name | ...
else:
for line in lines:
if f"| {symbol}" in line or f"| {symbol.upper()}" in line:
parts = line.split('|')
if len(parts) >= 3:
company_name = parts[2].strip()
break
except Exception as e:
print(f"Warning: Could not extract company name: {e}")
return company_name
def create_report_file(market, symbol, company_name):
date_str = datetime.now().strftime("%Y%m%d")
folder_name = "Reports"
file_name = f"{symbol}_{market}_{company_name}_{date_str}_基本面分析报告.md"
os.makedirs(folder_name, exist_ok=True)
target_path = os.path.join(folder_name, file_name)
# Find data directory again (should ideally be passed, but simple lookup works)
base_dir = os.path.join("data", market)
symbol_dir = os.path.join(base_dir, symbol)
if not os.path.exists(symbol_dir):
candidates = [d for d in os.listdir(base_dir) if d.startswith(symbol) and os.path.isdir(os.path.join(base_dir, d))]
if candidates:
symbol_dir = os.path.join(base_dir, candidates[0])
source_html = os.path.join(symbol_dir, "report.html")
if os.path.exists(source_html):
# Create a relative link to the HTML file instead of copying content
report_dir = os.path.dirname(target_path)
rel_path = os.path.relpath(source_html, report_dir)
with open(target_path, 'w', encoding='utf-8') as f:
f.write(f"# {company_name} ({symbol}) - 基本面分析报告\n\n")
f.write(f"**生成日期**: {date_str}\n\n")
else:
with open(target_path, 'w', encoding='utf-8') as f:
f.write(f"# {company_name} ({symbol}) - 基本面分析报告\n\n")
f.write(f"**生成日期**: {date_str}\n\n")
return target_path
def load_prompts():
prompts = {}
prompt_dir = "Prompt"
mapping = {
"company_profile": "公司简介.md",
"fundamental_analysis": "基本面分析.md",
"insider_analysis": "内部人与机构动向分析.md",
"bullish_analysis": "看涨分析.md",
"bearish_analysis": "看跌分析.md"
}
for key, filename in mapping.items():
try:
with open(os.path.join(prompt_dir, filename), 'r', encoding='utf-8') as f:
prompts[key] = f.read()
except FileNotFoundError:
print(f"Warning: Prompt file {filename} not found.")
return prompts
def call_llm(api_key, model_name, system_prompt, user_prompt, context, enable_search=False):
start_time = time.time()
# Combine system and user prompts for Gemini
full_prompt = f"{system_prompt}\n\n{user_prompt}\n\nExisting Report Data for context:\n{context}"
# Create client
client = genai.Client(api_key=api_key)
# Configure tools if search is enabled
config_params = {}
if enable_search:
grounding_tool = types.Tool(google_search=types.GoogleSearch())
config_params['tools'] = [grounding_tool]
config = types.GenerateContentConfig(**config_params)
try:
response = client.models.generate_content(
model=model_name,
contents=full_prompt,
config=config
)
end_time = time.time()
duration = end_time - start_time
# Extract usage information
usage = {
'prompt_tokens': response.usage_metadata.prompt_token_count if hasattr(response, 'usage_metadata') else 0,
'completion_tokens': response.usage_metadata.candidates_token_count if hasattr(response, 'usage_metadata') else 0,
'total_tokens': response.usage_metadata.total_token_count if hasattr(response, 'usage_metadata') else 0
}
# Create a simple usage object
class Usage:
def __init__(self, prompt_tokens, completion_tokens, total_tokens):
self.prompt_tokens = prompt_tokens
self.completion_tokens = completion_tokens
self.total_tokens = total_tokens
usage_obj = Usage(usage['prompt_tokens'], usage['completion_tokens'], usage['total_tokens'])
return response.text, usage_obj, duration
except Exception as e:
print(f"API Call Failed: {e}")
import traceback
traceback.print_exc()
return f"\n\nError generating section: {e}\n\n", None, 0
def render_html_report(report_file, market, symbol):
"""
将Markdown报告渲染成HTML,并在顶部嵌入财务数据HTML图表
Args:
report_file: Markdown报告文件路径
market: 市场代码
symbol: 股票代码
Returns:
生成的HTML文件路径
"""
# 读取Markdown报告
with open(report_file, 'r', encoding='utf-8') as f:
md_content = f.read()
# 查找财务数据HTML文件
base_dir = os.path.join("data", market)
symbol_dir = os.path.join(base_dir, symbol)
if not os.path.exists(symbol_dir):
candidates = [d for d in os.listdir(base_dir) if d.startswith(symbol) and os.path.isdir(os.path.join(base_dir, d))]
if candidates:
symbol_dir = os.path.join(base_dir, candidates[0])
financial_html_path = os.path.join(symbol_dir, "report.html")
financial_html_content = ""
if os.path.exists(financial_html_path):
with open(financial_html_path, 'r', encoding='utf-8') as f:
financial_html_content = f.read()
# 将Markdown转换为HTML,添加更多扩展以支持完整的Markdown语法
md = markdown.Markdown(extensions=[
'tables', # 表格支持
'fenced_code', # 代码块支持
'codehilite', # 代码高亮
'nl2br', # 换行转
'sane_lists', # 更好的列表处理
'attr_list', # 属性列表
'def_list', # 定义列表
'abbr', # 缩写
'footnotes', # 脚注
'md_in_html' # HTML中的Markdown
])
report_html_content = md.convert(md_content)
# 构建完整的HTML文档
html_template = f"""
财务数据图表未找到
'}