from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks from fastapi.responses import HTMLResponse, FileResponse from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select from sqlalchemy.orm import selectinload from app.database import get_db from app.schemas import StockSearchRequest, StockSearchResponse, AnalysisRequest, ReportResponse, AnalysisStatus, ConfigUpdateRequest from app.models import Report, Setting from app.services import analysis_service import os import markdown from weasyprint import HTML import io import tempfile from urllib.parse import quote from bs4 import BeautifulSoup router = APIRouter() @router.get("/health") def health_check(): return {"status": "healthy"} @router.post("/search", response_model=list[StockSearchResponse]) async def search_stock(request: StockSearchRequest, db: AsyncSession = Depends(get_db)): setting = await db.get(Setting, "GEMINI_API_KEY") api_key = setting.value if setting else os.getenv("GEMINI_API_KEY") if not api_key: raise HTTPException(status_code=500, detail="API Key not configured") # Get AI model setting model_setting = await db.get(Setting, "AI_MODEL") model = model_setting.value if model_setting else "gemini-2.0-flash" result = await analysis_service.search_stock(request.query, api_key, model) if isinstance(result, dict) and "error" in result: if isinstance(result, str) and "```json" in result: pass raise HTTPException(status_code=400, detail=str(result)) return result @router.post("/analyze", response_model=ReportResponse) async def start_analysis(request: AnalysisRequest, background_tasks: BackgroundTasks, db: AsyncSession = Depends(get_db)): # Get AI model setting model_setting = await db.get(Setting, "AI_MODEL") model = model_setting.value if model_setting else "gemini-2.0-flash" new_report = Report( market=request.market, symbol=request.symbol, company_name=request.company_name, status=AnalysisStatus.PENDING, ai_model=model ) db.add(new_report) await db.commit() await db.refresh(new_report) setting = await db.get(Setting, "GEMINI_API_KEY") api_key = setting.value if setting else os.getenv("GEMINI_API_KEY") if not api_key: new_report.status = AnalysisStatus.FAILED await db.commit() raise HTTPException(status_code=500, detail="API Key not configured") # Trigger background task background_tasks.add_task( analysis_service.run_analysis_task, new_report.id, request.market, request.symbol, api_key ) # Re-fetch with selectinload to avoid lazy loading issues result = await db.execute(select(Report).options(selectinload(Report.sections)).where(Report.id == new_report.id)) report_with_sections = result.scalar_one() return report_with_sections @router.get("/reports", response_model=list[ReportResponse]) async def get_reports(db: AsyncSession = Depends(get_db)): result = await db.execute(select(Report).options(selectinload(Report.sections)).order_by(Report.created_at.desc())) return result.scalars().all() @router.get("/reports/{report_id}", response_model=ReportResponse) async def get_report(report_id: int, db: AsyncSession = Depends(get_db)): result = await db.execute(select(Report).options(selectinload(Report.sections)).where(Report.id == report_id)) report = result.scalar_one_or_none() if not report: raise HTTPException(status_code=404, detail="Report not found") return report @router.get("/reports/{report_id}/html", response_class=HTMLResponse) async def get_report_html(report_id: int, db: AsyncSession = Depends(get_db)): result = await db.execute(select(Report).options(selectinload(Report.sections)).where(Report.id == report_id)) report = result.scalar_one_or_none() if not report: raise HTTPException(status_code=404, detail="Report not found") # Get Financial HTML (Charts) root_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../")) base_dir = os.path.join(root_dir, "data", report.market) symbol_dir = os.path.join(base_dir, report.symbol) # Fuzzy match logic financial_html = "" try: if not os.path.exists(symbol_dir) and os.path.exists(base_dir): candidates = [d for d in os.listdir(base_dir) if d.startswith(report.symbol) and os.path.isdir(os.path.join(base_dir, d))] if candidates: symbol_dir = os.path.join(base_dir, candidates[0]) start_html_path = os.path.join(symbol_dir, "report.html") if os.path.exists(start_html_path): with open(start_html_path, 'r', encoding='utf-8') as f: financial_html = f.read() else: financial_html = "
财务图表尚未生成,数据获取可能仍在进行中。
" except Exception as e: financial_html = f"加载财务图表时出错: {str(e)}
" # Only return financial charts, no analysis sections final_html = f"""财务图表尚未生成,数据获取可能仍在进行中。
" except Exception as e: financial_html = f"加载财务图表时出错: {str(e)}
" # Build analysis sections HTML sections_html = "" section_names = { 'company_profile': '公司简介', 'fundamental_analysis': '基本面分析', 'insider_analysis': '内部人士分析', 'bullish_analysis': '看涨分析', 'bearish_analysis': '看跌分析' } import re for section in sorted(report.sections, key=lambda s: list(section_names.keys()).index(s.section_name) if s.section_name in section_names else 999): # Pre-process markdown to fix list formatting content = section.content # 1. First, find cases where numbered lists are clumped in one line like "1. xxx 2. yyy" # and split them into multiple lines. content = re.sub(r'(\s)(\d+\.\s)', r'\n\2', content) # 2. Ensure list items have proper spacing lines = content.split('\n') fixed_lines = [] in_list = False for i, line in enumerate(lines): stripped = line.strip() # Check if this line is a list item (bullet or numbered) # Regex \d+\.\s matches "1. ", "2. ", etc. is_list_item = stripped.startswith('* ') or stripped.startswith('- ') or re.match(r'^\d+\.\s', stripped) if is_list_item: # Add blank line before first list item if not in_list and fixed_lines and fixed_lines[-1].strip(): fixed_lines.append('') in_list = True fixed_lines.append(line) else: # Add blank line after last list item if in_list and stripped: fixed_lines.append('') in_list = False fixed_lines.append(line) content = '\n'.join(fixed_lines) section_content = markdown.markdown(content, extensions=['tables', 'fenced_code']) sections_html += f"""