"""HTTP routes for stock search, analysis reports, settings and AI chat.

NOTE(review): the reviewed copy of this module had lost its line breaks and
the HTML markup inside its string literals (placeholder messages, both page
templates, and the body of the final ``except`` in the PDF endpoint). The
markup below is a minimal reconstruction around the surviving text; restore
the original templates and styles from version control if they are available.
"""

import asyncio
import contextlib
import html
import io
import os
import re
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import markdown
from bs4 import BeautifulSoup
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from weasyprint import HTML

from app.legacy.database_old import get_db
from app.legacy.models_old import Report, Setting
from app.legacy.schemas import (
    AnalysisRequest,
    AnalysisStatus,
    ConfigUpdateRequest,
    ReportResponse,
    StockSearchRequest,
    StockSearchResponse,
)
from app.services import analysis_service
from app.services.analysis_service import get_genai_client

router = APIRouter()

_API_KEY_SETTING = "GEMINI_API_KEY"
_MODEL_SETTING = "AI_MODEL"
_DEFAULT_AI_MODEL = "gemini-2.0-flash"
_SECRET_MASK = "********"

# <repo root>/data/<market>/<symbol>/report.html holds the generated charts.
_DATA_ROOT = os.path.join(
    os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../")),
    "data",
)

# NOTE(review): wrapper markup is reconstructed; only the text is original.
_CHARTS_PENDING_HTML = "<p>财务图表尚未生成,数据获取可能仍在进行中。</p>"
# NOTE(review): the original refresh interval was not recoverable - confirm.
_PENDING_REFRESH_SECONDS = 5

_NO_CACHE_HEADERS = {
    "Cache-Control": "no-store, no-cache, must-revalidate",
    "Pragma": "no-cache",
    "Expires": "0",
}

# Display order of the analysis sections in the PDF (dict order is the order).
_SECTION_TITLES = {
    'company_profile': '公司简介',
    'fundamental_analysis': '基本面分析',
    'insider_analysis': '内部人士分析',
    'bullish_analysis': '看涨分析',
    'bearish_analysis': '看跌分析',
}
_SECTION_ORDER = {name: index for index, name in enumerate(_SECTION_TITLES)}
_UNKNOWN_SECTION_RANK = 999

_BACKGROUND_VARIABLES = ('--bg:', '--card-bg:', '--header-bg:', '--section-bg:')
_MAX_PDF_TABLE_COLUMNS = 6

# Limited to 1-2 digits so a sentence ending in a year ("... in 2023. The ...")
# is not mistaken for an inline numbered list item.
_INLINE_NUMBERED_ITEM = re.compile(r'(\s)(\d{1,2}\.\s)')
_NUMBERED_ITEM = re.compile(r'^\d+\.\s')

_SHANGHAI_TZ = timezone(timedelta(hours=8))


async def _get_api_key(db: AsyncSession) -> Optional[str]:
    """Return the Gemini API key from the settings table, else the environment."""
    setting = await db.get(Setting, _API_KEY_SETTING)
    return setting.value if setting else os.getenv(_API_KEY_SETTING)


async def _get_default_model(db: AsyncSession) -> str:
    """Return the configured AI model name, or the built-in default."""
    model_setting = await db.get(Setting, _MODEL_SETTING)
    return model_setting.value if model_setting else _DEFAULT_AI_MODEL


async def _get_report_or_404(db: AsyncSession, report_id: int) -> Report:
    """Load a report with its sections eagerly, or raise a 404."""
    result = await db.execute(
        select(Report)
        .options(selectinload(Report.sections))
        .where(Report.id == report_id)
    )
    report = result.scalar_one_or_none()
    if not report:
        raise HTTPException(status_code=404, detail="Report not found")
    return report


def _read_financial_html(market: str, symbol: str) -> Optional[str]:
    """Return the generated ``report.html`` for a symbol, or None if absent.

    When ``<data>/<market>/<symbol>`` does not exist, falls back to the first
    (alphabetically) directory whose name starts with ``symbol``.
    """
    base_dir = os.path.join(_DATA_ROOT, market)
    symbol_dir = os.path.join(base_dir, symbol)

    if not os.path.exists(symbol_dir) and os.path.exists(base_dir):
        # os.listdir order is arbitrary; sort so the match is deterministic.
        candidates = sorted(
            entry
            for entry in os.listdir(base_dir)
            if entry.startswith(symbol)
            and os.path.isdir(os.path.join(base_dir, entry))
        )
        if candidates:
            symbol_dir = os.path.join(base_dir, candidates[0])

    report_path = os.path.abspath(os.path.join(symbol_dir, "report.html"))
    # market/symbol originate from API requests: never read outside the data dir.
    if not report_path.startswith(_DATA_ROOT + os.sep):
        return None
    if not os.path.exists(report_path):
        return None
    with open(report_path, 'r', encoding='utf-8') as f:
        return f.read()


def _error_html(exc: Exception) -> str:
    """Render a chart-loading failure as an HTML fragment (message escaped)."""
    return f"<p>加载财务图表时出错: {html.escape(str(exc))}</p>"


def _mask_secret(value: Optional[str]) -> str:
    """Mask a secret, revealing its last 4 characters only when it is long."""
    value = value or ""
    # Short values would be exposed entirely by a 4-character tail.
    tail = value[-4:] if len(value) > 8 else ""
    return _SECRET_MASK + tail


def _clean_financial_html_for_pdf(raw_html: str) -> str:
    """Strip screen-only styling from the chart HTML so it prints cleanly."""
    soup = BeautifulSoup(raw_html, 'html.parser')

    for style_tag in soup.find_all('style'):
        css = style_tag.string
        if not css:
            continue
        # Drop the CSS variable definitions used for backgrounds.
        css = '\n'.join(
            line
            for line in css.split('\n')
            if not any(var in line.lower() for var in _BACKGROUND_VARIABLES)
        )
        css = css.replace('background: var(--bg);', 'background: white;')
        css = css.replace('background: var(--card-bg);', '')
        css = css.replace('background: var(--header-bg);', '')
        # Its variable definition is removed above, so drop the usage as well.
        css = css.replace('background: var(--section-bg);', '')
        style_tag.string = css

    # Remove the container wrappers but keep their content.
    for div in soup.find_all('div', class_='report-container'):
        div.unwrap()

    # Inline styles are removed from divs only; td/th keep theirs.
    for div in soup.find_all('div'):
        if div.get('style'):
            del div['style']

    # Wide tables do not fit the page: keep only the leading columns.
    for table in soup.find_all('table'):
        for row in table.find_all('tr'):
            for cell in row.find_all(['th', 'td'])[_MAX_PDF_TABLE_COLUMNS:]:
                cell.decompose()

    return str(soup)


def _normalize_markdown_lists(content: str) -> str:
    """Reshape list items so the markdown renderer recognises them as lists.

    Numbered items clumped on one line ("1. a 2. b") are split onto separate
    lines, and a blank line is inserted before and after each run of items.
    """
    content = _INLINE_NUMBERED_ITEM.sub(r'\n\2', content)

    fixed_lines: List[str] = []
    in_list = False
    for line in content.split('\n'):
        stripped = line.strip()
        is_list_item = (
            stripped.startswith(('* ', '- '))
            or _NUMBERED_ITEM.match(stripped) is not None
        )
        if is_list_item:
            # Blank line before the first item of a list.
            if not in_list and fixed_lines and fixed_lines[-1].strip():
                fixed_lines.append('')
        elif in_list and stripped:
            # Blank line after the last item of a list.
            fixed_lines.append('')
        in_list = is_list_item
        fixed_lines.append(line)
    return '\n'.join(fixed_lines)


def _render_sections_html(sections) -> str:
    """Render the analysis sections to HTML in their display order."""
    ordered = sorted(
        sections,
        key=lambda s: _SECTION_ORDER.get(s.section_name, _UNKNOWN_SECTION_RANK),
    )
    rendered = []
    for section in ordered:
        body = markdown.markdown(
            _normalize_markdown_lists(section.content or ""),
            extensions=['tables', 'fenced_code'],
        )
        # NOTE(review): wrapper element reconstructed; original markup was lost.
        rendered.append(f'<div class="section">\n{body}\n</div>\n')
    return "".join(rendered)


def _render_pdf(html_text: str, pdf_path: str) -> None:
    """Render an HTML document to a PDF file (blocking)."""
    HTML(string=html_text).write_pdf(pdf_path)


def _remove_file_quietly(path: str) -> None:
    """Delete a file, ignoring the case where it is already gone."""
    with contextlib.suppress(OSError):
        os.unlink(path)


@router.get("/health")
def health_check():
    """Liveness probe."""
    return {"status": "healthy"}


@router.post("/search", response_model=list[StockSearchResponse])
async def search_stock(request: StockSearchRequest, db: AsyncSession = Depends(get_db)):
    """Search for stocks matching the query using the configured AI model.

    Raises:
        HTTPException: 500 if no API key is configured; 400 if the search
            service returns an error payload.
    """
    api_key = await _get_api_key(db)
    if not api_key:
        raise HTTPException(status_code=500, detail="API Key not configured")

    model = await _get_default_model(db)
    result = await analysis_service.search_stock(request.query, api_key, model)

    if isinstance(result, dict) and "error" in result:
        raise HTTPException(status_code=400, detail=str(result))
    return result


@router.post("/analyze", response_model=ReportResponse)
async def start_analysis(request: AnalysisRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db)):
    """Create a pending report and schedule its analysis in the background.

    Raises:
        HTTPException: 500 if no API key is configured (the report is then
            stored with FAILED status).
    """
    model = request.model if request.model else await _get_default_model(db)

    new_report = Report(
        market=request.market,
        symbol=request.symbol,
        company_name=request.company_name,
        status=AnalysisStatus.PENDING,
        ai_model=model,
    )
    db.add(new_report)
    await db.commit()
    await db.refresh(new_report)

    api_key = await _get_api_key(db)
    if not api_key:
        new_report.status = AnalysisStatus.FAILED
        await db.commit()
        raise HTTPException(status_code=500, detail="API Key not configured")

    background_tasks.add_task(
        analysis_service.run_analysis_task,
        new_report.id,
        request.market,
        request.symbol,
        api_key,
        request.data_source,
    )

    # Re-fetch with selectinload to avoid lazy loading during serialization.
    result = await db.execute(
        select(Report)
        .options(selectinload(Report.sections))
        .where(Report.id == new_report.id)
    )
    return result.scalar_one()


@router.get("/reports", response_model=list[ReportResponse])
async def get_reports(db: AsyncSession = Depends(get_db)):
    """List all reports with their sections, newest first."""
    result = await db.execute(
        select(Report)
        .options(selectinload(Report.sections))
        .order_by(Report.created_at.desc())
    )
    return result.scalars().all()


# NOTE(review): this handler used to be registered on GET /reports as well.
# The first matching route always wins, so it was never called and only
# overrode the OpenAPI entry of get_reports. It is kept importable but is no
# longer registered; mount it on a distinct path if the summary view is wanted.
async def get_all_reports(db: AsyncSession = Depends(get_db)):
    """Return a lightweight summary of the 100 most recent reports."""
    result = await db.execute(
        select(Report).order_by(Report.created_at.desc()).limit(100)
    )
    reports = result.scalars().all()
    return [
        {
            "id": r.id,
            "market": r.market,
            "symbol": r.symbol,
            "company_name": r.company_name,
            "status": r.status,
            "ai_model": r.ai_model,
            "created_at": r.created_at.isoformat() if r.created_at else None,
        }
        for r in reports
    ]


@router.get("/reports/{report_id}", response_model=ReportResponse)
async def get_report(report_id: int, db: AsyncSession = Depends(get_db)):
    """Return one report with its sections, or 404 if it does not exist."""
    return await _get_report_or_404(db, report_id)


@router.get("/reports/{report_id}/html", response_class=HTMLResponse)
async def get_report_html(report_id: int, db: AsyncSession = Depends(get_db)):
    """Return the financial charts page of a report (no analysis sections).

    While the charts have not been generated yet, the page carries an
    auto-refresh tag so the browser polls until they appear.
    """
    report = await _get_report_or_404(db, report_id)

    charts_pending = False
    try:
        raw_html = _read_financial_html(report.market, report.symbol)
        if raw_html is None:
            charts_pending = True
            financial_html = _CHARTS_PENDING_HTML
        else:
            financial_html = raw_html
    except Exception as e:
        # Best effort: show the failure in the page instead of returning a 500.
        financial_html = _error_html(e)

    meta_refresh = ""
    if charts_pending:
        meta_refresh = (
            f'<meta http-equiv="refresh" content="{_PENDING_REFRESH_SECONDS}">'
        )

    company_name = html.escape(str(report.company_name))
    final_html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
{meta_refresh}
<title>{company_name} - 财务数据</title>
</head>
<body>
{financial_html}
</body>
</html>
"""
    return HTMLResponse(content=final_html, headers=dict(_NO_CACHE_HEADERS))


@router.get("/config")
async def get_config(db: AsyncSession = Depends(get_db)):
    """Return all settings as a key/value map with the API key masked."""
    result = await db.execute(select(Setting))
    config_map = {s.key: s.value for s in result.scalars().all()}

    if _API_KEY_SETTING in config_map:
        config_map[_API_KEY_SETTING] = _mask_secret(config_map[_API_KEY_SETTING])
    else:
        env_value = os.getenv(_API_KEY_SETTING)
        config_map[_API_KEY_SETTING] = _mask_secret(env_value) if env_value else ""
    return config_map


@router.post("/config")
async def update_config(request: ConfigUpdateRequest, db: AsyncSession = Depends(get_db)):
    """Create or overwrite a single setting."""
    setting = await db.get(Setting, request.key)
    if not setting:
        db.add(Setting(key=request.key, value=request.value))
    else:
        setting.value = request.value
    await db.commit()
    return {"status": "updated", "key": request.key}


@router.get("/reports/{report_id}/pdf")
async def download_report_pdf(report_id: int, db: AsyncSession = Depends(get_db)):
    """Render a report (charts plus analysis sections) and return it as a PDF.

    Raises:
        HTTPException: 404 if the report does not exist; 500 if PDF
            generation fails.
    """
    report = await _get_report_or_404(db, report_id)

    try:
        raw_html = _read_financial_html(report.market, report.symbol)
        if raw_html is None:
            financial_html = _CHARTS_PENDING_HTML
        else:
            financial_html = _clean_financial_html_for_pdf(raw_html)
    except Exception as e:
        # Best effort: the PDF is still produced, with the error shown inline.
        financial_html = _error_html(e)

    sections_html = _render_sections_html(report.sections)

    # The report date shown in the PDF is the creation time in UTC+8; naive
    # timestamps are treated as UTC.
    created_at = report.created_at
    if created_at is None:
        created_at = datetime.now(timezone.utc)
    elif created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    report_time = created_at.astimezone(_SHANGHAI_TZ)
    # Formatted by hand: strftime with non-ASCII literals depends on the locale.
    report_date = f"{report_time.year}年{report_time.month:02d}月{report_time.day:02d}日"

    # The file name carries the download date, not the report date.
    download_date_str = datetime.now(_SHANGHAI_TZ).strftime('%Y%m%d')

    company_name = html.escape(str(report.company_name))
    market = html.escape(str(report.market))
    symbol = html.escape(str(report.symbol))
    complete_html = f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{company_name} - 分析报告</title>
</head>
<body>
<h1>{company_name}</h1>
<p>{market} {symbol}<br>
分析日期: {report_date}</p>
{financial_html}
{sections_html}
</body>
</html>
"""

    pdf_path = None
    try:
        # Reserve a path, then close the handle before WeasyPrint writes to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
            pdf_path = tmp_file.name
        # Rendering is CPU-bound and blocking; keep it off the event loop.
        await asyncio.to_thread(_render_pdf, complete_html, pdf_path)
    except Exception as e:
        if pdf_path is not None:
            _remove_file_quietly(pdf_path)
        raise HTTPException(
            status_code=500, detail=f"Failed to generate PDF: {e}"
        ) from e

    # Filename format: Company_Symbol_Date_Report.pdf, RFC 5987-encoded
    # because it contains non-ASCII characters.
    filename = f"{report.company_name}_{report.symbol}_{download_date_str}_分析报告.pdf"
    filename_encoded = quote(filename, safe="")

    # Delete the temporary file once the response has been sent.
    cleanup = BackgroundTasks()
    cleanup.add_task(_remove_file_quietly, pdf_path)

    return FileResponse(
        path=pdf_path,
        media_type='application/pdf',
        headers={
            'Content-Disposition': f"attachment; filename*=UTF-8''{filename_encoded}"
        },
        background=cleanup,
    )


class ChatMessage(BaseModel):
    """One turn of a chat conversation."""

    # "user" marks a user turn; any other value is labelled as a model turn.
    role: str
    content: str


class ChatRequest(BaseModel):
    """Payload of the /chat endpoint."""

    messages: List[ChatMessage]
    model: str
    # Optional instructions placed before the conversation history.
    system_prompt: Optional[str] = None


@router.post("/chat")
async def chat_with_ai(request: ChatRequest):
    """Send the conversation to the AI model and return its reply text.

    The history is flattened into one prompt: an optional "System:" preamble,
    one "User:"/"Model:" line per message, and a trailing "Model: " cue.

    Raises:
        HTTPException: 500 if the client cannot be created or the call fails.
    """
    try:
        client = get_genai_client()

        prompt_parts = []
        if request.system_prompt:
            prompt_parts.append(f"System: {request.system_prompt}\n\n")
        for msg in request.messages:
            role_label = "User" if msg.role == "user" else "Model"
            prompt_parts.append(f"{role_label}: {msg.content}\n")
        prompt_parts.append("Model: ")  # cue the model to complete its turn
        full_prompt = "".join(prompt_parts)

        # The SDK call is synchronous; run it in a worker thread so it does
        # not block the event loop.
        response = await asyncio.to_thread(
            client.models.generate_content,
            model=request.model,
            contents=full_prompt,
        )
        return {"response": response.text}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e