From d28f3c5266d5a2a5f83eaa3aec54c9132c61e547 Mon Sep 17 00:00:00 2001 From: "Lv, Qi" Date: Wed, 19 Nov 2025 17:30:52 +0800 Subject: [PATCH] feat: update analysis workflow and fix LLM client connection issues - Enhance LlmClient to handle malformed URLs and HTML error responses - Improve logging in report-generator-service worker - Update frontend API routes and hooks for analysis - Update various service configurations and persistence logic --- docker-compose.yml | 2 + ...ve]_implement_alphavantage_test_button.md} | 0 ...20251118_analysis_workflow_optimization.md | 90 ++++++ frontend/Dockerfile.prod | 55 ++++ frontend/next.config.mjs | 5 +- .../src/app/api/analysis-results/route.ts | 29 ++ .../src/app/api/configs/llm/test/route.ts | 41 +++ .../src/app/api/financials/[...slug]/route.ts | 69 ++++- .../src/app/config/components/AIConfigTab.tsx | 50 ++- .../[symbol]/hooks/useAnalysisRunner.ts | 5 +- .../report/[symbol]/hooks/useReportData.ts | 6 +- frontend/src/app/report/[symbol]/page.tsx | 19 +- frontend/src/components/TradingViewWidget.tsx | 3 +- frontend/src/components/ui/dialog.tsx | 33 +- frontend/src/hooks/useApi.ts | 13 + scripts/deploy_to_harbor.sh | 287 ++++++++++++++++++ .../src/mapping.rs | 1 + .../src/persistence.rs | 10 + .../src/worker.rs | 41 ++- services/api-gateway/src/api.rs | 82 +++-- services/api-gateway/src/config.rs | 4 + services/api-gateway/src/persistence.rs | 28 +- services/common-contracts/src/dtos.rs | 3 +- services/common-contracts/src/messages.rs | 2 + .../src/api/analysis.rs | 77 +++-- .../src/api/companies.rs | 1 + .../src/api/configs.rs | 1 - .../src/api/market_data.rs | 2 +- .../data-persistence-service/src/models.rs | 4 +- .../data-persistence-service/src/seeding.rs | 1 - .../tests/api_tests.rs | 1 + .../tests/db_tests.rs | 1 + .../finnhub-provider-service/src/mapping.rs | 1 + .../finnhub-provider-service/src/worker.rs | 1 + services/report-generator-service/Cargo.lock | 274 ++++++++++++++++- services/report-generator-service/Cargo.toml | 3 +- services/report-generator-service/src/api.rs | 30 +- .../src/llm_client.rs | 101 +++--- .../src/message_consumer.rs | 37 ++- .../report-generator-service/src/worker.rs | 32 +- .../src/persistence.rs | 10 + .../tushare-provider-service/src/tushare.rs | 1 + .../tushare-provider-service/src/worker.rs | 62 +++- .../yfinance-provider-service/src/mapping.rs | 1 + .../src/persistence.rs | 10 + .../yfinance-provider-service/src/worker.rs | 51 +++- 46 files changed, 1435 insertions(+), 145 deletions(-) rename docs/3_project_management/tasks/{pending/20251118_[Pending]_implement_alphavantage_test_button.md => completed/20251118_[Active]_implement_alphavantage_test_button.md} (100%) create mode 100644 docs/3_project_management/tasks/pending/20251118_analysis_workflow_optimization.md create mode 100644 frontend/Dockerfile.prod create mode 100644 frontend/src/app/api/analysis-results/route.ts create mode 100644 frontend/src/app/api/configs/llm/test/route.ts create mode 100755 scripts/deploy_to_harbor.sh diff --git a/docker-compose.yml b/docker-compose.yml index a93565d..3c84409 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -83,6 +83,7 @@ services: SERVER_PORT: 4000 NATS_ADDR: nats://nats:4222 DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 + REPORT_GENERATOR_SERVICE_URL: http://report-generator-service:8004 # provider_services via explicit JSON for deterministic parsing PROVIDER_SERVICES: '["http://alphavantage-provider-service:8000", "http://tushare-provider-service:8001", "http://finnhub-provider-service:8002", "http://yfinance-provider-service:8003"]' RUST_LOG: info,axum=info @@ -94,6 +95,7 @@ services: - tushare-provider-service - finnhub-provider-service - yfinance-provider-service + - report-generator-service networks: - app-network healthcheck: diff --git a/docs/3_project_management/tasks/pending/20251118_[Pending]_implement_alphavantage_test_button.md b/docs/3_project_management/tasks/completed/20251118_[Active]_implement_alphavantage_test_button.md similarity index 100% rename from docs/3_project_management/tasks/pending/20251118_[Pending]_implement_alphavantage_test_button.md rename to docs/3_project_management/tasks/completed/20251118_[Active]_implement_alphavantage_test_button.md diff --git a/docs/3_project_management/tasks/pending/20251118_analysis_workflow_optimization.md b/docs/3_project_management/tasks/pending/20251118_analysis_workflow_optimization.md new file mode 100644 index 0000000..1485c2c --- /dev/null +++ b/docs/3_project_management/tasks/pending/20251118_analysis_workflow_optimization.md @@ -0,0 +1,90 @@ +# 分析流程优化与数据缓存机制修复 + +## 1. 问题背景 (Problem Statement) + +根据系统日志分析与代码排查,当前系统存在以下关键问题: + +1. **数据源重复请求 (Missing Cache Logic)**: + * `yfinance-provider-service` (及其他数据服务) 在接收到任务指令时,未检查本地数据库是否存在有效数据,而是直接向外部 API 发起请求。 + * 这导致每次用户点击,都会触发耗时约 1.5s 的外部抓取,既慢又浪费资源。 + +2. **任务依赖与执行时序错乱 (Race Condition)**: + * `api-gateway` 在接收到请求时,**同时**触发了数据抓取 (`DATA_FETCH_QUEUE`) 和分析报告生成 (`ANALYSIS_COMMANDS_QUEUE`)。 + * 导致 `report-generator-service` 在数据还没抓回来时就启动了,读到空数据(或旧数据)后瞬间完成,导致 Token 消耗为 0,报告内容为空。 + +3. **前端无法展示数据 (Frontend Data Visibility)**: + * **根本原因**: API Gateway 路由缺失与路径映射错误。 + * 前端 BFF (`frontend/src/app/api/financials/...`) 试图请求 `${BACKEND_BASE}/market-data/financial-statements/...`。 + * 然而,`api-gateway` **并未暴露** 此路由(仅暴露了 `/v1/companies/{symbol}/profile`)。 + * 因此,前端获取财务数据的请求全部 404 失败,导致界面始终显示 "暂无可展示的数据",即使用户多次运行也无效。 + +## 2. 目标 (Goals) + +1. **实现"读写穿透"缓存策略**: 数据服务在抓取前必须先检查本地数据库数据的时效性。 +2. **构建事件驱动的依赖工作流**: 分析服务必须严格等待数据服务完成后触发(通过 NATS 事件链)。 +3. **修复数据访问层**: 确保 API Gateway 正确暴露并转发财务数据接口,使前端可见。 +4. **定义数据时效性标准**: 针对不同类型数据实施差异化的缓存过期策略。 + +## 3. 详细技术方案 (Technical Plan) + +### 3.1. 数据时效性与缓存策略 (Data Freshness Policy) + +针对基本面分析场景,不同数据的更新频率和时效性要求如下: + +| 数据类型 | 内容示例 | 更新频率 | 建议 TTL (缓存有效期) | 更新策略 | +| :--- | :--- | :--- | :--- | :--- | +| **公司概况 (Profile)** | 名称、行业、简介、高管 | 极低 (年/不定期) | **30 天** | **Stale-While-Revalidate (SWR)**
过期后先返回旧数据,后台异步更新。 | +| **财务报表 (Financials)** | 营收、利润、资产负债 (季/年) | 季度 (4/8/10月) | **24 小时** | **Cache-Aside**
每次请求先查库。若 `updated_at > 24h`,则强制 Fetch;否则直接返回库中数据。
*注: 对于同一天内的重复请求,将直接命中缓存,0延迟。* | +| **市场数据 (Market Data)** | PE/PB/市值/价格 | 实时/日频 | **1 小时** | 基本面分析不需要秒级价格。取最近一小时内的快照即可。若需实时价格,使用专用实时接口。 | + +### 3.2. 数据服务层 (Providers) +* **涉及服务**: `yfinance-provider-service`, `alphavantage-provider-service`, `finnhub-provider-service`. +* **逻辑变更**: + 1. 订阅 `FetchCommand`。 + 2. **Step 1 (Check DB)**: 调用 `PersistenceClient` 获取目标 Symbol 数据的 `updated_at`。 + 3. **Step 2 (Decision)**: + * 若 `now - updated_at < TTL`: **Hit Cache**. Log "Cache Hit", 跳过外部请求,直接进入 Step 4。 + * 若数据不存在 或 `now - updated_at > TTL`: **Miss Cache**. Log "Cache Miss", 执行外部 API 抓取。 + 4. **Step 3 (Upsert)**: 将抓取的数据存入 DB (Update `updated_at` = now)。 + 5. **Step 4 (Publish Event)**: 发布 `CompanyDataPersistedEvent` (包含 symbol, data_types: ["profile", "financials"])。 + +### 3.3. 工作流编排 (Workflow Orchestration) +* **API Gateway**: + * 移除 `POST /data-requests` 中自动触发 Analysis 的逻辑。 + * 只发布 `FetchCompanyDataCommand`。 +* **Report Generator**: + * **不再监听** `StartAnalysisCommand` (作为触发源)。 + * 改为监听 `CompanyDataPersistedEvent`。 + * 收到事件后,检查事件中的 `request_id` 是否关联了待处理的分析任务(或者简单的:收到数据更新就检查是否有待跑的分析模板)。 + * *临时方案*: 为了简化,可以在 API Gateway 发送 Fetch 命令时,在 payload 里带上 `trigger_analysis: true` 和 `template_id`。Data Provider 在发出的 `PersistedEvent` 里透传这些字段。Report Generator 看到 `trigger_analysis: true` 才执行。 + +### 3.4. API 修复 (Fixing Visibility) +* **Backend (API Gateway)**: + * 在 `create_v1_router` 中新增路由: + * `GET /v1/market-data/financial-statements/{symbol}` -> 转发至 Data Persistence Service。 + * `GET /v1/market-data/quotes/{symbol}` -> 转发至 Data Persistence Service (可选)。 +* **Frontend (Next.js API Route)**: + * 修改 `frontend/src/app/api/financials/[...slug]/route.ts`。 + * 将请求路径从 `${BACKEND_BASE}/market-data/...` 修正为 `${BACKEND_BASE}/v1/market-data/...` (匹配 Gateway 新路由)。 + * 或者直接修正为 Data Persistence Service 的正确路径 (但最佳实践是走 Gateway)。 + +## 4. 执行计划 (Action Items) + +### Phase 1: API & Frontend 可见性修复 (立即执行) +1. [x] **API Gateway**: 添加 `/v1/market-data/financial-statements/{symbol}` 路由。 +2. [x] **Frontend**: 修正 `route.ts` 中的后端请求路径。(通过修正 Gateway 路由适配前端) +3. [ ] **验证**: 打开页面,应能看到(哪怕是旧的)财务图表数据,不再显示 404/无数据。 + +### Phase 2: 缓存与时效性逻辑 (核心) +4. [x] **Data Providers**: 在 `worker.rs` 中实现 TTL 检查逻辑 (Profile: 30d, Financials: 24h)。(YFinance 已实现,其他 Provider 已适配事件) +5. [x] **Persistence Service**: 确保 `get_company_profile` 和 `get_financials` 返回 `updated_at` 字段(如果还没有的话)。 + +### Phase 3: 事件驱动工作流 (解决 Race Condition) +6. [x] **Contracts**: 定义新事件 `CompanyDataPersistedEvent` (含 `trigger_analysis` 标记)。 +7. [x] **API Gateway**: 停止直接发送 Analysis 命令,将其参数打包进 Fetch 命令。 +8. [x] **Data Providers**: 完成任务后发布 `PersistedEvent`。 +9. [x] **Report Generator**: 监听 `PersistedEvent` 触发分析。 + +## 5. 待确认 +* 是否需要为每个数据源单独设置 TTL?(暂定统一策略) +* 前端是否需要显示数据的"上次更新时间"?(建议加上,增强用户信任) diff --git a/frontend/Dockerfile.prod b/frontend/Dockerfile.prod new file mode 100644 index 0000000..eb34a05 --- /dev/null +++ b/frontend/Dockerfile.prod @@ -0,0 +1,55 @@ +FROM node:20-alpine AS base + +# 1. Install dependencies only when needed +FROM base AS deps +# Check https://github.com/nodejs/docker-node/tree/b4117f9333da4138b03a546ec926ef50a31506c3#nodealpine to understand why libc6-compat might be needed. +RUN apk add --no-cache libc6-compat + +WORKDIR /app + +# Install dependencies +COPY frontend/package.json frontend/package-lock.json* ./ +RUN npm ci + +# 2. Rebuild the source code only when needed +FROM base AS builder +WORKDIR /app +COPY --from=deps /app/node_modules ./node_modules +COPY frontend ./ + +# Environment variables +ENV NEXT_TELEMETRY_DISABLED=1 +ENV NODE_ENV=production + +RUN npm run build + +# 3. Production image, copy all the files and run next +FROM base AS runner +WORKDIR /app + +ENV NODE_ENV=production +ENV NEXT_TELEMETRY_DISABLED=1 + +RUN addgroup --system --gid 1001 nodejs +RUN adduser --system --uid 1001 nextjs + +COPY --from=builder /app/public ./public + +# Set the correct permission for prerender cache +RUN mkdir .next +RUN chown nextjs:nodejs .next + +# Automatically leverage output traces to reduce image size +# https://nextjs.org/docs/advanced-features/output-file-tracing +COPY --from=builder --chown=nextjs:nodejs /app/.next/standalone ./ +COPY --from=builder --chown=nextjs:nodejs /app/.next/static ./.next/static + +USER nextjs + +EXPOSE 3000 + +ENV PORT=3000 +ENV HOSTNAME="0.0.0.0" + +CMD ["node", "server.js"] + diff --git a/frontend/next.config.mjs b/frontend/next.config.mjs index ef06953..1f1640a 100644 --- a/frontend/next.config.mjs +++ b/frontend/next.config.mjs @@ -13,6 +13,9 @@ const nextConfig = { experimental: { proxyTimeout: 300000, // 300 seconds (5 minutes) }, + // Optimize for Docker deployment only in production + // 当 NODE_ENV 为 production 时开启 standalone 模式 + output: process.env.NODE_ENV === 'production' ? 'standalone' : undefined, }; -export default nextConfig; \ No newline at end of file +export default nextConfig; diff --git a/frontend/src/app/api/analysis-results/route.ts b/frontend/src/app/api/analysis-results/route.ts new file mode 100644 index 0000000..5fb38a9 --- /dev/null +++ b/frontend/src/app/api/analysis-results/route.ts @@ -0,0 +1,29 @@ +import { NextRequest } from 'next/server'; + +const BACKEND_BASE = process.env.BACKEND_INTERNAL_URL || process.env.NEXT_PUBLIC_BACKEND_URL; + +export async function GET(req: NextRequest) { + if (!BACKEND_BASE) { + return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); + } + const { searchParams } = new URL(req.url); + const symbol = searchParams.get('symbol'); + + if (!symbol) { + return new Response('Missing symbol parameter', { status: 400 }); + } + + const resp = await fetch(`${BACKEND_BASE}/analysis-results?symbol=${encodeURIComponent(symbol)}`, { cache: 'no-store' }); + + if (!resp.ok) { + if (resp.status === 404) { + // Return empty list if not found, to avoid UI errors + return Response.json([]); + } + return new Response(resp.statusText, { status: resp.status }); + } + + const data = await resp.json(); + return Response.json(data); +} + diff --git a/frontend/src/app/api/configs/llm/test/route.ts b/frontend/src/app/api/configs/llm/test/route.ts new file mode 100644 index 0000000..892cfd1 --- /dev/null +++ b/frontend/src/app/api/configs/llm/test/route.ts @@ -0,0 +1,41 @@ +import { NextRequest } from 'next/server'; + +const BACKEND_BASE = process.env.BACKEND_INTERNAL_URL || process.env.NEXT_PUBLIC_BACKEND_URL; + +export async function POST(req: NextRequest) { + if (!BACKEND_BASE) { + return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); + } + + try { + const body = await req.json(); + + // 将请求转发到 API Gateway + const targetUrl = `${BACKEND_BASE.replace(/\/$/, '')}/configs/llm/test`; + + const backendRes = await fetch(targetUrl, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify(body), + }); + + const backendResBody = await backendRes.text(); + + return new Response(backendResBody, { + status: backendRes.status, + headers: { + 'Content-Type': 'application/json', + }, + }); + + } catch (error: any) { + console.error('LLM测试代理失败:', error); + return new Response(JSON.stringify({ success: false, message: error.message || '代理请求时发生未知错误' }), { + status: 500, + headers: { 'Content-Type': 'application/json' }, + }); + } +} + diff --git a/frontend/src/app/api/financials/[...slug]/route.ts b/frontend/src/app/api/financials/[...slug]/route.ts index 277b281..f08b330 100644 --- a/frontend/src/app/api/financials/[...slug]/route.ts +++ b/frontend/src/app/api/financials/[...slug]/route.ts @@ -23,10 +23,12 @@ export async function GET( const years = url.searchParams.get('years') || '10'; // Fetch financials from backend - const finResp = await fetch( - `${BACKEND_BASE}/market-data/financial-statements/${encodeURIComponent(symbol)}?metrics=${encodeURIComponent('')}`, - { cache: 'no-store' } - ); + // Corrected path to match new API Gateway route + const metricsParam = url.searchParams.get('metrics') || ''; + const fetchUrl = `${BACKEND_BASE}/market-data/financial-statements/${encodeURIComponent(symbol)}` + + (metricsParam ? `?metrics=${encodeURIComponent(metricsParam)}` : ''); + + const finResp = await fetch(fetchUrl, { cache: 'no-store' }); if (!finResp.ok) { if (finResp.status === 404) { @@ -52,6 +54,7 @@ export async function GET( }); // Fetch Company Profile to populate name/industry + // Corrected path to match new API Gateway route const profileResp = await fetch(`${BACKEND_BASE}/companies/${encodeURIComponent(symbol)}/profile`, { cache: 'no-store' }); let profileData: any = {}; if (profileResp.ok) { @@ -104,6 +107,64 @@ export async function GET( const text = await resp.text(); return new Response(text, { status: resp.status, headers: { 'Content-Type': 'application/json' } }); } + + // 2. Match /api/financials/{market}/{symbol}/analysis/{type}/stream + // slug length = 5 + // slug[0] = market + // slug[1] = symbol + // slug[2] = 'analysis' + // slug[3] = analysisType (module_id) + // slug[4] = 'stream' + if (slug.length === 5 && slug[2] === 'analysis' && slug[4] === 'stream') { + const symbol = slug[1]; + const analysisType = slug[3]; + + const encoder = new TextEncoder(); + + const stream = new ReadableStream({ + async start(controller) { + // Polling logic + // We try for up to 60 seconds + const maxRetries = 30; + let found = false; + + for (let i = 0; i < maxRetries; i++) { + try { + const resp = await fetch(`${BACKEND_BASE}/analysis-results?symbol=${encodeURIComponent(symbol)}&module_id=${encodeURIComponent(analysisType)}`, { cache: 'no-store' }); + + if (resp.ok) { + const results = await resp.json(); + // Assuming results are sorted by created_at DESC (backend behavior) + if (Array.isArray(results) && results.length > 0) { + const latest = results[0]; + // If result is found, send it and exit + if (latest && latest.content) { + controller.enqueue(encoder.encode(latest.content)); + found = true; + break; + } + } + } + } catch (e) { + console.error("Error polling analysis results", e); + } + + // Wait 2 seconds before next poll + await new Promise(resolve => setTimeout(resolve, 2000)); + } + controller.close(); + } + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/plain; charset=utf-8', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive' + } + }); + } + // 其他旧 financials 端点在新架构中未实现:返回空对象以避免前端 JSON 解析错误 return Response.json({}, { status: 200 }); } diff --git a/frontend/src/app/config/components/AIConfigTab.tsx b/frontend/src/app/config/components/AIConfigTab.tsx index 5d3d2b5..a668ed5 100644 --- a/frontend/src/app/config/components/AIConfigTab.tsx +++ b/frontend/src/app/config/components/AIConfigTab.tsx @@ -9,8 +9,16 @@ import { Badge } from "@/components/ui/badge"; import { ScrollArea } from "@/components/ui/scroll-area"; import { Spinner } from "@/components/ui/spinner"; import { Separator } from "@/components/ui/separator"; -import { useLlmProviders, updateLlmProviders, discoverProviderModels } from '@/hooks/useApi'; +import { useLlmProviders, updateLlmProviders, discoverProviderModels, testLlmModel } from '@/hooks/useApi'; import type { LlmProvidersConfig, LlmModel } from '@/types'; +import { + Dialog, + DialogContent, + DialogDescription, + DialogFooter, + DialogHeader, + DialogTitle, +} from "@/components/ui/dialog"; export function AIConfigTab({ newProviderBaseUrl, @@ -45,6 +53,10 @@ export function AIConfigTab({ const [newModelMenuOpen, setNewModelMenuOpen] = useState>({}); const [newModelHighlightIndex, setNewModelHighlightIndex] = useState>({}); + // Test State + const [testResult, setTestResult] = useState<{ success: boolean; message: string } | null>(null); + const [isTesting, setIsTesting] = useState(null); + // Refs for Auto-save const hasInitializedLlmRef = useRef(false); const autoSaveTimerRef = useRef | null>(null); @@ -465,6 +477,24 @@ export function AIConfigTab({ /> + + + + ); } - diff --git a/frontend/src/app/report/[symbol]/hooks/useAnalysisRunner.ts b/frontend/src/app/report/[symbol]/hooks/useAnalysisRunner.ts index 44772ba..34282e5 100644 --- a/frontend/src/app/report/[symbol]/hooks/useAnalysisRunner.ts +++ b/frontend/src/app/report/[symbol]/hooks/useAnalysisRunner.ts @@ -56,7 +56,10 @@ export function useAnalysisRunner( // Derive effective analysis config from template set, falling back to global config if needed const activeAnalysisConfig = useMemo(() => { if (activeTemplateSet) { - return { analysis_modules: activeTemplateSet.modules }; + return { + ...financialConfig, + analysis_modules: activeTemplateSet.modules, + }; } return financialConfig; // Fallback to global config (legacy behavior) }, [activeTemplateSet, financialConfig]); diff --git a/frontend/src/app/report/[symbol]/hooks/useReportData.ts b/frontend/src/app/report/[symbol]/hooks/useReportData.ts index 63d2b57..9e66a40 100644 --- a/frontend/src/app/report/[symbol]/hooks/useReportData.ts +++ b/frontend/src/app/report/[symbol]/hooks/useReportData.ts @@ -1,5 +1,5 @@ import { useParams, useSearchParams } from 'next/navigation'; -import { useChinaFinancials, useFinancials, useFinancialConfig, useAnalysisTemplateSets, useSnapshot, useRealtimeQuote } from '@/hooks/useApi'; +import { useChinaFinancials, useFinancials, useFinancialConfig, useAnalysisTemplateSets, useSnapshot, useRealtimeQuote, useAnalysisResults } from '@/hooks/useApi'; export function useReportData() { const params = useParams(); @@ -42,6 +42,7 @@ export function useReportData() { const { data: realtime, error: realtimeError, isLoading: realtimeLoading } = useRealtimeQuote(normalizedMarket, unifiedSymbol, { maxAgeSeconds: 30, refreshIntervalMs: 5000 }); const { data: financialConfig } = useFinancialConfig(); const { data: templateSets } = useAnalysisTemplateSets(); + const { data: historicalAnalysis } = useAnalysisResults(unifiedSymbol); return { symbol, @@ -59,7 +60,8 @@ export function useReportData() { realtimeLoading, realtimeError, financialConfig: financialConfig as any, - templateSets + templateSets, + historicalAnalysis }; } diff --git a/frontend/src/app/report/[symbol]/page.tsx b/frontend/src/app/report/[symbol]/page.tsx index 0a8819b..deed765 100644 --- a/frontend/src/app/report/[symbol]/page.tsx +++ b/frontend/src/app/report/[symbol]/page.tsx @@ -25,9 +25,11 @@ export default function ReportPage() { realtimeLoading, realtimeError, financialConfig, + templateSets, } = useReportData(); const { + activeAnalysisConfig, analysisTypes, analysisStates, analysisRecords, @@ -45,7 +47,9 @@ export default function ReportPage() { retryAnalysis, hasRunningTask, isAnalysisRunning, - } = useAnalysisRunner(financials, financialConfig, normalizedMarket, unifiedSymbol, isLoading, error); + selectedTemplateId, + setSelectedTemplateId, + } = useAnalysisRunner(financials, financialConfig, normalizedMarket, unifiedSymbol, isLoading, error, templateSets); return (
@@ -63,6 +67,9 @@ export default function ReportPage() { onStartAnalysis={triggerAnalysis} onStopAnalysis={stopAll} onContinueAnalysis={continuePending} + templateSets={templateSets} + selectedTemplateId={selectedTemplateId} + onSelectTemplate={setSelectedTemplateId} />
@@ -81,7 +88,7 @@ export default function ReportPage() { 财务数据 {analysisTypes.map(type => ( - {type === 'company_profile' ? '公司简介' : (financialConfig?.analysis_modules?.[type]?.name || type)} + {type === 'company_profile' ? '公司简介' : (activeAnalysisConfig?.analysis_modules?.[type]?.name || type)} ))} 执行详情 @@ -102,7 +109,7 @@ export default function ReportPage() { financials={financials} isLoading={isLoading} error={error} - financialConfig={financialConfig} + financialConfig={activeAnalysisConfig} /> @@ -112,7 +119,7 @@ export default function ReportPage() { analysisType={analysisType} state={analysisStates[analysisType] || { content: '', loading: false, error: null }} financials={financials} - analysisConfig={financialConfig} + analysisConfig={activeAnalysisConfig} retryAnalysis={retryAnalysis} currentAnalysisTask={currentAnalysisTask} /> @@ -134,5 +141,3 @@ export default function ReportPage() { ); } - - diff --git a/frontend/src/components/TradingViewWidget.tsx b/frontend/src/components/TradingViewWidget.tsx index 250aa16..aa09511 100644 --- a/frontend/src/components/TradingViewWidget.tsx +++ b/frontend/src/components/TradingViewWidget.tsx @@ -118,8 +118,9 @@ export function TradingViewWidget({ if (container.isConnected) { container.appendChild(script); } - } catch { + } catch (e) { // 忽略偶发性 contentWindow 不可用的报错 + console.warn('TradingView widget mount error:', e); } }); } diff --git a/frontend/src/components/ui/dialog.tsx b/frontend/src/components/ui/dialog.tsx index d3ca2a7..0b0e716 100644 --- a/frontend/src/components/ui/dialog.tsx +++ b/frontend/src/components/ui/dialog.tsx @@ -10,24 +10,39 @@ type DialogProps = BaseProps & { onOpenChange?: (open: boolean) => void; }; -export const Dialog: React.FC = ({ children }) => { - return
{children}
; +export const Dialog: React.FC = ({ children, open, onOpenChange }) => { + if (!open) return null; + return ( +
+
+ {children} + +
+
+ ); }; export const DialogContent: React.FC = ({ children, className }) => { return
{children}
; }; -export const DialogHeader: React.FC = ({ children }) => { - return
{children}
; +export const DialogHeader: React.FC = ({ children, className }) => { + return
{children}
; }; -export const DialogTitle: React.FC = ({ children }) => { - return

{children}

; +export const DialogTitle: React.FC = ({ children, className }) => { + return

{children}

; }; -export const DialogFooter: React.FC = ({ children }) => { - return
{children}
; +export const DialogDescription: React.FC = ({ children, className }) => { + return

{children}

; }; - +export const DialogFooter: React.FC = ({ children, className }) => { + return
{children}
; +}; diff --git a/frontend/src/hooks/useApi.ts b/frontend/src/hooks/useApi.ts index ba8e03d..553c345 100644 --- a/frontend/src/hooks/useApi.ts +++ b/frontend/src/hooks/useApi.ts @@ -351,6 +351,19 @@ export async function discoverProviderModelsPreview(apiBaseUrl: string, apiKey: return res.json(); } +export async function testLlmModel(apiBaseUrl: string, apiKey: string, modelId: string) { + const res = await fetch('/api/configs/llm/test', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ api_base_url: apiBaseUrl, api_key: apiKey, model_id: modelId }), + }); + if (!res.ok) { + const text = await res.text().catch(() => ''); + throw new Error(text || `HTTP ${res.status}`); + } + return res.json(); +} + // --- Analysis Template Sets Config Hooks (NEW) --- export function useAnalysisTemplateSets() { diff --git a/scripts/deploy_to_harbor.sh b/scripts/deploy_to_harbor.sh new file mode 100755 index 0000000..b8d368e --- /dev/null +++ b/scripts/deploy_to_harbor.sh @@ -0,0 +1,287 @@ +#!/bin/bash + +# 遇到错误立即退出 +set -e + +# 配置变量 +REGISTRY="harbor.3prism.ai" +PROJECT="fundamental_analysis" +VERSION="latest" # 或者使用 $(date +%Y%m%d%H%M%S) 生成时间戳版本 +NAMESPACE="$REGISTRY/$PROJECT" + +# 颜色输出 +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +RED='\033[0;31m' +NC='\033[0m' # No Color + +echo -e "${GREEN}=== 开始构建并推送镜像到 $NAMESPACE ===${NC}" + +# 定义服务列表及其 Dockerfile 路径 +# 格式: "服务名:Dockerfile路径" +SERVICES=( + "data-persistence-service:services/data-persistence-service/Dockerfile" + "api-gateway:services/api-gateway/Dockerfile" + "alphavantage-provider-service:services/alphavantage-provider-service/Dockerfile" + "tushare-provider-service:services/tushare-provider-service/Dockerfile" + "finnhub-provider-service:services/finnhub-provider-service/Dockerfile" + "yfinance-provider-service:services/yfinance-provider-service/Dockerfile" + "report-generator-service:services/report-generator-service/Dockerfile" + "frontend:frontend/Dockerfile.prod" +) + +# 总大小计数器 +TOTAL_SIZE=0 + +for entry in "${SERVICES[@]}"; do + KEY="${entry%%:*}" + DOCKERFILE="${entry#*:}" + IMAGE_NAME="$NAMESPACE/$KEY:$VERSION" + + echo -e "\n${YELLOW}>>> 正在构建 $KEY ...${NC}" + echo "使用 Dockerfile: $DOCKERFILE" + + # 构建镜像 + # 注意:构建上下文始终为项目根目录 (.) + docker build -t "$IMAGE_NAME" -f "$DOCKERFILE" . + + # 获取镜像大小 (MB) + SIZE_BYTES=$(docker inspect "$IMAGE_NAME" --format='{{.Size}}') + SIZE_MB=$(echo "scale=2; $SIZE_BYTES / 1024 / 1024" | bc) + + echo -e "${GREEN}√ $KEY 构建完成. 大小: ${SIZE_MB} MB${NC}" + + # 累加大小 + TOTAL_SIZE=$(echo "$TOTAL_SIZE + $SIZE_BYTES" | bc) + + # 检查单个镜像大小是否异常 (例如超过 500MB 对于 Rust 微服务来说通常是不正常的,除非包含大模型) + if (( $(echo "$SIZE_MB > 500" | bc -l) )); then + echo -e "${RED}警告: $KEY 镜像大小超过 500MB,请检查 Dockerfile 是否包含不必要的文件!${NC}" + # 这里我们可以选择暂停询问用户,或者只是警告 + fi + + echo -e "${YELLOW}>>> 正在推送 $KEY 到 Harbor ...${NC}" + docker push "$IMAGE_NAME" +done + +TOTAL_SIZE_MB=$(echo "scale=2; $TOTAL_SIZE / 1024 / 1024" | bc) +echo -e "\n${GREEN}=== 所有镜像处理完成 ===${NC}" +echo -e "${GREEN}总大小: ${TOTAL_SIZE_MB} MB${NC}" + +# 检查总大小是否超过 1GB (1024 MB) +if (( $(echo "$TOTAL_SIZE_MB > 1024" | bc -l) )); then + echo -e "${RED}警告: 总镜像大小超过 1GB,请注意远程仓库的空间限制!${NC}" +else + echo -e "${GREEN}总大小在 1GB 限制范围内。${NC}" +fi + +# 生成服务器使用的 docker-compose.server.yml +echo -e "\n${YELLOW}>>> 正在生成服务器部署文件 docker-compose.server.yml ...${NC}" + +cat > docker-compose.server.yml </dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 12 + + alphavantage-provider-service: + image: $NAMESPACE/alphavantage-provider-service:$VERSION + container_name: alphavantage-provider-service + restart: unless-stopped + environment: + SERVER_PORT: 8000 + NATS_ADDR: nats://nats:4222 + DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 + RUST_LOG: info,axum=info + RUST_BACKTRACE: "1" + depends_on: + - nats + - data-persistence-service + networks: + - app-network + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:8000/health >/dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 12 + + tushare-provider-service: + image: $NAMESPACE/tushare-provider-service:$VERSION + container_name: tushare-provider-service + restart: unless-stopped + environment: + SERVER_PORT: 8001 + NATS_ADDR: nats://nats:4222 + DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 + TUSHARE_API_URL: http://api.waditu.com + RUST_LOG: info,axum=info + RUST_BACKTRACE: "1" + depends_on: + - nats + - data-persistence-service + networks: + - app-network + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:8001/health >/dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 12 + + finnhub-provider-service: + image: $NAMESPACE/finnhub-provider-service:$VERSION + container_name: finnhub-provider-service + restart: unless-stopped + environment: + SERVER_PORT: 8002 + NATS_ADDR: nats://nats:4222 + DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 + FINNHUB_API_URL: https://finnhub.io/api/v1 + RUST_LOG: info,axum=info + RUST_BACKTRACE: "1" + depends_on: + - nats + - data-persistence-service + networks: + - app-network + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:8002/health >/dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 12 + + yfinance-provider-service: + image: $NAMESPACE/yfinance-provider-service:$VERSION + container_name: yfinance-provider-service + restart: unless-stopped + environment: + SERVER_PORT: 8003 + NATS_ADDR: nats://nats:4222 + DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 + RUST_LOG: info,axum=info + RUST_BACKTRACE: "1" + depends_on: + - nats + - data-persistence-service + networks: + - app-network + dns: + - 8.8.8.8 + - 8.8.4.4 + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:8003/health >/dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 12 + + report-generator-service: + image: $NAMESPACE/report-generator-service:$VERSION + container_name: report-generator-service + restart: unless-stopped + environment: + SERVER_PORT: 8004 + NATS_ADDR: nats://nats:4222 + DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 + RUST_LOG: info,axum=info + RUST_BACKTRACE: "1" + depends_on: + - nats + - data-persistence-service + networks: + - app-network + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:8004/health >/dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 12 + +volumes: + pgdata: + nats_data: + +networks: + app-network: +EOF + +echo -e "${GREEN}生成完成: docker-compose.server.yml${NC}" +echo -e "请将此文件复制到远程服务器,并执行: docker-compose -f docker-compose.server.yml up -d" + diff --git a/services/alphavantage-provider-service/src/mapping.rs b/services/alphavantage-provider-service/src/mapping.rs index f8074d7..513e797 100644 --- a/services/alphavantage-provider-service/src/mapping.rs +++ b/services/alphavantage-provider-service/src/mapping.rs @@ -43,6 +43,7 @@ pub fn parse_company_profile(v: Value) -> anyhow::Result { "pe_ratio": v.get("PERatio"), "beta": v.get("Beta") })), + updated_at: None, }) } diff --git a/services/alphavantage-provider-service/src/persistence.rs b/services/alphavantage-provider-service/src/persistence.rs index 6cd94bf..974a13c 100644 --- a/services/alphavantage-provider-service/src/persistence.rs +++ b/services/alphavantage-provider-service/src/persistence.rs @@ -24,6 +24,16 @@ impl PersistenceClient { } } + pub async fn get_company_profile(&self, symbol: &str) -> Result> { + let url = format!("{}/companies/{}", self.base_url, symbol); + let resp = self.client.get(&url).send().await?; + if resp.status() == reqwest::StatusCode::NOT_FOUND { + return Ok(None); + } + let profile = resp.error_for_status()?.json().await?; + Ok(Some(profile)) + } + pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> { let url = format!("{}/companies", self.base_url); info!("Upserting company profile for {} to {}", profile.symbol, url); diff --git a/services/alphavantage-provider-service/src/worker.rs b/services/alphavantage-provider-service/src/worker.rs index 1895474..ba65c22 100644 --- a/services/alphavantage-provider-service/src/worker.rs +++ b/services/alphavantage-provider-service/src/worker.rs @@ -47,6 +47,44 @@ pub async fn handle_fetch_command( PersistenceClient::new(state.config.data_persistence_service_url.clone()); let symbol = command.symbol.clone(); + // Check freshness + let mut is_fresh = false; + match persistence_client.get_company_profile(&command.symbol).await { + Ok(Some(p)) => { + if let Some(updated_at) = p.updated_at { + let age = chrono::Utc::now() - updated_at; + if age < chrono::Duration::hours(24) { + info!("Data for {} is fresh (age: {}h). Skipping fetch.", command.symbol, age.num_hours()); + is_fresh = true; + } + } + } + Ok(None) => {} + Err(e) => tracing::warn!("Failed to check profile freshness: {}", e), + } + + if is_fresh { + let event = FinancialsPersistedEvent { + request_id: command.request_id, + symbol: command.symbol, + years_updated: vec![], + template_id: command.template_id, + }; + let subject = "events.data.financials_persisted".to_string(); + publisher + .publish(subject, serde_json::to_vec(&event).unwrap().into()) + .await?; + + update_task_progress( + &state.tasks, + command.request_id, + 100, + "Data retrieved from cache", + ) + .await; + return Ok(()); + } + // Symbol conversion for Chinese stocks let av_symbol = if symbol.ends_with(".SH") { symbol.replace(".SH", ".SS") @@ -201,8 +239,9 @@ pub async fn handle_fetch_command( request_id: command.request_id, symbol: command.symbol, years_updated, + template_id: command.template_id, }; - let subject = "financials.persisted".to_string(); // NATS subject + let subject = "events.data.financials_persisted".to_string(); // NATS subject publisher .publish(subject, serde_json::to_vec(&event).unwrap().into()) .await?; diff --git a/services/api-gateway/src/api.rs b/services/api-gateway/src/api.rs index 20c8dc0..14fc9ff 100644 --- a/services/api-gateway/src/api.rs +++ b/services/api-gateway/src/api.rs @@ -1,7 +1,7 @@ use crate::error::Result; use crate::state::AppState; use axum::{ - extract::{Path, State}, + extract::{Path, Query, State}, http::StatusCode, response::{IntoResponse, Json}, routing::{get, post}, @@ -36,6 +36,11 @@ pub struct AnalysisRequest { pub template_id: String, } +#[derive(Deserialize)] +pub struct AnalysisResultQuery { + pub symbol: String, +} + // --- Router Definition --- pub fn create_router(app_state: AppState) -> Router { Router::new() @@ -52,7 +57,9 @@ fn create_v1_router() -> Router { "/analysis-requests/{symbol}", post(trigger_analysis_generation), ) + .route("/analysis-results", get(get_analysis_results_by_symbol)) .route("/companies/{symbol}/profile", get(get_company_profile)) + .route("/market-data/financial-statements/{symbol}", get(get_financials_by_symbol)) .route("/tasks/{request_id}", get(get_task_progress)) // --- New Config Routes --- .route( @@ -68,6 +75,7 @@ fn create_v1_router() -> Router { get(get_data_sources_config).put(update_data_sources_config), ) .route("/configs/test", post(test_data_source_config)) + .route("/configs/llm/test", post(test_llm_config)) // --- New Discover Routes --- .route("/discover-models/{provider_id}", get(discover_models)) .route("/discover-models", post(discover_models_preview)) @@ -105,6 +113,7 @@ async fn trigger_data_fetch( request_id, symbol: payload.symbol.clone(), market: payload.market, + template_id: payload.template_id.clone(), }; info!(request_id = %request_id, "Publishing data fetch command"); @@ -117,23 +126,6 @@ async fn trigger_data_fetch( ) .await?; - // If a template_id is provided, trigger the analysis generation workflow as well. - if let Some(template_id) = payload.template_id { - let analysis_command = GenerateReportCommand { - request_id, - symbol: payload.symbol, - template_id, - }; - info!(request_id = %request_id, template_id = %analysis_command.template_id, "Publishing analysis generation command (auto-triggered)"); - state - .nats_client - .publish( - ANALYSIS_COMMANDS_QUEUE.to_string(), - serde_json::to_vec(&analysis_command).unwrap().into(), - ) - .await?; - } - Ok(( StatusCode::ACCEPTED, Json(RequestAcceptedResponse { request_id }), @@ -170,6 +162,15 @@ async fn trigger_analysis_generation( )) } +/// [GET /v1/analysis-results?symbol=...] +async fn get_analysis_results_by_symbol( + State(state): State, + Query(query): Query, +) -> Result { + let results = state.persistence_client.get_analysis_results(&query.symbol).await?; + Ok(Json(results)) +} + /// [GET /v1/companies/:symbol/profile] /// Queries the persisted company profile from the data-persistence-service. async fn get_company_profile( @@ -180,6 +181,15 @@ async fn get_company_profile( Ok(Json(profile)) } +/// [GET /v1/market-data/financial-statements/:symbol] +async fn get_financials_by_symbol( + State(state): State, + Path(symbol): Path, +) -> Result { + let financials = state.persistence_client.get_financials(&symbol).await?; + Ok(Json(financials)) +} + /// [GET /v1/tasks/:request_id] /// Aggregates task progress from all downstream provider services. async fn get_task_progress( @@ -291,6 +301,42 @@ async fn test_data_source_config( } } +#[derive(Deserialize, Serialize)] +struct TestLlmConfigRequest { + api_base_url: String, + api_key: String, + model_id: String, +} + +async fn test_llm_config( + State(state): State, + Json(payload): Json, +) -> Result { + let client = reqwest::Client::new(); + let target_url = format!("{}/test-llm", state.config.report_generator_service_url.trim_end_matches('/')); + + let response = client + .post(&target_url) + .json(&payload) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response.text().await?; + return Ok(( + StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY), + Json(serde_json::json!({ + "error": "LLM test failed", + "details": error_text, + })), + ).into_response()); + } + + let response_json: serde_json::Value = response.json().await?; + Ok((StatusCode::OK, Json(response_json)).into_response()) +} + // --- Config API Handlers (Proxy to data-persistence-service) --- diff --git a/services/api-gateway/src/config.rs b/services/api-gateway/src/config.rs index 12b5f48..cd7f2ca 100644 --- a/services/api-gateway/src/config.rs +++ b/services/api-gateway/src/config.rs @@ -6,6 +6,7 @@ pub struct AppConfig { pub server_port: u16, pub nats_addr: String, pub data_persistence_service_url: String, + pub report_generator_service_url: String, pub provider_services: Vec, } @@ -18,6 +19,8 @@ impl AppConfig { let server_port: u16 = cfg.get::("server_port")?; let nats_addr: String = cfg.get::("nats_addr")?; let data_persistence_service_url: String = cfg.get::("data_persistence_service_url")?; + let report_generator_service_url: String = cfg.get::("report_generator_service_url") + .unwrap_or_else(|_| "http://report-generator-service:8004".to_string()); // Parse provider_services deterministically: // 1) prefer array from env (e.g., PROVIDER_SERVICES__0, PROVIDER_SERVICES__1, ...) @@ -45,6 +48,7 @@ impl AppConfig { server_port, nats_addr, data_persistence_service_url, + report_generator_service_url, provider_services, }) } diff --git a/services/api-gateway/src/persistence.rs b/services/api-gateway/src/persistence.rs index 92683ee..26bf202 100644 --- a/services/api-gateway/src/persistence.rs +++ b/services/api-gateway/src/persistence.rs @@ -3,7 +3,7 @@ //! use crate::error::Result; -use common_contracts::dtos::CompanyProfileDto; +use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto}; use common_contracts::config_models::{LlmProvidersConfig, DataSourcesConfig, AnalysisTemplateSets}; #[derive(Clone)] @@ -33,6 +33,32 @@ impl PersistenceClient { Ok(profile) } + pub async fn get_financials(&self, symbol: &str) -> Result> { + let url = format!("{}/market-data/financial-statements/{}", self.base_url, symbol); + let financials = self + .client + .get(&url) + .send() + .await? + .error_for_status()? + .json::>() + .await?; + Ok(financials) + } + + pub async fn get_analysis_results(&self, symbol: &str) -> Result> { + let url = format!("{}/analysis-results?symbol={}", self.base_url, symbol); + let results = self + .client + .get(&url) + .send() + .await? + .error_for_status()? + .json::>() + .await?; + Ok(results) + } + // --- Config Methods --- pub async fn get_llm_providers_config(&self) -> Result { diff --git a/services/common-contracts/src/dtos.rs b/services/common-contracts/src/dtos.rs index 00dbe6a..9bc81eb 100644 --- a/services/common-contracts/src/dtos.rs +++ b/services/common-contracts/src/dtos.rs @@ -11,6 +11,7 @@ pub struct CompanyProfileDto { pub industry: Option, pub list_date: Option, pub additional_info: Option, + pub updated_at: Option>, } // Market Data API DTOs @@ -62,7 +63,7 @@ pub struct NewAnalysisResult { /// Represents a persisted analysis result read from the database. #[api_dto] pub struct AnalysisResultDto { - pub id: i64, + pub id: Uuid, pub request_id: Uuid, pub symbol: String, pub template_id: String, diff --git a/services/common-contracts/src/messages.rs b/services/common-contracts/src/messages.rs index 10cb83a..ccc5b1f 100644 --- a/services/common-contracts/src/messages.rs +++ b/services/common-contracts/src/messages.rs @@ -10,6 +10,7 @@ pub struct FetchCompanyDataCommand { pub request_id: Uuid, pub symbol: String, pub market: String, + pub template_id: Option, // Optional trigger for analysis } /// Command to start a full report generation workflow. @@ -35,6 +36,7 @@ pub struct FinancialsPersistedEvent { pub request_id: Uuid, pub symbol: String, pub years_updated: Vec, + pub template_id: Option, // Pass-through for analysis trigger } diff --git a/services/data-persistence-service/src/api/analysis.rs b/services/data-persistence-service/src/api/analysis.rs index 213611f..a2666e6 100644 --- a/services/data-persistence-service/src/api/analysis.rs +++ b/services/data-persistence-service/src/api/analysis.rs @@ -8,10 +8,11 @@ use axum::{ }; use common_contracts::dtos::{AnalysisResultDto, NewAnalysisResult}; use serde::Deserialize; -use tracing::instrument; +use service_kit::api; +use tracing::{instrument, error}; use anyhow::Error as AnyhowError; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, utoipa::IntoParams, utoipa::ToSchema)] pub struct AnalysisQuery { pub symbol: String, pub module_id: Option, @@ -19,15 +20,18 @@ pub struct AnalysisQuery { /// Creates a new analysis result and returns the created record. #[instrument(skip(state, payload), fields(request_id = %payload.request_id, symbol = %payload.symbol, module_id = %payload.module_id))] +#[api(POST, "/api/v1/analysis-results")] pub async fn create_analysis_result( State(state): State, Json(payload): Json, ) -> Result { + // Use explicit column names to avoid issues if DB schema and struct are slightly out of sync + // Also ensure we are returning all fields needed by AnalysisResult let result = sqlx::query_as::<_, AnalysisResult>( r#" INSERT INTO analysis_results (request_id, symbol, template_id, module_id, content, meta_data) VALUES ($1, $2, $3, $4, $5, $6) - RETURNING * + RETURNING id, request_id, symbol, template_id, module_id, content, meta_data, created_at "# ) .bind(&payload.request_id) @@ -38,7 +42,10 @@ pub async fn create_analysis_result( .bind(&payload.meta_data) .fetch_one(state.pool()) .await - .map_err(AnyhowError::from)?; + .map_err(|e| { + error!("Database error inserting analysis result: {}", e); + AnyhowError::from(e) + })?; let dto = AnalysisResultDto { id: result.id, @@ -56,21 +63,46 @@ pub async fn create_analysis_result( /// Retrieves all analysis results for a given symbol. #[instrument(skip(state))] +#[api(GET, "/api/v1/analysis-results", output(list = "AnalysisResultDto"))] pub async fn get_analysis_results( State(state): State, Query(query): Query, ) -> Result>, ServerError> { - let results = sqlx::query_as::<_, AnalysisResult>( - r#" - SELECT * FROM analysis_results - WHERE symbol = $1 - ORDER BY created_at DESC - "# - ) - .bind(&query.symbol) - .fetch_all(state.pool()) - .await - .map_err(AnyhowError::from)?; + // Use string replacement for module_id to avoid lifetime issues with query_builder + // This is safe because we're not interpolating user input directly into the SQL structure, just deciding whether to add a clause. + // However, binding parameters is better. The issue with previous code was lifetime of temporary values. + + let results = if let Some(mid) = &query.module_id { + sqlx::query_as::<_, AnalysisResult>( + r#" + SELECT id, request_id, symbol, template_id, module_id, content, meta_data, created_at + FROM analysis_results + WHERE symbol = $1 AND module_id = $2 + ORDER BY created_at DESC + "# + ) + .bind(&query.symbol) + .bind(mid) + .fetch_all(state.pool()) + .await + } else { + sqlx::query_as::<_, AnalysisResult>( + r#" + SELECT id, request_id, symbol, template_id, module_id, content, meta_data, created_at + FROM analysis_results + WHERE symbol = $1 + ORDER BY created_at DESC + "# + ) + .bind(&query.symbol) + .fetch_all(state.pool()) + .await + }; + + let results = results.map_err(|e| { + error!("Database error fetching analysis results: {}", e); + AnyhowError::from(e) + })?; let dtos = results .into_iter() @@ -89,22 +121,31 @@ pub async fn get_analysis_results( Ok(Json(dtos)) } +use uuid::Uuid; + /// Retrieves a single analysis result by its primary ID. #[instrument(skip(state))] +#[api(GET, "/api/v1/analysis-results/{id}", output(detail = "AnalysisResultDto"))] pub async fn get_analysis_result_by_id( State(state): State, - Path(id): Path, + Path(id_str): Path, ) -> Result, ServerError> { + let id = Uuid::parse_str(&id_str).map_err(|_| ServerError::NotFound(format!("Invalid UUID: {}", id_str)))?; + let result = sqlx::query_as::<_, AnalysisResult>( r#" - SELECT * FROM analysis_results + SELECT id, request_id, symbol, template_id, module_id, content, meta_data, created_at + FROM analysis_results WHERE id = $1 "# ) .bind(&id) .fetch_one(state.pool()) .await - .map_err(AnyhowError::from)?; + .map_err(|e| { + error!("Database error fetching analysis result by id: {}", e); + AnyhowError::from(e) + })?; let dto = AnalysisResultDto { id: result.id, diff --git a/services/data-persistence-service/src/api/companies.rs b/services/data-persistence-service/src/api/companies.rs index cd48907..f3176c6 100644 --- a/services/data-persistence-service/src/api/companies.rs +++ b/services/data-persistence-service/src/api/companies.rs @@ -40,6 +40,7 @@ pub async fn get_company_by_symbol( industry: company.industry, list_date: company.list_date, additional_info: company.additional_info, + updated_at: Some(company.updated_at), }; info!(target: "api", symbol = %dto.symbol, "get_company_by_symbol completed"); diff --git a/services/data-persistence-service/src/api/configs.rs b/services/data-persistence-service/src/api/configs.rs index af82ab0..234db79 100644 --- a/services/data-persistence-service/src/api/configs.rs +++ b/services/data-persistence-service/src/api/configs.rs @@ -1,7 +1,6 @@ use axum::{extract::State, Json}; use common_contracts::config_models::{AnalysisTemplateSets, DataSourcesConfig, LlmProvidersConfig}; use service_kit::api; -use tracing::instrument; use crate::{db::system_config, AppState, ServerError}; #[api(GET, "/api/v1/configs/llm_providers", output(detail = "LlmProvidersConfig"))] diff --git a/services/data-persistence-service/src/api/market_data.rs b/services/data-persistence-service/src/api/market_data.rs index 9bb651f..105a3af 100644 --- a/services/data-persistence-service/src/api/market_data.rs +++ b/services/data-persistence-service/src/api/market_data.rs @@ -29,7 +29,7 @@ pub async fn batch_insert_financials( Ok(axum::http::StatusCode::CREATED) } -#[api(GET, "/api/v1/market-data/financials/{symbol}", output(list = "TimeSeriesFinancialDto"))] +#[api(GET, "/api/v1/market-data/financial-statements/{symbol}", output(list = "TimeSeriesFinancialDto"))] pub async fn get_financials_by_symbol( State(state): State, Path(symbol): Path, diff --git a/services/data-persistence-service/src/models.rs b/services/data-persistence-service/src/models.rs index 03b003f..73cfe3d 100644 --- a/services/data-persistence-service/src/models.rs +++ b/services/data-persistence-service/src/models.rs @@ -14,9 +14,11 @@ pub struct SystemConfig { #[derive(Debug, Clone, Serialize, sqlx::FromRow)] pub struct AnalysisResult { - pub id: i64, + pub id: Uuid, + #[sqlx(default)] // request_id is missing in some schema versions, handle gracefully or ensure migration runs pub request_id: Uuid, pub symbol: String, + // template_id/module_id might be missing if schema is very old, but we rely on migrations pub template_id: String, pub module_id: String, pub content: String, diff --git a/services/data-persistence-service/src/seeding.rs b/services/data-persistence-service/src/seeding.rs index ddc0bee..5594b0d 100644 --- a/services/data-persistence-service/src/seeding.rs +++ b/services/data-persistence-service/src/seeding.rs @@ -1,6 +1,5 @@ //! One-time data seeding logic for initializing the database. -use data_persistence_service::models::SystemConfig; use common_contracts::config_models::{AnalysisModuleConfig, AnalysisTemplateSet, AnalysisTemplateSets}; use sqlx::PgPool; use std::collections::HashMap; diff --git a/services/data-persistence-service/tests/api_tests.rs b/services/data-persistence-service/tests/api_tests.rs index aa85ee9..1df46da 100644 --- a/services/data-persistence-service/tests/api_tests.rs +++ b/services/data-persistence-service/tests/api_tests.rs @@ -33,6 +33,7 @@ async fn test_api_upsert_and_get_company(pool: PgPool) { industry: Some("API Testing".to_string()), list_date: Some(chrono::NaiveDate::from_ymd_opt(2025, 1, 1).unwrap()), additional_info: None, + updated_at: None, }; let request = Request::builder() diff --git a/services/data-persistence-service/tests/db_tests.rs b/services/data-persistence-service/tests/db_tests.rs index db775d0..832b4c6 100644 --- a/services/data-persistence-service/tests/db_tests.rs +++ b/services/data-persistence-service/tests/db_tests.rs @@ -43,6 +43,7 @@ async fn test_upsert_and_get_company(pool: PgPool) { industry: Some("Testing".to_string()), list_date: Some(chrono::NaiveDate::from_ymd_opt(2024, 1, 1).unwrap()), additional_info: Some(serde_json::json!({ "ceo": "John Doe" })), + updated_at: None, }; // 2. Act: Call the upsert function diff --git a/services/finnhub-provider-service/src/mapping.rs b/services/finnhub-provider-service/src/mapping.rs index a6db965..6281002 100644 --- a/services/finnhub-provider-service/src/mapping.rs +++ b/services/finnhub-provider-service/src/mapping.rs @@ -22,6 +22,7 @@ pub fn map_profile_dto(profile_raw: &FinnhubProfile, symbol: &str) -> Result Router { Router::new() .route("/health", get(health_check)) .route("/tasks", get(get_current_tasks)) + .route("/test-llm", post(test_llm_connection)) .with_state(app_state) } @@ -41,3 +46,26 @@ async fn get_current_tasks(State(state): State) -> Json, + Json(payload): Json, +) -> Result, (StatusCode, String)> { + let client = LlmClient::new( + payload.api_base_url, + SecretString::from(payload.api_key), + payload.model_id, + ); + + let response = client.generate_text("Hello".to_string()).await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; + + Ok(Json(response)) +} diff --git a/services/report-generator-service/src/llm_client.rs b/services/report-generator-service/src/llm_client.rs index 8f4adde..716bff1 100644 --- a/services/report-generator-service/src/llm_client.rs +++ b/services/report-generator-service/src/llm_client.rs @@ -1,67 +1,78 @@ use crate::error::ProviderError; -use reqwest::Client; +use async_openai::{ + config::OpenAIConfig, + types::{CreateChatCompletionRequestArgs, ChatCompletionRequestUserMessageArgs}, + Client, +}; use secrecy::{ExposeSecret, SecretString}; -use serde::Serialize; +use tracing::debug; #[derive(Clone)] pub struct LlmClient { - client: Client, - api_url: String, - api_key: SecretString, + client: Client, model: String, } -#[derive(Serialize)] -struct LlmRequest { - model: String, - prompt: String, -} - - impl LlmClient { pub fn new(api_url: String, api_key: SecretString, model: String) -> Self { + let api_url = api_url.trim(); + // async_openai expects the base URL to NOT include /chat/completions + // It usually expects something like "https://api.openai.com/v1" + // If the user provided a full URL like ".../chat/completions", we should strip it. + + let base_url = if api_url.ends_with("/chat/completions") { + api_url.trim_end_matches("/chat/completions").trim_end_matches('/').to_string() + } else if api_url.ends_with("/completions") { + api_url.trim_end_matches("/completions").trim_end_matches('/').to_string() + } else { + api_url.trim_end_matches('/').to_string() + }; + + debug!("Initializing LlmClient with base_url: {}", base_url); + + let config = OpenAIConfig::new() + .with_api_base(base_url) + .with_api_key(api_key.expose_secret()); + + let client = Client::with_config(config); + Self { - client: Client::new(), - api_url, - api_key, + client, model, } } pub async fn generate_text(&self, prompt: String) -> Result { - let request_payload = LlmRequest { - model: self.model.clone(), - prompt, - }; + debug!("Sending request to LLM model: {}", self.model); - let res = self - .client - .post(&self.api_url) - .bearer_auth(self.api_key.expose_secret()) - .json(&request_payload) - .send() - .await?; + let request = CreateChatCompletionRequestArgs::default() + .model(&self.model) + .messages([ + ChatCompletionRequestUserMessageArgs::default() + .content(prompt) + .build() + .map_err(|e| ProviderError::LlmApi(format!("Failed to build message: {}", e)))? + .into() + ]) + .build() + .map_err(|e| ProviderError::LlmApi(format!("Failed to build request: {}", e)))?; - if !res.status().is_success() { - let status = res.status(); - let error_text = res - .text() - .await - .unwrap_or_else(|_| "Unknown LLM API error".to_string()); - return Err(ProviderError::LlmApi(format!( - "LLM API request failed with status {}: {}", - status, - error_text - ))); + let response = self.client.chat().create(request).await + .map_err(|e| { + let err_msg = e.to_string(); + if err_msg.contains("") || err_msg.contains(" Result<(), anyhow::Error> { let mut subscriber = nats_client.subscribe(SUBJECT_NAME.to_string()).await?; info!( - "Consumer started, waiting for commands on subject '{}'", + "Consumer started, waiting for events on subject '{}'", SUBJECT_NAME ); while let Some(message) = subscriber.next().await { - info!("Received NATS command to generate report."); + info!("Received NATS event for persisted financials."); let state_clone = app_state.clone(); tokio::spawn(async move { - match serde_json::from_slice::(&message.payload) { - Ok(command) => { - info!( - "Deserialized command for symbol: {}, template: {}", - command.symbol, command.template_id - ); - if let Err(e) = run_report_generation_workflow(Arc::new(state_clone), command).await - { - error!("Error running report generation workflow: {:?}", e); + match serde_json::from_slice::(&message.payload) { + Ok(event) => { + if let Some(template_id) = event.template_id { + info!( + "Event triggered analysis for symbol: {}, template: {}", + event.symbol, template_id + ); + let command = GenerateReportCommand { + request_id: event.request_id, + symbol: event.symbol, + template_id, + }; + if let Err(e) = run_report_generation_workflow(Arc::new(state_clone), command).await + { + error!("Error running report generation workflow: {:?}", e); + } + } else { + info!("Received data persisted event for {} but no template_id provided. Skipping analysis.", event.symbol); } } Err(e) => { - error!("Failed to deserialize GenerateReportCommand: {}", e); + error!("Failed to deserialize FinancialsPersistedEvent: {}", e); } } }); diff --git a/services/report-generator-service/src/worker.rs b/services/report-generator-service/src/worker.rs index a10eb83..65c40b2 100644 --- a/services/report-generator-service/src/worker.rs +++ b/services/report-generator-service/src/worker.rs @@ -103,7 +103,27 @@ pub async fn run_report_generation_workflow( Err(e) => { let err_msg = format!("LLM generation failed: {}", e); error!(module_id = %module_id, "{}", err_msg); - generated_results.insert(module_id.clone(), format!("Error: {}", err_msg)); + let error_content = format!("Error: {}", err_msg); + + // Persist error result so frontend knows it failed + let result_to_persist = NewAnalysisResult { + request_id: command.request_id, + symbol: command.symbol.clone(), + template_id: command.template_id.clone(), + module_id: module_id.clone(), + content: error_content.clone(), + meta_data: serde_json::json!({ + "model_id": module_config.model_id, + "status": "error", + "error": err_msg + }), + }; + + if let Err(pe) = persistence_client.create_analysis_result(result_to_persist).await { + error!(module_id = %module_id, "Failed to persist analysis error result: {}", pe); + } + + generated_results.insert(module_id.clone(), error_content); continue; } }; @@ -116,7 +136,10 @@ pub async fn run_report_generation_workflow( template_id: command.template_id.clone(), module_id: module_id.clone(), content: content.clone(), - meta_data: serde_json::json!({ "model_id": module_config.model_id }), + meta_data: serde_json::json!({ + "model_id": module_config.model_id, + "status": "success" + }), }; if let Err(e) = persistence_client.create_analysis_result(result_to_persist).await { @@ -186,8 +209,11 @@ fn create_llm_client_for_module( )) })?; + let api_url = provider.api_base_url.clone(); + info!("Creating LLM client for module '{}' using provider '{}' with URL: '{}'", module_config.name, module_config.provider_id, api_url); + Ok(LlmClient::new( - provider.api_base_url.clone(), + api_url, provider.api_key.clone().into(), module_config.model_id.clone(), )) diff --git a/services/tushare-provider-service/src/persistence.rs b/services/tushare-provider-service/src/persistence.rs index 6cd94bf..974a13c 100644 --- a/services/tushare-provider-service/src/persistence.rs +++ b/services/tushare-provider-service/src/persistence.rs @@ -24,6 +24,16 @@ impl PersistenceClient { } } + pub async fn get_company_profile(&self, symbol: &str) -> Result> { + let url = format!("{}/companies/{}", self.base_url, symbol); + let resp = self.client.get(&url).send().await?; + if resp.status() == reqwest::StatusCode::NOT_FOUND { + return Ok(None); + } + let profile = resp.error_for_status()?.json().await?; + Ok(Some(profile)) + } + pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> { let url = format!("{}/companies", self.base_url); info!("Upserting company profile for {} to {}", profile.symbol, url); diff --git a/services/tushare-provider-service/src/tushare.rs b/services/tushare-provider-service/src/tushare.rs index 662930d..808781d 100644 --- a/services/tushare-provider-service/src/tushare.rs +++ b/services/tushare-provider-service/src/tushare.rs @@ -72,6 +72,7 @@ impl TushareDataProvider { industry, list_date, additional_info: None, + updated_at: None, }; // Map time-series financials into DTOs diff --git a/services/tushare-provider-service/src/worker.rs b/services/tushare-provider-service/src/worker.rs index d30823d..6334188 100644 --- a/services/tushare-provider-service/src/worker.rs +++ b/services/tushare-provider-service/src/worker.rs @@ -36,7 +36,66 @@ pub async fn run_tushare_workflow( let mut entry = state .tasks .get_mut(&task_id) + .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; + entry.status = "CheckingCache".to_string(); + entry.progress_percent = 5; + entry.details = "Checking local data freshness".to_string(); + } + + // Check freshness + let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); + let mut is_fresh = false; + match persistence_client.get_company_profile(&command.symbol).await { + Ok(Some(p)) => { + if let Some(updated_at) = p.updated_at { + let age = chrono::Utc::now() - updated_at; + if age < chrono::Duration::hours(24) { + info!("Data for {} is fresh (age: {}h). Skipping fetch.", command.symbol, age.num_hours()); + is_fresh = true; + } + } + } + Ok(None) => {} + Err(e) => tracing::warn!("Failed to check profile freshness: {}", e), + } + + if is_fresh { + { + let mut entry = state + .tasks + .get_mut(&task_id) .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; + entry.status = "Completed".to_string(); + entry.progress_percent = 100; + entry.details = "Data retrieved from cache".to_string(); + } + + let nats_client = async_nats::connect(&state.config.nats_addr) + .await + .map_err(|e| AppError::Internal(format!("NATS connection failed: {}", e)))?; + + let financials_event = FinancialsPersistedEvent { + request_id: command.request_id, + symbol: command.symbol.clone(), + years_updated: vec![], + template_id: command.template_id.clone(), + }; + nats_client + .publish( + "events.data.financials_persisted", + serde_json::to_vec(&financials_event).unwrap().into(), + ) + .await?; + + let _ = completion_tx.send(()).await; + return Ok(()); + } + + { + let mut entry = state + .tasks + .get_mut(&task_id) + .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; entry.status = "FetchingData".to_string(); entry.progress_percent = 10; entry.details = "Starting data fetch from Tushare".to_string(); @@ -57,7 +116,7 @@ pub async fn run_tushare_workflow( } // 4. Persist data - let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); + // persistence_client already created above persist_data( &persistence_client, &profile, @@ -158,6 +217,7 @@ async fn publish_events( request_id: command.request_id, symbol: command.symbol.clone(), years_updated: years.into_iter().collect(), + template_id: command.template_id.clone(), }; nats_client .publish( diff --git a/services/yfinance-provider-service/src/mapping.rs b/services/yfinance-provider-service/src/mapping.rs index 83cd158..9b6c1d2 100644 --- a/services/yfinance-provider-service/src/mapping.rs +++ b/services/yfinance-provider-service/src/mapping.rs @@ -41,6 +41,7 @@ pub fn map_profile(summary_json: &Value, symbol: &str) -> Result Result> { + let url = format!("{}/companies/{}", self.base_url, symbol); + let resp = self.client.get(&url).send().await?; + if resp.status() == reqwest::StatusCode::NOT_FOUND { + return Ok(None); + } + let profile = resp.error_for_status()?.json().await?; + Ok(Some(profile)) + } + pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> { let url = format!("{}/companies", self.base_url); info!("Upserting company profile for {} to {}", profile.symbol, url); diff --git a/services/yfinance-provider-service/src/worker.rs b/services/yfinance-provider-service/src/worker.rs index 7ff4956..0b44dff 100644 --- a/services/yfinance-provider-service/src/worker.rs +++ b/services/yfinance-provider-service/src/worker.rs @@ -30,12 +30,56 @@ pub async fn handle_fetch_command( state.tasks.insert(task_id, common_contracts::observability::TaskProgress { request_id: task_id, task_name: format!("yfinance:{}", command.symbol), - status: "FetchingData".to_string(), - progress_percent: 10, - details: "Fetching data from YFinance".to_string(), + status: "CheckingCache".to_string(), + progress_percent: 5, + details: "Checking local data freshness".to_string(), started_at: chrono::Utc::now(), }); + // Check freshness + let mut is_fresh = false; + match client.get_company_profile(&command.symbol).await { + Ok(Some(p)) => { + if let Some(updated_at) = p.updated_at { + let age = chrono::Utc::now() - updated_at; + if age < chrono::Duration::hours(24) { + info!("Data for {} is fresh (age: {}h). Skipping fetch.", command.symbol, age.num_hours()); + is_fresh = true; + } + } + } + Ok(None) => {} + Err(e) => tracing::warn!("Failed to check profile freshness: {}", e), + } + + if is_fresh { + if let Some(mut task) = state.tasks.get_mut(&task_id) { + task.status = "Completed".to_string(); + task.progress_percent = 100; + task.details = "Data retrieved from cache".to_string(); + } + + let financials_event = FinancialsPersistedEvent { + request_id: task_id, + symbol: command.symbol.clone(), + years_updated: vec![], + template_id: command.template_id.clone(), + }; + publisher + .publish( + "events.data.financials_persisted".to_string(), + serde_json::to_vec(&financials_event).unwrap().into(), + ) + .await?; + return Ok(()); + } + + if let Some(mut task) = state.tasks.get_mut(&task_id) { + task.status = "FetchingData".to_string(); + task.progress_percent = 10; + task.details = "Fetching data from YFinance".to_string(); + } + // Fetch let (profile, financials): (CompanyProfileDto, Vec) = state.yfinance_provider.fetch_all_data(&command.symbol).await?; @@ -69,6 +113,7 @@ pub async fn handle_fetch_command( request_id: task_id, symbol: command.symbol.clone(), years_updated: years.into_iter().collect(), + template_id: command.template_id.clone(), }; publisher .publish(