diff --git a/docker-compose.yml b/docker-compose.yml index 6c4ecbc..bf8ca43 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -67,7 +67,8 @@ services: ports: - "13001:3001" depends_on: - - api-gateway + api-gateway: + condition: service_healthy networks: - app-network @@ -95,6 +96,11 @@ services: - yfinance-provider-service networks: - app-network + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:4000/health >/dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 12 alphavantage-provider-service: build: @@ -105,7 +111,6 @@ services: SERVER_PORT: 8000 NATS_ADDR: nats://nats:4222 DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 - ALPHAVANTAGE_API_KEY: ${ALPHAVANTAGE_API_KEY} RUST_LOG: info,axum=info RUST_BACKTRACE: "1" depends_on: @@ -129,8 +134,6 @@ services: NATS_ADDR: nats://nats:4222 DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 TUSHARE_API_URL: http://api.waditu.com - # Please provide your Tushare token via .env - TUSHARE_API_TOKEN: ${TUSHARE_API_TOKEN} RUST_LOG: info,axum=info RUST_BACKTRACE: "1" depends_on: @@ -154,8 +157,6 @@ services: NATS_ADDR: nats://nats:4222 DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1 FINNHUB_API_URL: https://finnhub.io/api/v1 - # Please provide your Finnhub token in .env file - FINNHUB_API_KEY: ${FINNHUB_API_KEY} RUST_LOG: info,axum=info RUST_BACKTRACE: "1" depends_on: diff --git a/docs/2_architecture/20251116_[Active]_system_architecture.md b/docs/2_architecture/20251116_[Active]_system_architecture.md index 5df2241..3ff7704 100644 --- a/docs/2_architecture/20251116_[Active]_system_architecture.md +++ b/docs/2_architecture/20251116_[Active]_system_architecture.md @@ -29,8 +29,8 @@ owner: '@lv' ``` +-------------+ +------------------+ +---------------------------+ | | HTTP | | | | -| 前端 |----->| API 网关 |----->| 消息总线 | -| (Next.js) | | (Rust) | | (例如 RabbitMQ, NATS) | +| 前端 |----->| API 网关 |----->| 消息总线 (NATS) | +| (Next.js) 
| | (Rust) | | | | | | | | | +-------------+ +-------+----------+ +-------------+-------------+ | | @@ -69,7 +69,7 @@ owner: '@lv' - 为所有其他内部微服务提供稳定、统一的数据库读写 HTTP 接口。 - **消息总线 (Message Bus)**: - - 整个系统的神经中枢,负责所有服务间的异步通信。 + - 整个系统的神经中枢,负责所有服务间的异步通信。当前选用 **NATS** 作为具体实现。 ## 3. `SystemModule` 核心规范 diff --git a/docs/3_project_management/tasks/pending/20251116_[Active]_refactor_llm_provider_architecture.md b/docs/3_project_management/tasks/completed/20251116_[Active]_refactor_llm_provider_architecture.md similarity index 100% rename from docs/3_project_management/tasks/pending/20251116_[Active]_refactor_llm_provider_architecture.md rename to docs/3_project_management/tasks/completed/20251116_[Active]_refactor_llm_provider_architecture.md diff --git a/docs/3_project_management/tasks/completed/20251117_[Active]_refactor_configuration_management.md b/docs/3_project_management/tasks/completed/20251117_[Active]_refactor_configuration_management.md new file mode 100644 index 0000000..49f75d2 --- /dev/null +++ b/docs/3_project_management/tasks/completed/20251117_[Active]_refactor_configuration_management.md @@ -0,0 +1,98 @@ +# 任务文档:配置管理重构——统一API凭证管理 + +- **状态**: Active +- **创建日期**: 2025-11-17 +- **负责人**: @AI-Assistant +- **审查人**: @lv + +--- + +## 1. 背景与目标 + +### 1.1. 当前问题 + +当前系统对外部服务(如 Tushare, Finnhub)API Token 的管理方式存在两个主要问题: + +1. **配置方式分裂**: + - **敏感凭证 (API Tokens)**: 通过启动时的**环境变量**注入。这种方式虽然安全,但缺乏灵活性,每次修改都需要重新部署或重启服务。 + - **业务逻辑配置 (AI模型选择等)**: 通过**数据库**统一管理,并支持UI动态调整。 + - 这种分裂的管理模式增加了系统的运维复杂性,与我们追求的“单一事实源”架构理念不符。 + +2. **服务韧性不足**: + - 依赖环境变量的服务采取“快速失败” (Fail-Fast) 策略。如果启动时未提供有效的 API Token,服务会立即崩溃退出。 + - 这种模式虽然能尽早暴露问题,但在一个动态的、持续运行的系统中显得过于“僵硬”。我们期望的行为是:服务在缺少非核心配置时,应能进入一个“降级”状态,待配置就绪后再自动恢复工作,而不是直接停止运行。 + +### 1.2. 
改造目标 + +本次重构旨在将所有外部服务的 API Token 配置,从环境变量迁移到数据库中,实现与业务逻辑配置的统一管理。具体目标如下: + +- **统一配置源**: 将 `system_config` 数据库表作为所有可变配置(包括API Tokens)的唯一事实源。 +- **提升易用性**: 允许用户通过前端UI界面,集中管理和更新所有数据源的 API Token。 +- **增强服务韧性**: 改造数据提供商服务,使其在缺少 API Token 时不会崩溃,而是进入“降级模式”,并能在 Token 被提供后自动恢复正常工作。 +- **简化部署**: 移除对多个环境变量的依赖,使服务的部署和运维过程更加简洁。 + +--- + +## 2. 实施方案 + +本次改造将遵循“后端 -> 服务 -> 前端”的顺序分层实施,确保每一步都有坚实的基础。 + +### 2.1. 数据模型与持久化层 + +我们将通过复用 `system_config` 表中现有的 `(config_key, config_value)` 存储模式,来扩展配置管理的能力,使其能够安全地存储和检索数据源的配置。 + +1. **定义数据结构**: 在 `common-contracts` 共享库中,定义一个清晰的、用于描述数据源配置的 `DataSourceConfig` 结构体。它将包含 `provider_id`, `api_token`, `api_url` 等字段。 +2. **复用现有表结构**: 我们将向 `system_config` 表中插入一条新的记录,其 `config_key` 固定为 `"data_sources"`,并将所有数据源的配置集合(一个 `HashMap`)序列化后存入该记录的 `config_value` 字段中。 +3. **扩展API**: 在 `data-persistence-service` 中增加新的 HTTP API 端点,用于对数据源配置进行增、删、改、查(CRUD)操作。例如: + - `GET /api/v1/configs/data-sources`: 获取所有数据源的配置列表。 + - `PUT /api/v1/configs/data-sources`: 创建或更新所有数据源的配置。 + +### 2.2. 微服务改造:引入“降级与恢复”模式 + +这是本次重构的核心。所有依赖外部 API Token 的数据提供商服务 (`finnhub`, `tushare`, `alphavantage`) 都将进行如下改造: + +1. **移除启动时检查**: 删除 `config.rs` 中检查环境变量并导致程序崩溃的逻辑。 +2. **引入内部状态机**: 每个服务内部将维护一个状态(例如 `State`),包含 `Active` 和 `Degraded(reason: String)` 两种状态。 +3. **动态配置加载**: 服务将不再从环境变量读取 Token,而是在内部启动一个**后台任务**(轮询器),该任务会: + - 在服务启动时,以及之后每隔一段时间(例如 60 秒),调用 `data-persistence-service` 的新 API 来获取自己的配置。 + - 如果成功获取到有效的 Token,则更新服务内部的 API 客户端,并将服务状态设置为 `Active`。此时,服务正常订阅和处理来自 NATS 的消息。 + - 如果未能获取 Token(或 Token 为空),则将服务状态设置为 `Degraded`,并附上原因(如 "API Token not configured")。在这种状态下,服务**不会**订阅 NATS 消息队列,避免接收无法处理的任务。 +4. **更新健康检查**: 服务的 `/health` 端点将反映其内部状态。当处于 `Degraded` 状态时,健康检查接口应返回相应的状态码和信息,以便监控系统能够清晰地了解服务当前是否可用。 + +### 2.3. 前端UI实现 + +为了让用户能够方便地管理这些配置,我们将在前端进行如下调整: + +1. **创建新UI组件**: 在 `/config` 页面,新增一个名为“数据源配置”的管理面板。 +2. **功能实现**: 该面板将提供一个表单或列表,允许用户: + - 查看当前所有数据源(Tushare, Finnhub 等)的配置状态。 + - 为每个数据源输入或更新其 API Token。 + - 保存更改。点击保存后,前端将调用 `data-persistence-service` 的新 API,将更新后的配置持久化到数据库中。 + +--- + +## 3. 
详细任务清单 + +### 第一阶段:后端基础 + +- [x] ~~**任务 BE-1**: 在 `common-contracts` 中定义 `DataSourceConfig` 和 `DataSourceProvider` 等共享数据结构。~~ +- [x] ~~**任务 BE-3**: 在 `data-persistence-service` 中实现对数据源配置的 CRUD 业务逻辑。~~ +- [x] ~~**任务 BE-4**: 在 `data-persistence-service` 中暴露 `GET /api/v1/configs/data-sources` 和 `PUT /api/v1/configs/data-sources` 这两个 API 端点。~~ + +### 第二阶段:微服务改造 + +- [x] ~~**任务 SVC-1**: **(Finnhub)** 重构 `finnhub-provider-service`:~~ + - [x] ~~移除 `config.rs` 中的 `FINNHUB_API_KEY` 环境变量加载逻辑。~~ + - [x] ~~实现内部状态机 (`Active`/`Degraded`) 和动态配置轮询器。~~ + - [x] ~~修改 `/health` 端点以反映内部状态。~~ + - [x] ~~调整 NATS 消息订阅逻辑,只在 `Active` 状态下进行订阅。~~ +- [x] ~~**任务 SVC-2**: **(Tushare)** 以 `finnhub-provider-service` 为模板,对 `tushare-provider-service` 进行相同的重构。~~ +- [x] ~~**任务 SVC-3**: **(Alphavantage)** 以 `finnhub-provider-service` 为模板,对 `alphavantage-provider-service` 进行相同的重构。~~ +- [x] ~~**任务 SVC-4**: **(审查)** 审查 `report-generator-service` 的 LLM 配置加载逻辑,确保其与新的动态配置模式在设计理念上保持一致。~~ + +### 第三阶段:前端实现 + +- [x] **任务 FE-1**: 在 `/config` 页面设计并实现“数据源配置”UI 组件。 +- [x] **任务 FE-2**: 实现 `useApi.ts` 中用于获取和更新数据源配置的 hooks。 +- [x] **任务 FE-3**: 将 UI 组件与 API hooks 连接,完成前端的完整功能。 +- [x] **任务 FE-4**: 调整 `/llm-config` 页面,使其在UI/UX风格上与新的“数据源配置”面板保持一致性。 diff --git a/docs/3_project_management/tasks/pending/20251117_[Active]_implement_analysis_orchestrator.md b/docs/3_project_management/tasks/pending/20251117_[Active]_implement_analysis_orchestrator.md new file mode 100644 index 0000000..486c71b --- /dev/null +++ b/docs/3_project_management/tasks/pending/20251117_[Active]_implement_analysis_orchestrator.md @@ -0,0 +1,111 @@ +--- +status: "Active" +date: "2025-11-17" +author: "AI 助手" +--- + +# 设计文档:分析模块编排器 + +## 1. 概述与目标 + +### 1.1. 问题陈述 + +我们当前基于 Rust 的后端缺少执行智能、多步骤财务分析所需的核心业务逻辑。尽管旧的 Python 系统拥有一个功能性的分析框架,但在初次重构过程中,这部分逻辑并未被迁移。本应承载此逻辑的 `report-generator-service` 服务目前仅包含一个无法正常工作的占位符实现。此外,前端配置页面缺少从零开始创建或管理分析模块的用户界面,这导致了一个“先有鸡还是先有蛋”的困境——系统中不存在任何可供配置的默认模块。 + +### 1.2. 
目标 + +本任务旨在我们的 Rust 微服务架构中,设计并实现一个健壮的、可配置的**分析模块编排器(Analysis Module Orchestrator)**。该系统将复刻并改进旧 Python 系统的逻辑,以支持完全通过配置(提示词和依赖关系)来创建、管理和执行复杂的、具备依赖关系感知能力的分析工作流。 + +为达成此目标,需要完成以下任务: +1. 在前端为分析模块管理实现一个完整的 CRUD (创建、读取、更新、删除) 操作界面。 +2. 在 `report-generator-service` 中实现一个健壮的后端编排器,使其能够基于模块依赖关系构成的有向无环图 (DAG) 来执行分析工作流。 +3. 通过 `api-gateway` 和 NATS 消息总线整合前端与后端服务,以打造无缝的端到端用户体验。 +4. 实现一个数据播种(Data Seeding)机制,以确保系统在首次启动时能够预加载一套默认的分析模块。 + +## 2. 系统架构与数据流 + +本次实现将涉及四个关键服务和一个消息总线:`前端`、`API 网关`、`数据持久化服务` 和 `报告生成服务`。 + +### 2.1. 高层数据流 + +1. **配置流程**: + * **用户** 在 **前端** 配置页面上进行交互,以创建或更新分析模块。 + * **前端** 向 **API 网关** 发送 `PUT /api/v1/configs/analysis_modules` 请求。 + * **API 网关** 将这些请求代理至 **数据持久化服务**,由其将配置保存到数据库的 `system_config` 表中。 + +2. **执行流程**: + * **用户** 在 **前端** 为特定的股票代码触发一次分析运行。 + * **前端** 向 **API 网关** 发送 `POST /api/v1/analysis-requests/{symbol}` 请求。 + * **API 网关** 验证请求,并向 **NATS 消息总线** 的一个新主题发布一条 `GenerateReportCommand` 消息。随后,它会立即向前端返回一个带有请求ID的 `202 Accepted` 响应。 + * **报告生成服务** 订阅 `GenerateReportCommand` 主题,接收到消息后,启动编排工作流。 + * **报告生成服务** 从 **数据持久化服务** 获取所需的分析模块配置。 + * 服务执行分析,为每个模块调用 LLM API,并通过 **数据持久化服务** 将结果持久化存回数据库。 + +## 3. 前端实施计划 (`/config` 页面) + +我们将修改 `frontend/src/app/config/page.tsx` 文件,为分析模块提供完整的 CRUD 用户体验。 + +- **创建 (Create)**: 添加一个“新增模块”按钮。点击后,将显示一个表单,用于输入: + - **模块 ID**: 一个唯一的、机器可读的字符串 (例如, `fundamental_analysis`)。 + - **模块名称**: 一个人类可读的显示名称 (例如, "基本面分析")。 +- **读取 (Read)**: 页面将为每个已存在的分析模块渲染一个卡片,展示其当前配置。 +- **更新 (Update)**: 每个模块卡片将包含以下可编辑字段: + - **LLM Provider**: 一个下拉菜单,其选项从 `llm_providers` 配置中动态填充。 + - **Model**: 一个级联下拉菜单,显示所选 Provider 下可用的模型。 + - **提示词模板**: 一个用于编写 Prompt 的大文本区域。 + - **依赖关系**: 一个包含所有其他模块ID的复选框列表,允许用户定义模块间的依赖。 +- **删除 (Delete)**: 每个模块卡片将有一个带有确认对话框的“删除”按钮。 + +## 4. 后端实施计划 + +### 4.1. `data-persistence-service` + +- **数据播种 (关键任务)**: 实现一次性的启动逻辑。 + 1. 在服务启动时,检查 `system_config` 表中是否存在键为 `analysis_modules` 的记录。 + 2. 如果记录**不存在**,则从磁盘读取旧的 `config/analysis-config.json` 文件。 + 3. 解析文件内容,并将其作为 `analysis_modules` 的值插入数据库。 + 4. 
此机制确保系统在首次部署时,即被预置一套默认且功能完备的分析模块。 +- **API**: 无需变更。现有的 `GET /configs/analysis_modules` 和 `PUT /configs/analysis_modules` 端点已能满足需求。 + +### 4.2. `api-gateway` + +- **新端点**: 创建一个新的端点 `POST /api/v1/analysis-requests/{symbol}`。 +- **逻辑**: + 1. 此端点不应执行任何重度计算任务。 + 2. 它将从路径中接收一个股票 `symbol`。 + 3. 它将生成一个唯一的 `request_id` (例如, UUID)。 + 4. 它将构建一条包含 `symbol` 和 `request_id` 的 `GenerateReportCommand` 消息。 + 5. 它将此消息发布到一个专用的 NATS 主题 (例如, `analysis.commands.generate_report`)。 + 6. 它将立即返回一个 `202 Accepted` 状态码,并在响应体中包含 `request_id`。 + +### 4.3. `report-generator-service` (核心任务) + +此服务需要进行最主要的开发工作。所有逻辑将在 `worker.rs` 文件中实现。 + +1. **消息消费者**: 服务将订阅 `analysis.commands.generate_report` NATS 主题。一旦收到 `GenerateReportCommand` 消息,即触发 `run_report_generation_workflow` 工作流。 + +2. **编排逻辑 (`run_report_generation_workflow`)**: + * **获取配置**: 从 `data-persistence-service` 获取完整的 `AnalysisModulesConfig`。 + * **构建依赖图**: 根据模块配置,在内存中构建一个有向图。强烈推荐使用 `petgraph` crate 来完成此任务。 + * **拓扑排序**: 对该图执行拓扑排序,以获得一个线性的执行顺序。该算法**必须**包含循环检测功能,以便在配置错误时能够优雅地处理,并记录错误日志。 + * **顺序执行**: 遍历排序后的模块列表。对每个模块: + * **构建上下文**: 收集其所有直接依赖模块的文本输出(这些模块已保证被提前执行)。 + * **渲染提示词**: 使用 `Tera` 模板引擎,将依赖模块的输出以及其他所需数据(如公司名称、财务数据)注入到当前模块的 `prompt_template` 中。 + * **执行 LLM 调用**: 通过 `LlmClient` 调用相应的 LLM API。 + * **持久化结果**: 成功生成内容后,调用 `data-persistence-service` 将输出文本保存,并与 `symbol` 和 `module_id` 关联。同时,将结果保存在本地,以供工作流中的后续模块使用。 + +3. **补全缺失逻辑**: + * 实现 `// TODO` 中关于持久化结果的部分。 + * 将 `financial_data` 占位符替换为从 `data-persistence-service` 获取并格式化后的真实财务数据。 + +## 5. 未来工作:向 "Deep Research" 模块演进 + +如前所述,初始实现将依赖 LLM 的内部知识来完成“新闻”或“市场情绪”等分析。这是一个为快速实现功能而刻意选择的短期策略。 + +长期愿景是用一个 `Deep Research` 模块来取代这种模式。该模块将作为一个智能的数据预处理器。届时,编排器将不再注入简单的文本,而是触发 Deep Research 模块,后者将: +1. 理解目标分析模块(如 `news_analysis`)的数据需求。 +2. 查询内部数据源(例如,数据库中的 `news` 表)以查找相关信息。 +3. 对检索到的数据执行多步推理或摘要。 +4. 
为最终的分析模块提示词提供一个高质量、经过浓缩的数据包。 + +这一演进将使我们的系统从“提示词驱动”转变为“数据驱动”,从而显著提升分析结果的可靠性、可控性和准确性。 diff --git a/frontend/src/app/api/configs/data_sources/route.ts b/frontend/src/app/api/configs/data_sources/route.ts new file mode 100644 index 0000000..f66ee5b --- /dev/null +++ b/frontend/src/app/api/configs/data_sources/route.ts @@ -0,0 +1,45 @@ +const BACKEND_BASE = process.env.BACKEND_INTERNAL_URL || process.env.NEXT_PUBLIC_BACKEND_URL; + +export async function GET() { + if (!BACKEND_BASE) { + return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); + } + try { + const resp = await fetch(`${BACKEND_BASE}/configs/data_sources`, { + headers: { 'Content-Type': 'application/json' }, + cache: 'no-store', + }); + const text = await resp.text(); + return new Response(text, { + status: resp.status, + headers: { 'Content-Type': resp.headers.get('Content-Type') || 'application/json' }, + }); + } catch (e: any) { + const errorBody = JSON.stringify({ message: e?.message || '连接后端失败' }); + return new Response(errorBody, { status: 502, headers: { 'Content-Type': 'application/json' } }); + } +} + +export async function PUT(req: Request) { + if (!BACKEND_BASE) { + return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); + } + const body = await req.text(); + try { + const resp = await fetch(`${BACKEND_BASE}/configs/data_sources`, { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body, + }); + const text = await resp.text(); + return new Response(text, { + status: resp.status, + headers: { 'Content-Type': resp.headers.get('Content-Type') || 'application/json' }, + }); + } catch (e: any) { + const errorBody = JSON.stringify({ message: e?.message || '连接后端失败' }); + return new Response(errorBody, { status: 502, headers: { 'Content-Type': 'application/json' } }); + } +} + + diff --git a/frontend/src/app/api/configs/llm_providers/route.ts b/frontend/src/app/api/configs/llm_providers/route.ts index 7b4fb78..5ac6c46 100644 --- 
a/frontend/src/app/api/configs/llm_providers/route.ts +++ b/frontend/src/app/api/configs/llm_providers/route.ts @@ -4,6 +4,7 @@ export async function GET() { if (!BACKEND_BASE) { return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); } + try { const resp = await fetch(`${BACKEND_BASE}/configs/llm_providers`, { headers: { 'Content-Type': 'application/json' }, cache: 'no-store', @@ -13,6 +14,10 @@ export async function GET() { status: resp.status, headers: { 'Content-Type': resp.headers.get('Content-Type') || 'application/json' }, }); + } catch (e: any) { + const errorBody = JSON.stringify({ message: e?.message || '连接后端失败' }); + return new Response(errorBody, { status: 502, headers: { 'Content-Type': 'application/json' } }); + } } export async function PUT(req: Request) { @@ -20,6 +25,7 @@ export async function PUT(req: Request) { return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); } const body = await req.text(); + try { const resp = await fetch(`${BACKEND_BASE}/configs/llm_providers`, { method: 'PUT', headers: { 'Content-Type': 'application/json' }, @@ -30,5 +36,9 @@ export async function PUT(req: Request) { status: resp.status, headers: { 'Content-Type': resp.headers.get('Content-Type') || 'application/json' }, }); + } catch (e: any) { + const errorBody = JSON.stringify({ message: e?.message || '连接后端失败' }); + return new Response(errorBody, { status: 502, headers: { 'Content-Type': 'application/json' } }); + } } diff --git a/frontend/src/app/api/discover-models/[provider_id]/route.ts b/frontend/src/app/api/discover-models/[provider_id]/route.ts index f02e008..be6435b 100644 --- a/frontend/src/app/api/discover-models/[provider_id]/route.ts +++ b/frontend/src/app/api/discover-models/[provider_id]/route.ts @@ -1,13 +1,18 @@ -const BACKEND_BASE = process.env.NEXT_PUBLIC_BACKEND_URL; +const BACKEND_BASE = process.env.BACKEND_INTERNAL_URL || process.env.NEXT_PUBLIC_BACKEND_URL; export async function GET( _req: Request, - context: { params: 
Promise<{ provider_id: string }> } + context: any ) { if (!BACKEND_BASE) { - return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); + return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); + } + const raw = context?.params; + const params = raw && typeof raw.then === 'function' ? await raw : raw; + const provider_id = params?.provider_id as string | undefined; + if (!provider_id) { + return new Response('provider_id 缺失', { status: 400 }); } - const { provider_id } = await context.params; const target = `${BACKEND_BASE}/discover-models/${encodeURIComponent(provider_id)}`; const resp = await fetch(target, { headers: { 'Content-Type': 'application/json' }, diff --git a/frontend/src/app/api/discover-models/route.ts b/frontend/src/app/api/discover-models/route.ts new file mode 100644 index 0000000..cb63264 --- /dev/null +++ b/frontend/src/app/api/discover-models/route.ts @@ -0,0 +1,26 @@ +const BACKEND_BASE = process.env.BACKEND_INTERNAL_URL || process.env.NEXT_PUBLIC_BACKEND_URL; + +export async function POST(req: Request) { + if (!BACKEND_BASE) { + return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); + } + const body = await req.text(); + try { + const resp = await fetch(`${BACKEND_BASE}/discover-models`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body, + cache: 'no-store', + }); + const text = await resp.text(); + return new Response(text, { + status: resp.status, + headers: { 'Content-Type': resp.headers.get('Content-Type') || 'application/json' }, + }); + } catch (e: any) { + const errorBody = JSON.stringify({ message: e?.message || '连接后端失败' }); + return new Response(errorBody, { status: 502, headers: { 'Content-Type': 'application/json' } }); + } +} + + diff --git a/frontend/src/app/config/page.tsx b/frontend/src/app/config/page.tsx index 27a4fce..054556e 100644 --- a/frontend/src/app/config/page.tsx +++ b/frontend/src/app/config/page.tsx @@ -1,9 
+1,9 @@ 'use client'; -import { useState, useEffect } from 'react'; +import { useState, useEffect, useRef, useCallback } from 'react'; import { - useConfig, updateConfig, testConfig, - useAnalysisModules, updateAnalysisModules, useLlmProviders + useConfig, testConfig, + useAnalysisModules, updateAnalysisModules, useLlmProviders, updateLlmProviders, discoverProviderModels, discoverProviderModelsPreview } from '@/hooks/useApi'; import { useConfigStore } from '@/stores/useConfigStore'; import type { SystemConfig } from '@/stores/useConfigStore'; @@ -17,8 +17,11 @@ import { Label } from "@/components/ui/label"; import { Separator } from "@/components/ui/separator"; import { Checkbox } from "@/components/ui/checkbox"; import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from "@/components/ui/select"; +import { ScrollArea } from "@/components/ui/scroll-area"; +import { Spinner } from "@/components/ui/spinner"; // Types are imported from '@/types' -import type { AnalysisModulesConfig } from '@/types'; +import type { AnalysisModulesConfig, DataSourcesConfig, DataSourceConfig, DataSourceProvider, LlmProvidersConfig, LlmModel } from '@/types'; +import { useDataSourcesConfig, updateDataSourcesConfig } from '@/hooks/useApi'; export default function ConfigPage() { // 从 Zustand store 获取全局状态 @@ -30,14 +33,18 @@ export default function ConfigPage() { // const { data: analysisConfig, mutate: mutateAnalysisConfig } = useAnalysisModules(); // 本地表单状态 - const [newApiApiKey, setNewApiApiKey] = useState(''); - const [newApiBaseUrl, setNewApiBaseUrl] = useState(''); - const [tushareApiKey, setTushareApiKey] = useState(''); - const [finnhubApiKey, setFinnhubApiKey] = useState(''); + // 数据源本地状态 + const { data: initialDataSources, error: dsError, isLoading: dsLoading, mutate: mutateDataSources } = useDataSourcesConfig(); + const [localDataSources, setLocalDataSources] = useState({}); // 分析配置的本地状态 const [localAnalysisModules, setLocalAnalysisModules] = useState({}); + // 
-- New State for Creating Analysis Modules -- + const [isCreatingModule, setIsCreatingModule] = useState(false); + const [newModuleId, setNewModuleId] = useState(''); + const [newModuleName, setNewModuleName] = useState(''); + // 分析配置保存状态(状态定义在下方统一维护) // 测试结果状态 @@ -48,17 +55,131 @@ export default function ConfigPage() { const [saveMessage, setSaveMessage] = useState(''); // --- New State for Analysis Modules --- - const { data: llmProviders } = useLlmProviders(); + const { data: llmProviders, mutate: mutateLlmProviders } = useLlmProviders(); const { data: initialAnalysisModules, mutate } = useAnalysisModules(); const [isSavingAnalysis, setIsSavingAnalysis] = useState(false); const [analysisSaveMessage, setAnalysisSaveMessage] = useState(''); + // --- State for LLM Providers Management --- + const [localLlmProviders, setLocalLlmProviders] = useState({}); + const [isSavingLlm, setIsSavingLlm] = useState(false); + const [llmSaveMessage, setLlmSaveMessage] = useState(''); + const [newProviderId, setNewProviderId] = useState(''); + const [newProviderBaseUrl, setNewProviderBaseUrl] = useState(''); + const [newProviderApiKey, setNewProviderApiKey] = useState(''); + const [pendingApiKeys, setPendingApiKeys] = useState>({}); + const [editingApiKey, setEditingApiKey] = useState>({}); + const [discoverMessages, setDiscoverMessages] = useState>({}); + const [modelPickerOpen, setModelPickerOpen] = useState>({}); + const [candidateModels, setCandidateModels] = useState>({}); + const [modelSearch, setModelSearch] = useState>({}); + const [selectedCandidates, setSelectedCandidates] = useState>>({}); + const hasInitializedLlmRef = useRef(false); + const autoSaveTimerRef = useRef | null>(null); + const [newModelInputs, setNewModelInputs] = useState>({}); + const [newModelNameInputs, setNewModelNameInputs] = useState>({}); + const [newModelMenuOpen, setNewModelMenuOpen] = useState>({}); + const [newModelHighlightIndex, setNewModelHighlightIndex] = useState>({}); + const 
savingStartedAtRef = useRef(0); + const llmDirtyRef = useRef(false); + const latestServerPayloadRef = useRef(''); + const lastSavedPayloadRef = useRef(''); + const markLlmDirty = useCallback(() => { + llmDirtyRef.current = true; + }, []); + const normalizeProviders = useCallback((obj: LlmProvidersConfig) => { + const cloned: LlmProvidersConfig = JSON.parse(JSON.stringify(obj || {})); + Object.keys(cloned).forEach(pid => { + if (!cloned[pid].name || cloned[pid].name.trim().length === 0) { + cloned[pid].name = pid; + } + }); + return cloned; + }, []); + + const buildMergedLlmPayload = useCallback(() => { + const merged: LlmProvidersConfig = normalizeProviders(localLlmProviders || {}); + // 待更新的 API Key 覆盖 + Object.entries(pendingApiKeys || {}).forEach(([pid, key]) => { + if (merged[pid]) merged[pid].api_key = key; + }); + return merged; + }, [localLlmProviders, pendingApiKeys, normalizeProviders]); + + const flushSaveLlmImmediate = useCallback(async () => { + const payload = buildMergedLlmPayload(); + const payloadStr = JSON.stringify(payload); + if (payloadStr === latestServerPayloadRef.current || payloadStr === lastSavedPayloadRef.current) { + return; + } + savingStartedAtRef.current = Date.now(); + setIsSavingLlm(true); + setLlmSaveMessage('自动保存中...'); + try { + const updated = await updateLlmProviders(payload); + await mutateLlmProviders(updated, false); + lastSavedPayloadRef.current = payloadStr; + llmDirtyRef.current = false; + setPendingApiKeys({}); + setEditingApiKey({}); + setLlmSaveMessage('已自动保存'); + } catch (e: any) { + setLlmSaveMessage(`保存失败: ${e?.message || '未知错误'}`); + } finally { + const elapsed = Date.now() - (savingStartedAtRef.current || 0); + const minMs = 1000; + const waitMs = elapsed >= minMs ? 
0 : (minMs - elapsed); + if (waitMs > 0) { + await new Promise((resolve) => setTimeout(resolve, waitMs)); + } + setIsSavingLlm(false); + setTimeout(() => setLlmSaveMessage(''), 3000); + } + }, [buildMergedLlmPayload, mutateLlmProviders, setPendingApiKeys, setEditingApiKey]); + + // 对 LLM Providers 的任何修改自动保存(去抖) + useEffect(() => { + if (!hasInitializedLlmRef.current) { + hasInitializedLlmRef.current = true; + return; + } + if (!llmDirtyRef.current) { + return; + } + if (autoSaveTimerRef.current) { + clearTimeout(autoSaveTimerRef.current); + } + autoSaveTimerRef.current = setTimeout(() => { + void flushSaveLlmImmediate(); + }, 500); + return () => { + if (autoSaveTimerRef.current) { + clearTimeout(autoSaveTimerRef.current); + } + }; + }, [localLlmProviders, pendingApiKeys, flushSaveLlmImmediate]); + useEffect(() => { if (initialAnalysisModules) { setLocalAnalysisModules(initialAnalysisModules); } }, [initialAnalysisModules]); + useEffect(() => { + if (initialDataSources) { + setLocalDataSources(initialDataSources); + } + }, [initialDataSources]); + + useEffect(() => { + if (llmProviders) { + setLocalLlmProviders(llmProviders); + const normalized = normalizeProviders(llmProviders); + latestServerPayloadRef.current = JSON.stringify(normalized); + llmDirtyRef.current = false; + } + }, [llmProviders, normalizeProviders]); + const handleAnalysisChange = (moduleId: string, field: string, value: string) => { setLocalAnalysisModules(prev => ({ ...prev, @@ -115,68 +236,53 @@ export default function ConfigPage() { }); }; - // 旧版保存逻辑已移除,统一使用 handleSaveAnalysis - - const validateConfig = () => { - const errors: string[] = []; - - // 验证New API Base URL格式 - if (newApiBaseUrl && !newApiBaseUrl.match(/^https?:\/\/.+/)) { - errors.push('New API Base URL格式不正确,应为 http:// 或 https:// 开头'); - } - - // 验证API Key长度(基本检查) - if (newApiApiKey && newApiApiKey.length < 10) { - errors.push('New API Key长度过短'); - } - - if (tushareApiKey && tushareApiKey.length < 10) { - errors.push('Tushare API 
Key长度过短'); - } - - if (finnhubApiKey && finnhubApiKey.length < 10) { - errors.push('Finnhub API Key长度过短'); - } - - return errors; - }; - - const handleSave = async () => { - // 验证配置 - const validationErrors = validateConfig(); - if (validationErrors.length > 0) { - setSaveMessage(`配置验证失败: ${validationErrors.join(', ')}`); + // --- New handlers for module creation/deletion --- + const handleAddNewModule = () => { + if (!newModuleId || !newModuleName) { + setAnalysisSaveMessage('模块 ID 和名称不能为空'); + setTimeout(() => setAnalysisSaveMessage(''), 3000); return; } + if (localAnalysisModules[newModuleId]) { + setAnalysisSaveMessage('模块 ID 已存在'); + setTimeout(() => setAnalysisSaveMessage(''), 3000); + return; + } + setLocalAnalysisModules(prev => ({ + ...prev, + [newModuleId]: { + name: newModuleName, + provider_id: '', + model_id: '', + prompt_template: '', + dependencies: [], + } + })); + setNewModuleId(''); + setNewModuleName(''); + setIsCreatingModule(false); + }; + + const handleDeleteModule = (moduleId: string) => { + setLocalAnalysisModules(prev => { + const next = { ...prev }; + delete next[moduleId]; + return next; + }); + }; + + // 旧版保存逻辑已移除,统一使用 handleSaveAnalysis + const handleSave = async () => { setSaving(true); setSaveMessage('保存中...'); - const newConfig: Partial = {}; - - if (newApiApiKey || newApiBaseUrl) { - newConfig.new_api = { - api_key: newApiApiKey || config?.new_api?.api_key || '', - base_url: newApiBaseUrl || config?.new_api?.base_url || undefined, - }; - } - - if (tushareApiKey || finnhubApiKey) { - newConfig.data_sources = { - ...config?.data_sources, - ...(tushareApiKey && { tushare: { api_key: tushareApiKey } }), - ...(finnhubApiKey && { finnhub: { api_key: finnhubApiKey } }), - }; - } - try { - const updated = await updateConfig(newConfig); - setConfig(updated); // 更新全局状态 + if (initialDataSources) { + await updateDataSourcesConfig(localDataSources); + await mutateDataSources(localDataSources, false); + } setSaveMessage('保存成功!'); - // 清空敏感字段输入 - 
setNewApiApiKey(''); - setTushareApiKey(''); - setFinnhubApiKey(''); } catch (e: any) { setSaveMessage(`保存失败: ${e.message}`); } finally { @@ -197,26 +303,18 @@ export default function ConfigPage() { } }; - const handleTestNewApi = () => { - handleTest('new_api', { - api_key: newApiApiKey || config?.new_api?.api_key, - base_url: newApiBaseUrl || config?.new_api?.base_url - }); - }; - const handleTestTushare = () => { - handleTest('tushare', { api_key: tushareApiKey || config?.data_sources?.tushare?.api_key }); + const cfg = localDataSources['tushare']; + handleTest('tushare', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled }); }; const handleTestFinnhub = () => { - handleTest('finnhub', { api_key: finnhubApiKey || config?.data_sources?.finnhub?.api_key }); + const cfg = localDataSources['finnhub']; + handleTest('finnhub', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled }); }; const handleReset = () => { - setNewApiApiKey(''); - setNewApiBaseUrl(''); - setTushareApiKey(''); - setFinnhubApiKey(''); + if (initialDataSources) setLocalDataSources(initialDataSources); setTestResults({}); setSaveMessage(''); }; @@ -253,7 +351,7 @@ export default function ConfigPage() { // 验证导入的配置格式 if (importedConfig.new_api?.base_url) { - setNewApiBaseUrl(importedConfig.new_api.base_url); + setNewProviderBaseUrl(importedConfig.new_api.base_url); } setSaveMessage('配置导入成功,请检查并保存'); @@ -302,42 +400,496 @@ export default function ConfigPage() { - AI 服务配置 - New API 设置 (兼容 OpenAI 格式) +
+ AI Provider 管理 +
+ {isSavingLlm && ( + <> + + 自动保存中... + + )} + {!isSavingLlm && llmSaveMessage && ( + + {llmSaveMessage} + + )} +
+
+ 管理多个 Provider(API Key、Base URL)及其模型清单
-
- + {/* 新增 Provider */} +
+
+
+ + setNewProviderId(e.target.value)} placeholder="例如: openai_official" /> +
+
+ + setNewProviderBaseUrl(e.target.value)} placeholder="例如: https://api.openai.com/v1" /> +
+
+ + setNewProviderApiKey(e.target.value)} placeholder="输入新Provider的API Key" /> +
+
- setNewApiApiKey(e.target.value)} - placeholder="留空表示保持当前值" - className="flex-1" - /> -
- {testResults.new_api && ( - - {testResults.new_api.message} - - )}
- -
- - setNewApiBaseUrl(e.target.value)} - placeholder="例如: http://localhost:3000/v1" - className="flex-1" - /> + + + + {/* Provider 列表 */} +
+ {Object.entries(localLlmProviders || {}).map(([providerId, provider]) => { + const message = discoverMessages[providerId]; + const candidates = candidateModels[providerId] || []; + const query = (modelSearch[providerId] || '').trim().toLowerCase(); + const filteredCandidates = candidates.filter(id => id.toLowerCase().includes(query)); + const selectedMap = selectedCandidates[providerId] || {}; + return ( +
+
+
+
+

{providerId}

+ Base on ID + + {isSavingLlm ? ( + + + 正在保存… + + ) : null} + +
+
{provider.api_base_url}
+
+
+ + + +
+
+ + {message && ( +
{message}
+ )} + + {modelPickerOpen[providerId] && ( +
+
+
+ + setModelSearch(prev => ({ ...prev, [providerId]: e.target.value }))} + placeholder="输入关键字过滤(前缀/包含均可)" + /> +
+
+ + +
+
+ +
+
+ +
+ {filteredCandidates.length === 0 ? ( +
无匹配的候选模型
+ ) : ( + filteredCandidates.map(id => { + const checked = !!selectedMap[id]; + return ( +
+
{id}
+ { + setSelectedCandidates(prev => { + const cur = { ...(prev[providerId] || {}) }; + if (v) cur[id] = true; else delete cur[id]; + return { ...prev, [providerId]: cur }; + }); + }} + /> +
+ ); + }) + )} +
+
+
+ )} + +
+
+ + { + markLlmDirty(); + setLocalLlmProviders(prev => ({ + ...prev, + [providerId]: { ...prev[providerId], api_base_url: e.target.value }, + })); + }} + /> +
+
+ + {!editingApiKey[providerId] ? ( +
+ + {provider.api_key ? '已配置' : '未配置'} + + +
+ ) : ( +
+ { + markLlmDirty(); + setPendingApiKeys(prev => ({ ...prev, [providerId]: e.target.value })); + }} + /> + +
+ )} +
+
+ +
+ +
+ {(provider.models || []).map((m) => ( +
+ { + const v = e.target.value; + markLlmDirty(); + setLocalLlmProviders(prev => ({ + ...prev, + [providerId]: { + ...prev[providerId], + models: (prev[providerId].models || []).map(mm => + mm.model_id === m.model_id ? { ...mm, model_id: v } : mm + ), + }, + })); + }} + /> + { + const v = e.target.value; + markLlmDirty(); + setLocalLlmProviders(prev => ({ + ...prev, + [providerId]: { + ...prev[providerId], + models: (prev[providerId].models || []).map(mm => + mm.model_id === m.model_id ? { ...mm, name: v } : mm + ), + }, + })); + }} + placeholder="可选别名" + /> +
+
+ { + markLlmDirty(); + setLocalLlmProviders(prev => ({ + ...prev, + [providerId]: { + ...prev[providerId], + models: (prev[providerId].models || []).map(mm => + mm.model_id === m.model_id ? { ...mm, is_active: !!checked } : mm + ), + }, + })); + }} + /> + +
+ +
+
+ ))} +
+
+
+
+ { + const v = e.target.value; + setNewModelInputs(prev => ({ ...prev, [providerId]: v })); + setNewModelMenuOpen(prev => ({ ...prev, [providerId]: true })); + setNewModelHighlightIndex(prev => ({ ...prev, [providerId]: 0 })); + }} + onFocus={() => { + setNewModelMenuOpen(prev => ({ ...prev, [providerId]: true })); + }} + onKeyDown={(e) => { + const typedRaw = (newModelInputs[providerId] || ''); + const typed = typedRaw.trim().toLowerCase(); + const existing = new Set((provider.models || []).map(m => m.model_id)); + const list = (candidateModels[providerId] || []) + .filter(id => id.toLowerCase().includes(typed)) + .filter(id => !existing.has(id)) + .slice(0, 50); + const hi = newModelHighlightIndex[providerId] ?? 0; + if (e.key === 'ArrowDown') { + e.preventDefault(); + if (list.length > 0) { + const next = Math.min(hi + 1, list.length - 1); + setNewModelHighlightIndex(prev => ({ ...prev, [providerId]: next })); + } + return; + } + if (e.key === 'ArrowUp') { + e.preventDefault(); + if (list.length > 0) { + const next = Math.max(hi - 1, 0); + setNewModelHighlightIndex(prev => ({ ...prev, [providerId]: next })); + } + return; + } + if (e.key === 'Escape') { + setNewModelMenuOpen(prev => ({ ...prev, [providerId]: false })); + return; + } + if (e.key === 'Enter') { + const chosen = list.length > 0 && (newModelMenuOpen[providerId] ?? false) ? 
list[hi] : (newModelInputs[providerId] || '').trim(); + const id = (chosen || '').trim(); + if (!id) return; + const existsSet = new Set((provider.models || []).map(m => m.model_id)); + if (existsSet.has(id)) return; + markLlmDirty(); + setLocalLlmProviders(prev => ({ + ...prev, + [providerId]: { + ...prev[providerId], + models: [ + ...(prev[providerId].models || []), + { model_id: id, name: (newModelNameInputs[providerId] || '') || null, is_active: true }, + ], + }, + })); + setNewModelInputs(prev => ({ ...prev, [providerId]: '' })); + setNewModelNameInputs(prev => ({ ...prev, [providerId]: '' })); + setNewModelMenuOpen(prev => ({ ...prev, [providerId]: false })); + setNewModelHighlightIndex(prev => ({ ...prev, [providerId]: 0 })); + } + }} + /> + {(() => { + const typed = (newModelInputs[providerId] || '').trim().toLowerCase(); + const existing = new Set((provider.models || []).map(m => m.model_id)); + const list = (candidateModels[providerId] || []) + .filter(id => id.toLowerCase().includes(typed)) + .filter(id => !existing.has(id)) + .slice(0, 50); + const open = !!newModelMenuOpen[providerId]; + if (!typed || list.length === 0 || !open) return null; + const hi = newModelHighlightIndex[providerId] ?? 0; + return ( +
+ {list.map((id, idx) => ( +
{ + setNewModelHighlightIndex(prev => ({ ...prev, [providerId]: idx })); + }} + onMouseDown={(ev) => { + // 防止失焦导致菜单关闭 + ev.preventDefault(); + }} + onClick={() => { + setNewModelInputs(prev => ({ ...prev, [providerId]: id })); + }} + > + {id} +
+ ))} +
+ ); + })()} +
+ { + const v = e.target.value; + setNewModelNameInputs(prev => ({ ...prev, [providerId]: v })); + }} + /> +
+ +
+
+
+ 若无候选,请先点击上方“刷新候选模型”加载后再输入筛选。 +
+
+
+
+ ); + })}
@@ -350,53 +902,90 @@ export default function ConfigPage() { 外部数据源 API 设置 -
-
- -

中国股票数据源

-
- setTushareApiKey(e.target.value)} - placeholder="留空表示保持当前值" - className="flex-1" - /> - -
- {testResults.tushare && ( - - {testResults.tushare.message} - - )} + {dsLoading &&
加载数据源配置...
} + {dsError &&
数据源配置加载失败: {String(dsError)}
} + {!dsLoading && !dsError && ( +
+ {(['tushare','finnhub','alphavantage','yfinance'] as DataSourceProvider[]).map((providerKey) => { + const item: DataSourceConfig = localDataSources[providerKey] || { + provider: providerKey, + api_key: '', + api_url: '', + enabled: false, + }; + return ( +
+
+
+ +

+ {providerKey === 'tushare' && '中国市场数据源'} + {providerKey === 'finnhub' && '全球市场数据源'} + {providerKey === 'alphavantage' && '全球市场数据源(MCP桥接)'} + {providerKey === 'yfinance' && '雅虎财经数据源'} +

+
+
+ { + setLocalDataSources((prev) => ({ + ...prev, + [providerKey]: { ...item, enabled: !!checked, provider: providerKey }, + })); + }} + /> + +
+
+
+
+ + { + const v = e.target.value; + setLocalDataSources((prev) => ({ + ...prev, + [providerKey]: { ...item, api_key: v, provider: providerKey }, + })); + }} + placeholder="留空表示清空或保持(根据启用状态)" + /> +
+
+ + { + const v = e.target.value; + setLocalDataSources((prev) => ({ + ...prev, + [providerKey]: { ...item, api_url: v, provider: providerKey }, + })); + }} + placeholder={providerKey === 'tushare' ? 'https://api.tushare.pro' : 'https://...'} + /> +
+
+
+ {providerKey === 'tushare' && ( + + )} + {providerKey === 'finnhub' && ( + + )} +
+
+ ); + })}
- - - -
- -

全球金融市场数据源

-
- setFinnhubApiKey(e.target.value)} - placeholder="留空表示保持当前值" - className="flex-1" - /> - -
- {testResults.finnhub && ( - - {testResults.finnhub.message} - - )} -
-
+ )} @@ -405,15 +994,21 @@ export default function ConfigPage() { 分析模块配置 - 配置各个分析模块的模型和提示词 + 配置各个分析模块的模型、提示词和依赖关系。模块ID将作为Prompt中注入上下文的占位符。 {Object.entries(localAnalysisModules).map(([moduleId, config]) => { const availableModels = llmProviders?.[config.provider_id]?.models.filter(m => m.is_active) || []; return (
-

{config.name || moduleId}

- +
+
+

{config.name || moduleId}

+

ID: {moduleId}

+
+ +
+
@@ -462,7 +1057,41 @@ export default function ConfigPage() {
); })} + + {isCreatingModule && ( +
+

新增分析模块

+
+
+ + setNewModuleId(e.target.value.replace(/\s/g, ''))} + placeholder="e.g. fundamental_analysis" + /> +
+
+ + setNewModuleName(e.target.value)} + placeholder="e.g. 基本面分析" + /> +
+
+
+ + +
+
+ )} +
+ diff --git a/frontend/src/hooks/useApi.ts b/frontend/src/hooks/useApi.ts index e0548f9..1919c52 100644 --- a/frontend/src/hooks/useApi.ts +++ b/frontend/src/hooks/useApi.ts @@ -7,6 +7,7 @@ import { LlmProvidersConfig, AnalysisModulesConfig, FinancialConfigResponse, + DataSourcesConfig, } from "@/types"; import { useEffect, useState } from "react"; // Execution-step types not used currently; keep API minimal and explicit @@ -335,6 +336,15 @@ export async function discoverProviderModels(providerId: string) { return res.json(); } +export async function discoverProviderModelsPreview(apiBaseUrl: string, apiKey: string) { + const res = await fetch(`/api/discover-models`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ api_base_url: apiBaseUrl, api_key: apiKey }), + }); + return res.json(); +} + // --- Analysis Modules Config Hooks --- export function useAnalysisModules() { @@ -353,3 +363,22 @@ export async function updateAnalysisModules(payload: AnalysisModulesConfig) { } return res.json() as Promise; } + +// --- Data Sources Config Hooks --- + +export function useDataSourcesConfig() { + return useSWR('/api/configs/data_sources', fetcher); +} + +export async function updateDataSourcesConfig(payload: DataSourcesConfig) { + const res = await fetch('/api/configs/data_sources', { + method: 'PUT', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(payload), + }); + if (!res.ok) { + const text = await res.text().catch(() => ''); + throw new Error(text || `HTTP ${res.status}`); + } + return res.json() as Promise; +} diff --git a/frontend/src/types/index.ts b/frontend/src/types/index.ts index 5c51e97..c29b642 100644 --- a/frontend/src/types/index.ts +++ b/frontend/src/types/index.ts @@ -497,4 +497,19 @@ export interface AnalysisModuleConfig { } /** 分析模块配置集合:键为 module_id(如 bull_case) */ -export type AnalysisModulesConfig = Record; \ No newline at end of file +export type AnalysisModulesConfig = Record; + +// 
============================================================================ +// 数据源配置类型(与后端 common-contracts 配置保持结构一致) +// ============================================================================ + +export type DataSourceProvider = 'tushare' | 'finnhub' | 'alphavantage' | 'yfinance'; + +export interface DataSourceConfig { + provider: DataSourceProvider; + api_key?: string | null; + api_url?: string | null; + enabled: boolean; +} + +export type DataSourcesConfig = Record; \ No newline at end of file diff --git a/services/alphavantage-provider-service/src/api.rs b/services/alphavantage-provider-service/src/api.rs index 36d470d..e3996dc 100644 --- a/services/alphavantage-provider-service/src/api.rs +++ b/services/alphavantage-provider-service/src/api.rs @@ -6,7 +6,7 @@ use axum::{ Router, }; use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress}; -use crate::state::AppState; +use crate::state::{AppState, ServiceOperationalStatus}; pub fn create_router(app_state: AppState) -> Router { Router::new() @@ -17,14 +17,22 @@ pub fn create_router(app_state: AppState) -> Router { /// [GET /health] /// Provides the current health status of the module. -async fn health_check(State(_state): State) -> Json { +async fn health_check(State(state): State) -> Json { let mut details = HashMap::new(); - // In a real scenario, we would check connections to the message bus, etc. 
- details.insert("message_bus_connection".to_string(), "ok".to_string()); + let operational_status = state.status.read().await; + + let (service_status, reason) = match &*operational_status { + ServiceOperationalStatus::Active => (ServiceStatus::Ok, "ok".to_string()), + ServiceOperationalStatus::Degraded { reason } => { + (ServiceStatus::Degraded, reason.clone()) + } + }; + details.insert("operational_status".to_string(), reason); + let status = HealthStatus { module_id: "alphavantage-provider-service".to_string(), - status: ServiceStatus::Ok, + status: service_status, version: env!("CARGO_PKG_VERSION").to_string(), details, }; diff --git a/services/alphavantage-provider-service/src/config.rs b/services/alphavantage-provider-service/src/config.rs index 9991d4f..c4e4d8e 100644 --- a/services/alphavantage-provider-service/src/config.rs +++ b/services/alphavantage-provider-service/src/config.rs @@ -1,12 +1,12 @@ use secrecy::SecretString; use serde::Deserialize; -#[derive(Deserialize, Debug)] +#[derive(Debug, Deserialize, Clone)] pub struct AppConfig { pub server_port: u16, pub nats_addr: String, - pub alphavantage_api_key: SecretString, pub data_persistence_service_url: String, + pub alphavantage_api_key: Option, } impl AppConfig { @@ -15,6 +15,14 @@ impl AppConfig { .add_source(config::Environment::default().separator("__")) .build()?; - config.try_deserialize() + let cfg: AppConfig = config.try_deserialize()?; + + if cfg.data_persistence_service_url.trim().is_empty() { + return Err(config::ConfigError::Message( + "DATA_PERSISTENCE_SERVICE_URL must not be empty".to_string(), + )); + } + + Ok(cfg) } } diff --git a/services/alphavantage-provider-service/src/config_poller.rs b/services/alphavantage-provider-service/src/config_poller.rs new file mode 100644 index 0000000..64abc33 --- /dev/null +++ b/services/alphavantage-provider-service/src/config_poller.rs @@ -0,0 +1,56 @@ +use crate::error::Result; +use crate::state::AppState; +use 
common_contracts::config_models::{DataSourceConfig, DataSourceProvider}; +use secrecy::SecretString; +use std::collections::HashMap; +use std::time::Duration; +use tracing::{error, info, instrument}; + +const POLLING_INTERVAL_SECONDS: u64 = 60; + +#[instrument(skip(state))] +pub async fn run_config_poller(state: AppState) { + info!("Starting configuration poller..."); + let mut interval = tokio::time::interval(Duration::from_secs(POLLING_INTERVAL_SECONDS)); + interval.tick().await; // Initial tick happens immediately + + loop { + if let Err(e) = poll_and_update_config(&state).await { + error!("Failed to poll and update config: {:?}", e); + } + interval.tick().await; + } +} + +async fn poll_and_update_config(state: &AppState) -> Result<()> { + info!("Polling for data source configurations..."); + let client = reqwest::Client::new(); + let url = format!( + "{}/configs/data_sources", + state.config.data_persistence_service_url + ); + + let response = client.get(&url).send().await?; + response.error_for_status_ref()?; + + let configs: HashMap = response.json().await?; + + let alphavantage_config = configs.values().find(|cfg| { + matches!(cfg.provider, DataSourceProvider::Alphavantage) && cfg.enabled + }); + + if let Some(config) = alphavantage_config { + if let Some(api_key) = &config.api_key { + state.update_provider(Some(SecretString::from(api_key.clone()))).await; + info!("Successfully updated Alphavantage provider with new configuration."); + } else { + state.update_provider(None).await; + info!("Alphavantage provider is enabled but API key is missing. Service is degraded."); + } + } else { + state.update_provider(None).await; + info!("No enabled Alphavantage configuration found. 
Service is degraded."); + } + + Ok(()) +} diff --git a/services/alphavantage-provider-service/src/error.rs b/services/alphavantage-provider-service/src/error.rs index a1950bb..e5aa8a9 100644 --- a/services/alphavantage-provider-service/src/error.rs +++ b/services/alphavantage-provider-service/src/error.rs @@ -1,40 +1,41 @@ +use anyhow::anyhow; +use reqwest::Error as ReqwestError; use thiserror::Error; -pub type Result = std::result::Result; - #[derive(Error, Debug)] pub enum AppError { #[error("Configuration error: {0}")] Configuration(String), - #[error("Message bus error: {0}")] - MessageBus(#[from] async_nats::Error), - - #[error("Message bus publish error: {0}")] - MessageBusPublish(#[from] async_nats::PublishError), - - #[error("Message bus subscribe error: {0}")] - MessageBusSubscribe(String), - - #[error("Message bus connect error: {0}")] - MessageBusConnect(String), - - #[error("HTTP request to another service failed: {0}")] - ServiceRequest(#[from] reqwest::Error), - #[error("Data parsing error: {0}")] DataParsing(#[from] anyhow::Error), + + #[error("Internal error: {0}")] + Internal(String), + + #[error("Provider not available: {0}")] + ProviderNotAvailable(String), + + #[error(transparent)] + Reqwest(#[from] ReqwestError), + + #[error(transparent)] + Nats(#[from] async_nats::Error), + + #[error(transparent)] + NatsSubscribe(#[from] async_nats::client::SubscribeError), + + #[error(transparent)] + NatsUnsubscribe(#[from] async_nats::UnsubscribeError), + + #[error(transparent)] + NatsPublish(#[from] async_nats::error::Error), } -// 手动实现针对 async-nats 泛型错误类型的 From 转换 -impl From> for AppError { - fn from(err: async_nats::error::Error) -> Self { - AppError::MessageBusConnect(err.to_string()) +impl From for AppError { + fn from(e: config::ConfigError) -> Self { + AppError::Configuration(e.to_string()) } } -impl From for AppError { - fn from(err: async_nats::SubscribeError) -> Self { - AppError::MessageBusSubscribe(err.to_string()) - } -} +pub type Result = 
std::result::Result; diff --git a/services/alphavantage-provider-service/src/main.rs b/services/alphavantage-provider-service/src/main.rs index 6970aab..2d54922 100644 --- a/services/alphavantage-provider-service/src/main.rs +++ b/services/alphavantage-provider-service/src/main.rs @@ -7,6 +7,7 @@ mod persistence; mod state; mod worker; mod av_client; +mod config_poller; use crate::config::AppConfig; use crate::error::Result; @@ -29,6 +30,9 @@ async fn main() -> Result<()> { // Initialize application state let app_state = AppState::new(config)?; + // --- Start the config poller --- + tokio::spawn(config_poller::run_config_poller(app_state.clone())); + // Create the Axum router let app = api::create_router(app_state.clone()); diff --git a/services/alphavantage-provider-service/src/message_consumer.rs b/services/alphavantage-provider-service/src/message_consumer.rs index 204dffa..0232250 100644 --- a/services/alphavantage-provider-service/src/message_consumer.rs +++ b/services/alphavantage-provider-service/src/message_consumer.rs @@ -1,26 +1,57 @@ use crate::error::Result; -use crate::state::AppState; +use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::messages::FetchCompanyDataCommand; use futures_util::StreamExt; -use tracing::{error, info}; +use std::time::Duration; +use tracing::{error, info, warn}; const SUBJECT_NAME: &str = "data_fetch_commands"; pub async fn run(state: AppState) -> Result<()> { info!("Starting NATS message consumer..."); - let client = async_nats::connect(&state.config.nats_addr).await?; - info!("Connected to NATS."); + loop { + let status = state.status.read().await.clone(); + if let ServiceOperationalStatus::Degraded { reason } = status { + warn!( + "Service is in degraded state (reason: {}). Pausing message consumption for 30s.", + reason + ); + tokio::time::sleep(Duration::from_secs(30)).await; + continue; + } - // This is a simple subscriber. For production, consider JetStream for durability. 
+ info!("Service is Active. Connecting to NATS..."); + match async_nats::connect(&state.config.nats_addr).await { + Ok(client) => { + info!("Successfully connected to NATS."); + if let Err(e) = subscribe_and_process(state.clone(), client).await { + error!("NATS subscription error: {}. Reconnecting in 10s...", e); + } + } + Err(e) => { + error!("Failed to connect to NATS: {}. Retrying in 10s...", e); + } + } + tokio::time::sleep(Duration::from_secs(10)).await; + } +} + +async fn subscribe_and_process(state: AppState, client: async_nats::Client) -> Result<()> { let mut subscriber = client.subscribe(SUBJECT_NAME.to_string()).await?; - info!( "Consumer started, waiting for messages on subject '{}'", SUBJECT_NAME ); while let Some(message) = subscriber.next().await { + let current_status = state.status.read().await.clone(); + if matches!(current_status, ServiceOperationalStatus::Degraded {..}) { + warn!("Service became degraded. Disconnecting from NATS and pausing consumption."); + subscriber.unsubscribe().await?; + return Ok(()); + } + info!("Received NATS message."); let state_clone = state.clone(); let publisher_clone = client.clone(); @@ -42,6 +73,5 @@ pub async fn run(state: AppState) -> Result<()> { } }); } - Ok(()) } diff --git a/services/alphavantage-provider-service/src/state.rs b/services/alphavantage-provider-service/src/state.rs index 5cfb122..6b5ac74 100644 --- a/services/alphavantage-provider-service/src/state.rs +++ b/services/alphavantage-provider-service/src/state.rs @@ -1,23 +1,74 @@ -use std::sync::Arc; +use crate::av_client::AvClient; +use crate::config::AppConfig; use common_contracts::observability::TaskProgress; use dashmap::DashMap; +use secrecy::{ExposeSecret, SecretString}; +use std::sync::Arc; +use tokio::sync::RwLock; use uuid::Uuid; -use crate::config::AppConfig; -use crate::error::Result; -pub type TaskStore = Arc>; +#[derive(Clone, Debug)] +pub enum ServiceOperationalStatus { + Active, + Degraded { reason: String }, +} #[derive(Clone)] pub 
struct AppState { + pub tasks: Arc>, pub config: Arc, - pub tasks: TaskStore, + pub status: Arc>, + av_provider: Arc>>>, } impl AppState { - pub fn new(config: AppConfig) -> Result { + pub fn new(config: AppConfig) -> Result { + let initial_status = if config.alphavantage_api_key.is_some() { + ServiceOperationalStatus::Degraded { reason: "Initializing provider, waiting for config poller to connect.".to_string() } + } else { + ServiceOperationalStatus::Degraded { reason: "Alphavantage API Key is not configured.".to_string() } + }; + Ok(Self { - config: Arc::new(config), tasks: Arc::new(DashMap::new()), + config: Arc::new(config), + status: Arc::new(RwLock::new(initial_status)), + av_provider: Arc::new(RwLock::new(None)), }) } + + pub async fn get_provider(&self) -> Option> { + self.av_provider.read().await.clone() + } + + pub async fn update_provider(&self, api_key: Option) { + let mut provider_guard = self.av_provider.write().await; + let mut status_guard = self.status.write().await; + + if let Some(key) = api_key { + let mcp_endpoint = format!( + "https://mcp.alphavantage.co/mcp?apikey={}", + key.expose_secret() + ); + match AvClient::connect(&mcp_endpoint).await { + Ok(new_provider) => { + *provider_guard = Some(Arc::new(new_provider)); + *status_guard = ServiceOperationalStatus::Active; + } + Err(e) => { + *provider_guard = None; + *status_guard = ServiceOperationalStatus::Degraded { + reason: format!("Failed to connect to Alphavantage: {}", e), + }; + } + } + } else { + *provider_guard = None; + *status_guard = ServiceOperationalStatus::Degraded { + reason: "Alphavantage API Key is not configured.".to_string(), + }; + } + } } + +pub type TaskStore = DashMap; diff --git a/services/alphavantage-provider-service/src/worker.rs b/services/alphavantage-provider-service/src/worker.rs index 48d3984..e919112 100644 --- a/services/alphavantage-provider-service/src/worker.rs +++ b/services/alphavantage-provider-service/src/worker.rs @@ -1,4 +1,4 @@ -use 
crate::error::Result; +use crate::error::{Result, AppError}; use crate::mapping::{CombinedFinancials, parse_company_profile, parse_financials, parse_realtime_quote}; use crate::persistence::PersistenceClient; use crate::state::{AppState, TaskStore}; @@ -30,9 +30,21 @@ pub async fn handle_fetch_command( }; state.tasks.insert(command.request_id, task); - let api_key = state.config.alphavantage_api_key.expose_secret(); - let mcp_endpoint = format!("https://mcp.alphavantage.co/mcp?apikey={}", api_key); - let client = Arc::new(AvClient::connect(&mcp_endpoint).await?); + let client = match state.get_provider().await { + Some(p) => p, + None => { + let reason = "Execution failed: Alphavantage provider is not available (misconfigured).".to_string(); + error!("{}", reason); + update_task_progress( + &state.tasks, + command.request_id, + 100, + &reason, + ).await; + return Err(AppError::ProviderNotAvailable(reason)); + } + }; + let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); let symbol = command.symbol.clone(); diff --git a/services/api-gateway/Cargo.lock b/services/api-gateway/Cargo.lock index b445c55..42fa9f6 100644 --- a/services/api-gateway/Cargo.lock +++ b/services/api-gateway/Cargo.lock @@ -2997,6 +2997,7 @@ dependencies = [ "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/services/api-gateway/Cargo.toml b/services/api-gateway/Cargo.toml index fb0101f..2d1e626 100644 --- a/services/api-gateway/Cargo.toml +++ b/services/api-gateway/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" # Web Service axum = "0.8.7" tokio = { version = "1", features = ["full"] } -tower-http = { version = "0.6.6", features = ["cors"] } +tower-http = { version = "0.6.6", features = ["cors", "trace"] } # Shared Contracts common-contracts = { path = "../common-contracts" } diff --git a/services/api-gateway/Dockerfile b/services/api-gateway/Dockerfile index 44727f2..547b680 100644 --- a/services/api-gateway/Dockerfile +++ 
b/services/api-gateway/Dockerfile @@ -3,20 +3,12 @@ FROM rust:1.90 as builder WORKDIR /usr/src/app -# Pre-build dependencies to leverage Docker layer caching +# Deterministic dependency caching without shipping a stub binary COPY ./services/common-contracts /usr/src/app/services/common-contracts COPY ./services/api-gateway/Cargo.toml ./services/api-gateway/Cargo.lock* ./services/api-gateway/ - WORKDIR /usr/src/app/services/api-gateway -RUN mkdir -p src && \ - echo "fn main() {}" > src/main.rs && \ - cargo build --release --bin api-gateway - -# Copy the full source code +# Copy the full source code and build the final binary COPY ./services/api-gateway /usr/src/app/services/api-gateway - -# Build the application -WORKDIR /usr/src/app/services/api-gateway RUN cargo build --release --bin api-gateway # 2. Runtime Stage @@ -25,7 +17,10 @@ FROM debian:bookworm-slim # Set timezone ENV TZ=Asia/Shanghai RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone -RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates libssl3 && rm -rf /var/lib/apt/lists/* +# Install minimal runtime deps: +# - ca-certificates/libssl3: TLS support for outbound HTTPS +# - curl: required for container healthcheck defined in docker-compose.yml +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates libssl3 curl && rm -rf /var/lib/apt/lists/* # Copy the built binary from the builder stage COPY --from=builder /usr/src/app/services/api-gateway/target/release/api-gateway /usr/local/bin/ diff --git a/services/api-gateway/src/api.rs b/services/api-gateway/src/api.rs index 96056d3..86344f2 100644 --- a/services/api-gateway/src/api.rs +++ b/services/api-gateway/src/api.rs @@ -46,8 +46,10 @@ fn create_v1_router() -> Router { // --- New Config Routes --- .route("/configs/llm_providers", get(get_llm_providers_config).put(update_llm_providers_config)) .route("/configs/analysis_modules", 
get(get_analysis_modules_config).put(update_analysis_modules_config)) - // --- New Discover Route --- + .route("/configs/data_sources", get(get_data_sources_config).put(update_data_sources_config)) + // --- New Discover Routes --- .route("/discover-models/{provider_id}", get(discover_models)) + .route("/discover-models", post(discover_models_preview)) } // --- Health & Stateless Tasks --- @@ -159,7 +161,7 @@ async fn get_task_progress( // --- Config API Handlers (Proxy to data-persistence-service) --- -use common_contracts::config_models::{LlmProvidersConfig, AnalysisModulesConfig}; +use common_contracts::config_models::{LlmProvidersConfig, AnalysisModulesConfig, DataSourcesConfig}; /// [GET /v1/configs/llm_providers] async fn get_llm_providers_config( @@ -195,16 +197,35 @@ async fn update_analysis_modules_config( Ok(Json(updated_config)) } +/// [GET /v1/configs/data_sources] +async fn get_data_sources_config( + State(state): State, +) -> Result { + let config = state.persistence_client.get_data_sources_config().await?; + Ok(Json(config)) +} + +/// [PUT /v1/configs/data_sources] +async fn update_data_sources_config( + State(state): State, + Json(payload): Json, +) -> Result { + let updated_config = state.persistence_client.update_data_sources_config(&payload).await?; + Ok(Json(updated_config)) +} + /// [GET /v1/discover-models/:provider_id] async fn discover_models( State(state): State, Path(provider_id): Path, ) -> Result { + info!("discover_models: provider_id={}", provider_id); let providers = state.persistence_client.get_llm_providers_config().await?; if let Some(provider) = providers.get(&provider_id) { let client = reqwest::Client::new(); let url = format!("{}/models", provider.api_base_url.trim_end_matches('/')); + info!("discover_models: target_url={} (provider_id={})", url, provider_id); let response = client .get(&url) @@ -215,10 +236,7 @@ async fn discover_models( if !response.status().is_success() { let status = response.status(); let error_text = 
response.text().await?; - warn!( - "Failed to discover models for provider '{}'. Status: {}, Body: {}", - provider_id, status, error_text - ); + warn!("discover_models failed: provider_id={} status={} body={}", provider_id, status, error_text); // Return a structured error to the frontend return Ok(( StatusCode::BAD_GATEWAY, @@ -232,9 +250,52 @@ async fn discover_models( let models_json: serde_json::Value = response.json().await?; Ok((StatusCode::OK, Json(models_json)).into_response()) } else { + warn!("discover_models: provider not found: {}", provider_id); Ok(( StatusCode::NOT_FOUND, Json(serde_json::json!({ "error": "Provider not found" })), ).into_response()) } } + +#[derive(Deserialize)] +struct DiscoverPreviewRequest { + api_base_url: String, + api_key: String, +} + +/// [POST /v1/discover-models] +/// Preview discovery without persisting provider configuration. +async fn discover_models_preview( + Json(payload): Json, +) -> Result { + let redacted_key = if payload.api_key.is_empty() { "" } else { "" }; + info!( + "discover_models_preview: target_url={}/models api_key={}", + payload.api_base_url.trim_end_matches('/'), + redacted_key + ); + let client = reqwest::Client::new(); + let url = format!("{}/models", payload.api_base_url.trim_end_matches('/')); + let response = client + .get(&url) + .bearer_auth(&payload.api_key) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let error_text = response.text().await?; + warn!("discover_models_preview failed: status={} body={}", status, error_text); + return Ok(( + StatusCode::BAD_GATEWAY, + Json(serde_json::json!({ + "error": "Failed to fetch models from provider", + "provider_error": error_text, + })), + ).into_response()); + } + + let models_json: serde_json::Value = response.json().await?; + Ok((StatusCode::OK, Json(models_json)).into_response()) +} diff --git a/services/api-gateway/src/main.rs b/services/api-gateway/src/main.rs index dbb14a3..6894220 100644 --- 
a/services/api-gateway/src/main.rs +++ b/services/api-gateway/src/main.rs @@ -7,11 +7,36 @@ mod persistence; use crate::config::AppConfig; use crate::error::Result; use crate::state::AppState; -use tracing::info; +use tracing::{error, info}; +use tracing_subscriber::EnvFilter; use std::process; +use std::io::{self, Write}; +use tower_http::trace::TraceLayer; #[tokio::main] async fn main() { + // Ensure panics are clearly printed with backtraces (independent of env var) + std::panic::set_hook(Box::new(|panic_info| { + eprintln!("panic in api-gateway: {}", panic_info); + eprintln!("backtrace:\n{:?}", std::backtrace::Backtrace::force_capture()); + })); + + // Emit earliest visibility logs to stderr for containers that restart quickly + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0); + eprintln!("api-gateway launching: pid={}, ts_unix={}", process::id(), ts); + // Print critical environment variables relevant to configuration (no secrets) + eprintln!( + "env: SERVER_PORT={:?}, NATS_ADDR={:?}, DATA_PERSISTENCE_SERVICE_URL={:?}, PROVIDER_SERVICES.len={}", + std::env::var("SERVER_PORT").ok(), + std::env::var("NATS_ADDR").ok(), + std::env::var("DATA_PERSISTENCE_SERVICE_URL").ok(), + std::env::var("PROVIDER_SERVICES").ok().map(|s| s.len()).unwrap_or(0), + ); + let _ = io::stderr().flush(); + if let Err(e) = run().await { eprintln!("api-gateway failed to start: {}", e); process::exit(1); @@ -19,9 +44,16 @@ async fn main() { } async fn run() -> Result<()> { - // Initialize logging + // Initialize deterministic logging, default to info if not provided + let env_filter = EnvFilter::try_from_default_env() + .unwrap_or_else(|_| EnvFilter::new("info,axum=info,hyper=info")); tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_env_filter(env_filter) + .with_target(true) + .with_thread_ids(true) + .with_thread_names(true) + .with_ansi(false) + .compact() 
.init(); info!("Starting api-gateway service..."); @@ -29,20 +61,36 @@ async fn run() -> Result<()> { // Load configuration let config = AppConfig::load().map_err(|e| error::AppError::Configuration(e.to_string()))?; let port = config.server_port; + info!( + server_port = port, + nats_addr = %config.nats_addr, + persistence_url = %config.data_persistence_service_url, + "Loaded configuration" + ); info!("Configured provider services: {:?}", config.provider_services); // Initialize application state let app_state = AppState::new(config).await?; // Create the Axum router - let app = api::create_router(app_state); + let app = api::create_router(app_state) + // Request-level tracing for better observability in Tilt/Compose logs + .layer(TraceLayer::new_for_http()); // Start the HTTP server - let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)) - .await - .unwrap(); + let addr = format!("0.0.0.0:{}", port); + let listener = match tokio::net::TcpListener::bind(&addr).await { + Ok(l) => l, + Err(e) => { + error!(%addr, err = %e, "Failed to bind TCP listener"); + return Err(error::AppError::Anyhow(anyhow::anyhow!(e))); + } + }; info!("HTTP server listening on port {}", port); - axum::serve(listener, app).await.unwrap(); + if let Err(e) = axum::serve(listener, app).await { + error!(err = %e, "HTTP server terminated with error"); + return Err(error::AppError::Anyhow(anyhow::anyhow!(e))); + } Ok(()) } diff --git a/services/api-gateway/src/persistence.rs b/services/api-gateway/src/persistence.rs index 04a1976..1fed953 100644 --- a/services/api-gateway/src/persistence.rs +++ b/services/api-gateway/src/persistence.rs @@ -4,7 +4,7 @@ use crate::error::Result; use common_contracts::dtos::CompanyProfileDto; -use common_contracts::config_models::{LlmProvidersConfig, AnalysisModulesConfig}; +use common_contracts::config_models::{LlmProvidersConfig, AnalysisModulesConfig, DataSourcesConfig}; #[derive(Clone)] pub struct PersistenceClient { @@ -88,4 +88,31 @@ impl 
PersistenceClient { .await?; Ok(updated_config) } + + pub async fn get_data_sources_config(&self) -> Result { + let url = format!("{}/configs/data_sources", self.base_url); + let config = self + .client + .get(&url) + .send() + .await? + .error_for_status()? + .json::() + .await?; + Ok(config) + } + + pub async fn update_data_sources_config(&self, payload: &DataSourcesConfig) -> Result { + let url = format!("{}/configs/data_sources", self.base_url); + let updated_config = self + .client + .put(&url) + .json(payload) + .send() + .await? + .error_for_status()? + .json::() + .await?; + Ok(updated_config) + } } diff --git a/services/common-contracts/src/config_models.rs b/services/common-contracts/src/config_models.rs index 313a465..4aee390 100644 --- a/services/common-contracts/src/config_models.rs +++ b/services/common-contracts/src/config_models.rs @@ -34,3 +34,29 @@ pub struct AnalysisModuleConfig { // 整个分析模块配置集合的数据结构 pub type AnalysisModulesConfig = HashMap; // Key: module_id, e.g., "bull_case" + +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct SystemConfig { + pub llm_providers: LlmProvidersConfig, + pub analysis_modules: AnalysisModulesConfig, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] +#[serde(rename_all = "snake_case")] +pub enum DataSourceProvider { + Tushare, + Finnhub, + Alphavantage, + Yfinance, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] +pub struct DataSourceConfig { + pub provider: DataSourceProvider, + pub api_key: Option, + pub api_url: Option, + pub enabled: bool, +} + +// 数据源配置集合(集中、强类型、单一来源) +pub type DataSourcesConfig = HashMap; diff --git a/services/data-persistence-service/src/api/configs.rs b/services/data-persistence-service/src/api/configs.rs index af3ddb8..7095b8e 100644 --- a/services/data-persistence-service/src/api/configs.rs +++ b/services/data-persistence-service/src/api/configs.rs @@ -1,6 +1,9 @@ use axum::{extract::State, Json}; -use 
common_contracts::config_models::{LlmProvidersConfig, AnalysisModulesConfig}; +use common_contracts::config_models::{ + AnalysisModulesConfig, DataSourceConfig, LlmProvidersConfig, +}; use service_kit::api; +use std::collections::HashMap; use crate::{db::system_config, AppState, ServerError}; @@ -41,3 +44,32 @@ pub async fn update_analysis_modules_config( let updated_config = system_config::update_config(pool, "analysis_modules", &payload).await?; Ok(Json(updated_config)) } + +pub type DataSourcesConfig = HashMap; + +#[api( + GET, + "/api/v1/configs/data_sources", + output(detail = "DataSourcesConfig") +)] +pub async fn get_data_sources_config( + State(state): State, +) -> Result, ServerError> { + let pool = state.pool(); + let config = system_config::get_config::(pool, "data_sources").await?; + Ok(Json(config)) +} + +#[api( + PUT, + "/api/v1/configs/data_sources", + output(detail = "DataSourcesConfig") +)] +pub async fn update_data_sources_config( + State(state): State, + Json(payload): Json, +) -> Result, ServerError> { + let pool = state.pool(); + let updated_config = system_config::update_config(pool, "data_sources", &payload).await?; + Ok(Json(updated_config)) +} diff --git a/services/data-persistence-service/src/models.rs b/services/data-persistence-service/src/models.rs index a6841ca..09d9cc6 100644 --- a/services/data-persistence-service/src/models.rs +++ b/services/data-persistence-service/src/models.rs @@ -1 +1,12 @@ +use common_contracts::config_models::{AnalysisModulesConfig, LlmProvidersConfig, DataSourceConfig}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + pub use common_contracts::models::*; + +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct SystemConfig { + pub llm_providers: LlmProvidersConfig, + pub analysis_modules: AnalysisModulesConfig, + pub data_sources: HashMap, +} diff --git a/services/finnhub-provider-service/src/api.rs b/services/finnhub-provider-service/src/api.rs index 8d60b38..3b2d585 
100644 --- a/services/finnhub-provider-service/src/api.rs +++ b/services/finnhub-provider-service/src/api.rs @@ -6,7 +6,7 @@ use axum::{ Router, }; use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress}; -use crate::state::AppState; +use crate::state::{AppState, ServiceOperationalStatus}; pub fn create_router(app_state: AppState) -> Router { Router::new() @@ -17,14 +17,22 @@ pub fn create_router(app_state: AppState) -> Router { /// [GET /health] /// Provides the current health status of the module. -async fn health_check(State(_state): State) -> Json { +async fn health_check(State(state): State) -> Json { let mut details = HashMap::new(); - // In a real scenario, we would check connections to the message bus, etc. - details.insert("message_bus_connection".to_string(), "ok".to_string()); + let operational_status = state.status.read().await; + + let (service_status, reason) = match &*operational_status { + ServiceOperationalStatus::Active => (ServiceStatus::Ok, "ok".to_string()), + ServiceOperationalStatus::Degraded { reason } => { + (ServiceStatus::Degraded, reason.clone()) + } + }; + details.insert("operational_status".to_string(), reason); + let status = HealthStatus { module_id: "finnhub-provider-service".to_string(), - status: ServiceStatus::Ok, + status: service_status, version: env!("CARGO_PKG_VERSION").to_string(), details, }; diff --git a/services/finnhub-provider-service/src/config.rs b/services/finnhub-provider-service/src/config.rs index b33d3dc..6cbe273 100644 --- a/services/finnhub-provider-service/src/config.rs +++ b/services/finnhub-provider-service/src/config.rs @@ -7,7 +7,7 @@ pub struct AppConfig { pub nats_addr: String, pub data_persistence_service_url: String, pub finnhub_api_url: String, - pub finnhub_api_key: SecretString, + pub finnhub_api_key: Option, } impl AppConfig { @@ -39,11 +39,6 @@ impl AppConfig { "FINNHUB_API_URL must not be empty".to_string(), )); } - if cfg.finnhub_api_key.expose_secret().trim().is_empty() 
{ - return Err(config::ConfigError::Message( - "FINNHUB_API_KEY must not be empty".to_string(), - )); - } Ok(cfg) } diff --git a/services/finnhub-provider-service/src/config_poller.rs b/services/finnhub-provider-service/src/config_poller.rs new file mode 100644 index 0000000..0d21c8f --- /dev/null +++ b/services/finnhub-provider-service/src/config_poller.rs @@ -0,0 +1,56 @@ +use crate::error::Result; +use crate::state::AppState; +use common_contracts::config_models::{DataSourceConfig, DataSourceProvider}; +use secrecy::SecretString; +use std::collections::HashMap; +use std::time::Duration; +use tracing::{error, info, instrument}; + +const POLLING_INTERVAL_SECONDS: u64 = 60; + +#[instrument(skip(state))] +pub async fn run_config_poller(state: AppState) { + info!("Starting configuration poller..."); + let mut interval = tokio::time::interval(Duration::from_secs(POLLING_INTERVAL_SECONDS)); + interval.tick().await; // Initial tick happens immediately + + loop { + if let Err(e) = poll_and_update_config(&state).await { + error!("Failed to poll and update config: {:?}", e); + } + interval.tick().await; + } +} + +async fn poll_and_update_config(state: &AppState) -> Result<()> { + info!("Polling for data source configurations..."); + let client = reqwest::Client::new(); + let url = format!( + "{}/configs/data_sources", + state.config.data_persistence_service_url + ); + + let response = client.get(&url).send().await?; + response.error_for_status_ref()?; + + let configs: HashMap = response.json().await?; + + let finnhub_config = configs.values().find(|cfg| { + matches!(cfg.provider, DataSourceProvider::Finnhub) && cfg.enabled + }); + + if let Some(config) = finnhub_config { + if let Some(api_key) = &config.api_key { + state.update_provider(Some(SecretString::from(api_key.clone()))).await; + info!("Successfully updated Finnhub provider with new configuration."); + } else { + state.update_provider(None).await; + info!("Finnhub provider is enabled but API key is missing. 
Service is degraded."); + } + } else { + state.update_provider(None).await; + info!("No enabled Finnhub configuration found. Service is degraded."); + } + + Ok(()) +} diff --git a/services/finnhub-provider-service/src/error.rs b/services/finnhub-provider-service/src/error.rs index a1950bb..990de74 100644 --- a/services/finnhub-provider-service/src/error.rs +++ b/services/finnhub-provider-service/src/error.rs @@ -1,40 +1,38 @@ +use anyhow::anyhow; +use reqwest::Error as ReqwestError; use thiserror::Error; -pub type Result = std::result::Result; - #[derive(Error, Debug)] pub enum AppError { #[error("Configuration error: {0}")] Configuration(String), - #[error("Message bus error: {0}")] - MessageBus(#[from] async_nats::Error), - - #[error("Message bus publish error: {0}")] - MessageBusPublish(#[from] async_nats::PublishError), - - #[error("Message bus subscribe error: {0}")] - MessageBusSubscribe(String), - - #[error("Message bus connect error: {0}")] - MessageBusConnect(String), - - #[error("HTTP request to another service failed: {0}")] - ServiceRequest(#[from] reqwest::Error), - #[error("Data parsing error: {0}")] DataParsing(#[from] anyhow::Error), + + #[error("Provider not available: {0}")] + ProviderNotAvailable(String), + + #[error(transparent)] + Reqwest(#[from] ReqwestError), + + #[error(transparent)] + Nats(#[from] async_nats::Error), + + #[error(transparent)] + NatsSubscribe(#[from] async_nats::client::SubscribeError), + + #[error(transparent)] + NatsUnsubscribe(#[from] async_nats::UnsubscribeError), + + #[error(transparent)] + NatsPublish(#[from] async_nats::error::Error), } -// 手动实现针对 async-nats 泛型错误类型的 From 转换 -impl From> for AppError { - fn from(err: async_nats::error::Error) -> Self { - AppError::MessageBusConnect(err.to_string()) +impl From for AppError { + fn from(e: config::ConfigError) -> Self { + AppError::Configuration(e.to_string()) } } -impl From for AppError { - fn from(err: async_nats::SubscribeError) -> Self { - 
AppError::MessageBusSubscribe(err.to_string()) - } -} +pub type Result = std::result::Result; diff --git a/services/finnhub-provider-service/src/finnhub.rs b/services/finnhub-provider-service/src/finnhub.rs index 520ec35..2382340 100644 --- a/services/finnhub-provider-service/src/finnhub.rs +++ b/services/finnhub-provider-service/src/finnhub.rs @@ -55,6 +55,7 @@ pub struct ReportItem { pub label: String, } +#[derive(Clone)] pub struct FinnhubDataProvider { client: FinnhubClient, } diff --git a/services/finnhub-provider-service/src/main.rs b/services/finnhub-provider-service/src/main.rs index 5463cb4..64e0e8e 100644 --- a/services/finnhub-provider-service/src/main.rs +++ b/services/finnhub-provider-service/src/main.rs @@ -8,6 +8,7 @@ mod message_consumer; mod persistence; mod state; mod worker; +mod config_poller; use crate::config::AppConfig; use crate::error::Result; @@ -30,6 +31,9 @@ async fn main() -> Result<()> { // Initialize application state let app_state = AppState::new(config); + // --- Start the config poller --- + tokio::spawn(config_poller::run_config_poller(app_state.clone())); + // Create the Axum router let app = api::create_router(app_state.clone()); diff --git a/services/finnhub-provider-service/src/message_consumer.rs b/services/finnhub-provider-service/src/message_consumer.rs index a62f83e..b32f610 100644 --- a/services/finnhub-provider-service/src/message_consumer.rs +++ b/services/finnhub-provider-service/src/message_consumer.rs @@ -1,27 +1,58 @@ use crate::error::Result; -use crate::state::AppState; +use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::messages::FetchCompanyDataCommand; use futures_util::StreamExt; use std::sync::Arc; -use tracing::{error, info}; +use std::time::Duration; +use tracing::{error, info, warn}; const SUBJECT_NAME: &str = "data_fetch_commands"; pub async fn run(state: AppState) -> Result<()> { info!("Starting NATS message consumer..."); - let client = 
async_nats::connect(&state.config.nats_addr).await?; - info!("Connected to NATS."); + loop { + let status = state.status.read().await.clone(); + if let ServiceOperationalStatus::Degraded { reason } = status { + warn!( + "Service is in degraded state (reason: {}). Pausing message consumption for 30s.", + reason + ); + tokio::time::sleep(Duration::from_secs(30)).await; + continue; + } - // This is a simple subscriber. For production, consider JetStream for durability. + info!("Service is Active. Connecting to NATS..."); + match async_nats::connect(&state.config.nats_addr).await { + Ok(client) => { + info!("Successfully connected to NATS."); + if let Err(e) = subscribe_and_process(state.clone(), client).await { + error!("NATS subscription error: {}. Reconnecting in 10s...", e); + } + } + Err(e) => { + error!("Failed to connect to NATS: {}. Retrying in 10s...", e); + } + } + tokio::time::sleep(Duration::from_secs(10)).await; + } +} + +async fn subscribe_and_process(state: AppState, client: async_nats::Client) -> Result<()> { let mut subscriber = client.subscribe(SUBJECT_NAME.to_string()).await?; - info!( "Consumer started, waiting for messages on subject '{}'", SUBJECT_NAME ); while let Some(message) = subscriber.next().await { + let current_status = state.status.read().await.clone(); + if matches!(current_status, ServiceOperationalStatus::Degraded {..}) { + warn!("Service became degraded. 
Disconnecting from NATS and pausing consumption."); + subscriber.unsubscribe().await?; + return Ok(()); + } + info!("Received NATS message."); let state_clone = state.clone(); let publisher_clone = client.clone(); @@ -30,6 +61,16 @@ pub async fn run(state: AppState) -> Result<()> { match serde_json::from_slice::(&message.payload) { Ok(command) => { info!("Deserialized command for symbol: {}", command.symbol); + + // Skip processing if market is 'CN' + if command.market.to_uppercase() == "CN" { + info!( + "Skipping command for symbol '{}' as its market ('{}') is 'CN'.", + command.symbol, command.market + ); + return; + } + if let Err(e) = crate::worker::handle_fetch_command(state_clone, command, publisher_clone) .await diff --git a/services/finnhub-provider-service/src/state.rs b/services/finnhub-provider-service/src/state.rs index 58d0c07..be7ed28 100644 --- a/services/finnhub-provider-service/src/state.rs +++ b/services/finnhub-provider-service/src/state.rs @@ -1,32 +1,73 @@ -use std::sync::Arc; - -use dashmap::DashMap; -use uuid::Uuid; - use common_contracts::observability::TaskProgress; -use secrecy::ExposeSecret; +use secrecy::{ExposeSecret, SecretString}; +use std::sync::Arc; +use tokio::sync::RwLock; use crate::config::AppConfig; use crate::finnhub::FinnhubDataProvider; +use dashmap::DashMap; +use uuid::Uuid; + +#[derive(Clone, Debug)] +pub enum ServiceOperationalStatus { + Active, + Degraded { reason: String }, +} #[derive(Clone)] pub struct AppState { pub tasks: Arc>, pub config: Arc, - pub finnhub_provider: Arc, + pub status: Arc>, + finnhub_provider: Arc>>, } impl AppState { pub fn new(config: AppConfig) -> Self { - let provider = Arc::new(FinnhubDataProvider::new( - config.finnhub_api_url.clone(), - config.finnhub_api_key.expose_secret().to_string(), - )); + let (initial_provider, initial_status) = + if let Some(api_key) = config.finnhub_api_key.as_ref() { + let provider = FinnhubDataProvider::new( + config.finnhub_api_url.clone(), + 
api_key.expose_secret().to_string(), + ); + (Some(provider), ServiceOperationalStatus::Active) + } else { + ( + None, + ServiceOperationalStatus::Degraded { + reason: "Finnhub API Key is not configured.".to_string(), + }, + ) + }; Self { tasks: Arc::new(DashMap::new()), config: Arc::new(config), - finnhub_provider: provider, + status: Arc::new(RwLock::new(initial_status)), + finnhub_provider: Arc::new(RwLock::new(initial_provider)), + } + } + + pub async fn get_provider(&self) -> Option { + self.finnhub_provider.read().await.clone() + } + + pub async fn update_provider(&self, api_key: Option) { + let mut provider_guard = self.finnhub_provider.write().await; + let mut status_guard = self.status.write().await; + + if let Some(key) = api_key { + let new_provider = FinnhubDataProvider::new( + self.config.finnhub_api_url.clone(), + key.expose_secret().to_string(), + ); + *provider_guard = Some(new_provider); + *status_guard = ServiceOperationalStatus::Active; + } else { + *provider_guard = None; + *status_guard = ServiceOperationalStatus::Degraded { + reason: "Finnhub API Key is not configured.".to_string(), + }; } } } diff --git a/services/finnhub-provider-service/src/worker.rs b/services/finnhub-provider-service/src/worker.rs index b80e84f..8293f35 100644 --- a/services/finnhub-provider-service/src/worker.rs +++ b/services/finnhub-provider-service/src/worker.rs @@ -1,11 +1,11 @@ -use crate::error::Result; +use crate::error::{AppError, Result}; use crate::persistence::PersistenceClient; use crate::state::AppState; use chrono::Datelike; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto}; use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent}; use common_contracts::observability::TaskProgress; -use tracing::info; +use tracing::{error, info}; pub async fn handle_fetch_command( state: AppState, @@ -26,11 +26,22 @@ pub async fn handle_fetch_command( }, ); + let provider = match 
state.get_provider().await { + Some(p) => p, + None => { + let reason = "Execution failed: Finnhub provider is not available (misconfigured).".to_string(); + error!("{}", reason); + if let Some(mut task) = state.tasks.get_mut(&command.request_id) { + task.status = "Failed".to_string(); + task.details = reason.clone(); + } + return Err(AppError::ProviderNotAvailable(reason)); + } + }; + // 1. Fetch data via provider - let (profile, financials): (CompanyProfileDto, Vec) = state - .finnhub_provider - .fetch_all_data(&command.symbol) - .await?; + let (profile, financials): (CompanyProfileDto, Vec) = + provider.fetch_all_data(&command.symbol).await?; // 2. Persist { diff --git a/services/tushare-provider-service/Cargo.lock b/services/tushare-provider-service/Cargo.lock index ed0badb..6d1fe3c 100644 --- a/services/tushare-provider-service/Cargo.lock +++ b/services/tushare-provider-service/Cargo.lock @@ -2296,6 +2296,16 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "serde", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -3312,6 +3322,7 @@ dependencies = [ "rmcp", "rust_decimal", "rust_decimal_macros", + "secrecy", "serde", "serde_json", "thiserror 2.0.17", diff --git a/services/tushare-provider-service/Cargo.toml b/services/tushare-provider-service/Cargo.toml index de1dd92..b080324 100644 --- a/services/tushare-provider-service/Cargo.toml +++ b/services/tushare-provider-service/Cargo.toml @@ -31,3 +31,4 @@ chrono = "0.4.38" rust_decimal = "1.35.0" rust_decimal_macros = "1.35.0" itertools = "0.14.0" +secrecy = { version = "0.8", features = ["serde"] } diff --git a/services/tushare-provider-service/src/api.rs 
b/services/tushare-provider-service/src/api.rs index e29ce49..744bfed 100644 --- a/services/tushare-provider-service/src/api.rs +++ b/services/tushare-provider-service/src/api.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use axum::{routing::get, Router, extract::State, response::Json}; -use crate::state::AppState; +use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::observability::{HealthStatus, ServiceStatus}; pub fn create_router(app_state: AppState) -> Router { @@ -11,12 +11,22 @@ pub fn create_router(app_state: AppState) -> Router { .with_state(app_state) } -async fn health_check(State(_state): State) -> Json { +async fn health_check(State(state): State) -> Json { let mut details = HashMap::new(); - details.insert("message_bus_connection".to_string(), "ok".to_string()); + let operational_status = state.status.read().await; + + let (service_status, reason) = match &*operational_status { + ServiceOperationalStatus::Active => (ServiceStatus::Ok, "ok".to_string()), + ServiceOperationalStatus::Degraded { reason } => { + (ServiceStatus::Degraded, reason.clone()) + } + }; + + details.insert("operational_status".to_string(), reason); + let status = HealthStatus { module_id: "tushare-provider-service".to_string(), - status: ServiceStatus::Ok, + status: service_status, version: env!("CARGO_PKG_VERSION").to_string(), details, }; diff --git a/services/tushare-provider-service/src/config.rs b/services/tushare-provider-service/src/config.rs index b9c0700..f0aab53 100644 --- a/services/tushare-provider-service/src/config.rs +++ b/services/tushare-provider-service/src/config.rs @@ -1,4 +1,5 @@ use serde::Deserialize; +use secrecy::SecretString; #[derive(Debug, Deserialize, Clone)] pub struct AppConfig { @@ -6,7 +7,7 @@ pub struct AppConfig { pub nats_addr: String, pub data_persistence_service_url: String, pub tushare_api_url: String, - pub tushare_api_token: String, + pub tushare_api_token: Option, } impl AppConfig { @@ -36,11 +37,6 @@ impl 
AppConfig { "TUSHARE_API_URL must not be empty".to_string(), )); } - if cfg.tushare_api_token.trim().is_empty() || cfg.tushare_api_token.trim() == "YOUR_TUSHARE_API_TOKEN" { - return Err(config::ConfigError::Message( - "TUSHARE_API_TOKEN must be provided (non-empty, non-placeholder)".to_string(), - )); - } Ok(cfg) } diff --git a/services/tushare-provider-service/src/config_poller.rs b/services/tushare-provider-service/src/config_poller.rs new file mode 100644 index 0000000..0526065 --- /dev/null +++ b/services/tushare-provider-service/src/config_poller.rs @@ -0,0 +1,56 @@ +use crate::error::Result; +use crate::state::AppState; +use common_contracts::config_models::{DataSourceConfig, DataSourceProvider}; +use secrecy::SecretString; +use std::collections::HashMap; +use std::time::Duration; +use tracing::{error, info, instrument}; + +const POLLING_INTERVAL_SECONDS: u64 = 60; + +#[instrument(skip(state))] +pub async fn run_config_poller(state: AppState) { + info!("Starting configuration poller..."); + let mut interval = tokio::time::interval(Duration::from_secs(POLLING_INTERVAL_SECONDS)); + interval.tick().await; // Initial tick happens immediately + + loop { + if let Err(e) = poll_and_update_config(&state).await { + error!("Failed to poll and update config: {:?}", e); + } + interval.tick().await; + } +} + +async fn poll_and_update_config(state: &AppState) -> Result<()> { + info!("Polling for data source configurations..."); + let client = reqwest::Client::new(); + let url = format!( + "{}/configs/data_sources", + state.config.data_persistence_service_url + ); + + let response = client.get(&url).send().await?; + response.error_for_status_ref()?; + + let configs: HashMap = response.json().await?; + + let tushare_config = configs.values().find(|cfg| { + matches!(cfg.provider, DataSourceProvider::Tushare) && cfg.enabled + }); + + if let Some(config) = tushare_config { + if let Some(api_key) = &config.api_key { + 
state.update_provider(Some(SecretString::from(api_key.clone()))).await; + info!("Successfully updated Tushare provider with new configuration."); + } else { + state.update_provider(None).await; + info!("Tushare provider is enabled but API key is missing. Service is degraded."); + } + } else { + state.update_provider(None).await; + info!("No enabled Tushare configuration found. Service is degraded."); + } + + Ok(()) +} diff --git a/services/tushare-provider-service/src/error.rs b/services/tushare-provider-service/src/error.rs index a166b74..10f33de 100644 --- a/services/tushare-provider-service/src/error.rs +++ b/services/tushare-provider-service/src/error.rs @@ -1,27 +1,47 @@ +use anyhow::anyhow; +use reqwest::Error as ReqwestError; use thiserror::Error; #[derive(Error, Debug)] -pub enum ProviderError { - #[error("API request failed: {0}")] - ApiRequest(#[from] reqwest::Error), - - #[error("Failed to parse JSON response: {0}")] - JsonParsing(#[from] serde_json::Error), - - #[error("Tushare API returned an error: code={code}, message='{msg}'")] - TushareApi { code: i64, msg: String }, - +pub enum AppError { #[error("Configuration error: {0}")] Configuration(String), - #[error("Data mapping error: {0}")] + #[error("Data parsing error: {0}")] + DataParsing(#[from] anyhow::Error), + + #[error("Mapping error: {0}")] Mapping(String), - #[error("Persistence client error: {0}")] - Persistence(String), - #[error("Internal error: {0}")] - Internal(#[from] anyhow::Error), + Internal(String), + + #[error("Provider not available: {0}")] + ProviderNotAvailable(String), + + #[error(transparent)] + Reqwest(#[from] ReqwestError), + + #[error(transparent)] + Nats(#[from] async_nats::Error), + + #[error(transparent)] + NatsSubscribe(#[from] async_nats::client::SubscribeError), + + #[error(transparent)] + NatsUnsubscribe(#[from] async_nats::UnsubscribeError), + + #[error(transparent)] + NatsPublish(#[from] async_nats::error::Error), + + #[error(transparent)] + SerdeJson(#[from] 
serde_json::Error), } -pub type Result = std::result::Result; +impl From for AppError { + fn from(e: config::ConfigError) -> Self { + AppError::Configuration(e.to_string()) + } +} + +pub type Result = std::result::Result; diff --git a/services/tushare-provider-service/src/main.rs b/services/tushare-provider-service/src/main.rs index e5726f8..575b297 100644 --- a/services/tushare-provider-service/src/main.rs +++ b/services/tushare-provider-service/src/main.rs @@ -8,9 +8,10 @@ mod state; mod ts_client; mod tushare; mod worker; +mod config_poller; use crate::config::AppConfig; -use crate::error::{Result, ProviderError}; +use crate::error::{Result, AppError}; use crate::state::AppState; use tracing::info; @@ -24,12 +25,15 @@ async fn main() -> Result<()> { info!("Starting tushare-provider-service..."); // Load configuration - let config = AppConfig::load().map_err(|e| ProviderError::Configuration(e.to_string()))?; + let config = AppConfig::load().map_err(|e| AppError::Configuration(e.to_string()))?; let port = config.server_port; // Initialize application state let app_state = AppState::new(config); + // --- Start the config poller --- + tokio::spawn(config_poller::run_config_poller(app_state.clone())); + // Create the Axum router let app = api::create_router(app_state.clone()); diff --git a/services/tushare-provider-service/src/mapping.rs b/services/tushare-provider-service/src/mapping.rs index 9544396..d0baac3 100644 --- a/services/tushare-provider-service/src/mapping.rs +++ b/services/tushare-provider-service/src/mapping.rs @@ -8,7 +8,7 @@ use rust_decimal::prelude::*; use rust_decimal_macros::dec; use crate::{ - error::ProviderError, + error::AppError, tushare::{ BalanceSheet, Cashflow, Dividend, FinaIndicator, Income, Repurchase, StkHolderNumber, }, @@ -28,7 +28,7 @@ pub struct TushareFinancials { pub fn map_financial_statements( symbol: &str, raw_data: TushareFinancials, -) -> Result, ProviderError> { +) -> Result, AppError> { // 1. 
Merge all financial data by end_date let mut by_date = merge_financial_data(&raw_data); @@ -251,12 +251,12 @@ fn calculate_derived_metrics(series: &mut SeriesMap) { series.extend(new_series); } -fn flatten_series_to_dtos(symbol: &str, series: SeriesMap) -> Result, ProviderError> { +fn flatten_series_to_dtos(symbol: &str, series: SeriesMap) -> Result, AppError> { let mut dtos: Vec = Vec::new(); for (metric_name, data_points) in series { for point in data_points { let period_date = NaiveDate::parse_from_str(&point.period, "%Y%m%d") - .map_err(|e| ProviderError::Mapping(format!("Invalid period '{}': {}", point.period, e)))?; + .map_err(|e| AppError::Mapping(format!("Invalid period '{}': {}", point.period, e)))?; dtos.push(TimeSeriesFinancialDto { symbol: symbol.to_string(), metric_name: metric_name.clone(), diff --git a/services/tushare-provider-service/src/message_consumer.rs b/services/tushare-provider-service/src/message_consumer.rs index 78b7cf5..92eae79 100644 --- a/services/tushare-provider-service/src/message_consumer.rs +++ b/services/tushare-provider-service/src/message_consumer.rs @@ -1,5 +1,5 @@ -use crate::error::{Result, ProviderError}; -use crate::state::AppState; +use crate::error::Result; +use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::messages::FetchCompanyDataCommand; use futures_util::StreamExt; use tracing::{error, info, warn}; @@ -7,34 +7,60 @@ use tracing::{error, info, warn}; use std::sync::Arc; use tokio::sync::mpsc; use chrono::Utc; +use std::time::Duration; const SUBJECT_NAME: &str = "data_fetch_commands"; pub async fn run(state: AppState) -> Result<()> { info!("Starting NATS message consumer..."); - let client = async_nats::connect(&state.config.nats_addr) - .await - .map_err(|e| ProviderError::Internal(anyhow::anyhow!("NATS connect failed: {}", e)))?; - info!("Connected to NATS."); - subscribe_to_data_commands(Arc::new(state), client).await + loop { + let status = state.status.read().await.clone(); + if let 
ServiceOperationalStatus::Degraded { reason } = status { + warn!( + "Service is in degraded state (reason: {}). Pausing message consumption for 30s.", + reason + ); + tokio::time::sleep(Duration::from_secs(30)).await; + continue; + } + + info!("Service is Active. Connecting to NATS..."); + match async_nats::connect(&state.config.nats_addr).await { + Ok(client) => { + info!("Successfully connected to NATS."); + if let Err(e) = subscribe_and_process(state.clone(), client).await { + error!("NATS subscription error: {}. Reconnecting in 10s...", e); + } + } + Err(e) => { + error!("Failed to connect to NATS: {}. Retrying in 10s...", e); + } + } + tokio::time::sleep(Duration::from_secs(10)).await; + } } -pub async fn subscribe_to_data_commands(app_state: Arc, nats_client: async_nats::Client) -> Result<()> { - // This is a simple subscriber. For production, consider JetStream for durability. - let mut subscriber = nats_client - .subscribe(SUBJECT_NAME.to_string()) - .await - .map_err(|e| ProviderError::Internal(anyhow::anyhow!("NATS subscribe failed: {}", e)))?; - +async fn subscribe_and_process( + state: AppState, + client: async_nats::Client, +) -> Result<()> { + let mut subscriber = client.subscribe(SUBJECT_NAME.to_string()).await?; info!( "Consumer started, waiting for messages on subject '{}'", SUBJECT_NAME ); while let Some(message) = subscriber.next().await { + let current_status = state.status.read().await.clone(); + if matches!(current_status, ServiceOperationalStatus::Degraded {..}) { + warn!("Service became degraded. 
Disconnecting from NATS and pausing consumption."); + subscriber.unsubscribe().await?; + return Ok(()); + } + info!("Received NATS message."); - let state_for_closure = app_state.clone(); + let state_for_closure = Arc::new(state.clone()); tokio::spawn(async move { if let Err(e) = serde_json::from_slice::(&message.payload) { diff --git a/services/tushare-provider-service/src/state.rs b/services/tushare-provider-service/src/state.rs index e41ed94..bddebc3 100644 --- a/services/tushare-provider-service/src/state.rs +++ b/services/tushare-provider-service/src/state.rs @@ -1,31 +1,72 @@ -use std::sync::Arc; - -use dashmap::DashMap; -use uuid::Uuid; - -use common_contracts::observability::TaskProgress; - use crate::config::AppConfig; use crate::tushare::TushareDataProvider; +use common_contracts::observability::TaskProgress; +use dashmap::DashMap; +use secrecy::{ExposeSecret, SecretString}; +use std::sync::Arc; +use tokio::sync::RwLock; +use uuid::Uuid; + +#[derive(Clone, Debug)] +pub enum ServiceOperationalStatus { + Active, + Degraded { reason: String }, +} #[derive(Clone)] pub struct AppState { pub tasks: Arc>, pub config: Arc, - pub tushare_provider: Arc, + pub status: Arc>, + tushare_provider: Arc>>, } impl AppState { pub fn new(config: AppConfig) -> Self { - let provider = Arc::new(TushareDataProvider::new( - config.tushare_api_url.clone(), - config.tushare_api_token.clone(), - )); + let (initial_provider, initial_status) = + if let Some(api_key) = config.tushare_api_token.as_ref() { + let provider = TushareDataProvider::new( + config.tushare_api_url.clone(), + api_key.expose_secret().clone(), + ); + (Some(provider), ServiceOperationalStatus::Active) + } else { + ( + None, + ServiceOperationalStatus::Degraded { + reason: "Tushare API Key is not configured.".to_string(), + }, + ) + }; Self { tasks: Arc::new(DashMap::new()), config: Arc::new(config), - tushare_provider: provider, + status: Arc::new(RwLock::new(initial_status)), + tushare_provider: 
Arc::new(RwLock::new(initial_provider)),
+        }
+    }
+
+    pub async fn get_provider(&self) -> Option<TushareDataProvider> {
+        self.tushare_provider.read().await.clone()
+    }
+
+    pub async fn update_provider(&self, api_key: Option<SecretString>) {
+        let mut provider_guard = self.tushare_provider.write().await;
+        let mut status_guard = self.status.write().await;
+
+        if let Some(key) = api_key {
+            let new_provider = TushareDataProvider::new(
+                self.config.tushare_api_url.clone(),
+                key.expose_secret().clone(),
+            );
+            *provider_guard = Some(new_provider);
+            *status_guard = ServiceOperationalStatus::Active;
+        } else {
+            *provider_guard = None;
+            *status_guard = ServiceOperationalStatus::Degraded {
+                reason: "Tushare API Key is not configured.".to_string(),
+            };
         }
     }
 }
diff --git a/services/tushare-provider-service/src/ts_client.rs b/services/tushare-provider-service/src/ts_client.rs
index 9fe9d2f..7351c85 100644
--- a/services/tushare-provider-service/src/ts_client.rs
+++ b/services/tushare-provider-service/src/ts_client.rs
@@ -1,4 +1,4 @@
-use crate::error::ProviderError;
+use crate::error::AppError;
 use serde::{de::DeserializeOwned, Deserialize, Serialize};
 use tracing::info;
@@ -46,7 +46,7 @@ impl TushareClient {
         api_name: &str,
         params: serde_json::Value,
         fields: &str,
-    ) -> Result<Vec<T>, ProviderError> {
+    ) -> Result<Vec<T>, AppError> {
         let request_payload = TushareRequest {
             api_name,
             token: &self.api_token,
@@ -67,15 +67,16 @@ impl TushareClient {
         let response: TushareResponse = serde_json::from_str(&text)?;
 
         if response.code != 0 {
-            return Err(ProviderError::TushareApi {
-                code: response.code,
-                msg: response.msg,
-            });
+            return Err(AppError::DataParsing(anyhow::anyhow!(format!(
+                "Tushare API error code {}: {}",
+                response.code, response.msg
+            ))));
         }
 
-        let data = response.data.ok_or_else(|| ProviderError::TushareApi {
-            code: -1,
-            msg: "No data field in response".to_string(),
+        let data = response.data.ok_or_else(|| {
+            AppError::DataParsing(anyhow::anyhow!(
+                "Tushare response missing data field"
+            ))
         })?;
 
         let
items = data diff --git a/services/tushare-provider-service/src/tushare.rs b/services/tushare-provider-service/src/tushare.rs index c230b74..7c0adad 100644 --- a/services/tushare-provider-service/src/tushare.rs +++ b/services/tushare-provider-service/src/tushare.rs @@ -4,7 +4,7 @@ use serde_json::json; use tokio; use crate::{ - error::ProviderError, + error::AppError, mapping::{map_financial_statements, TushareFinancials}, ts_client::TushareClient, }; @@ -25,7 +25,7 @@ impl TushareDataProvider { pub async fn fetch_all_data( &self, symbol: &str, - ) -> Result<(CompanyProfileDto, Vec), ProviderError> { + ) -> Result<(CompanyProfileDto, Vec), AppError> { let ( stock_basic, stock_company, @@ -42,18 +42,18 @@ impl TushareDataProvider { let ts_code = stock_basic .get(0) .map(|r| r.ts_code.clone()) - .ok_or_else(|| ProviderError::Mapping("stock_basic missing first row".to_string()))?; + .ok_or_else(|| AppError::Mapping("stock_basic missing first row".to_string()))?; let name = stock_basic .get(0) .and_then(|r| r.name.clone()) - .ok_or_else(|| ProviderError::Mapping("stock_basic.name missing".to_string()))?; + .ok_or_else(|| AppError::Mapping("stock_basic.name missing".to_string()))?; let industry = stock_basic.get(0).and_then(|r| r.industry.clone()); let list_date = stock_basic .get(0) .and_then(|r| r.list_date.clone()) .map(|d| NaiveDate::parse_from_str(&d, "%Y%m%d")) .transpose() - .map_err(|e| ProviderError::Mapping(format!("Invalid list_date: {}", e)))?; + .map_err(|e| AppError::Mapping(format!("Invalid list_date: {}", e)))?; let profile = CompanyProfileDto { symbol: ts_code, @@ -94,7 +94,7 @@ impl TushareDataProvider { Vec, Vec, ), - ProviderError, + AppError, > { let params = json!({ "ts_code": symbol }); diff --git a/services/tushare-provider-service/src/worker.rs b/services/tushare-provider-service/src/worker.rs index 3403590..d30823d 100644 --- a/services/tushare-provider-service/src/worker.rs +++ b/services/tushare-provider-service/src/worker.rs @@ -5,42 +5,52 
@@ use common_contracts::{ messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent}, }; use tokio::sync::mpsc; -use tracing::info; +use tracing::{info, error}; use chrono::Datelike; -use crate::{error::ProviderError, persistence::PersistenceClient, state::AppState}; +use crate::{error::AppError, persistence::PersistenceClient, state::AppState}; pub async fn run_tushare_workflow( state: Arc, command: FetchCompanyDataCommand, completion_tx: mpsc::Sender<()>, -) -> Result<(), ProviderError> { +) -> Result<(), AppError> { let task_id = command.request_id; let symbol = command.symbol.clone(); + let provider = match state.get_provider().await { + Some(p) => p, + None => { + let reason = "Execution failed: Tushare provider is not available (misconfigured).".to_string(); + error!("{}", reason); + if let Some(mut task) = state.tasks.get_mut(&task_id) { + task.status = "Failed".to_string(); + task.details = reason.clone(); + } + return Err(AppError::ProviderNotAvailable(reason)); + } + }; + // 1. Update task progress: Fetching data { let mut entry = state .tasks .get_mut(&task_id) - .ok_or_else(|| ProviderError::Internal(anyhow::anyhow!("Task not found")))?; + .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; entry.status = "FetchingData".to_string(); entry.progress_percent = 10; entry.details = "Starting data fetch from Tushare".to_string(); } // 2. Fetch data using the provider - let (profile, financials) = state - .tushare_provider - .fetch_all_data(&symbol) - .await?; + let (profile, financials) = provider.fetch_all_data(&symbol).await?; // 3. 
Update task progress: Persisting data { let mut entry = state .tasks .get_mut(&task_id) - .ok_or_else(|| ProviderError::Internal(anyhow::anyhow!("Task not found")))?; + .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; entry.status = "PersistingData".to_string(); entry.progress_percent = 60; entry.details = "Data fetched, persisting to database".to_string(); @@ -60,7 +70,7 @@ pub async fn run_tushare_workflow( // 5. Publish events let nats_client = async_nats::connect(&state.config.nats_addr) .await - .map_err(|e| ProviderError::Internal(anyhow::anyhow!("NATS connection failed: {}", e)))?; + .map_err(|e| AppError::Internal(format!("NATS connection failed: {}", e)))?; publish_events(&nats_client, &command, &financials).await?; @@ -69,7 +79,7 @@ pub async fn run_tushare_workflow( let mut entry = state .tasks .get_mut(&task_id) - .ok_or_else(|| ProviderError::Internal(anyhow::anyhow!("Task not found")))?; + .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; entry.status = "Completed".to_string(); entry.progress_percent = 100; entry.details = "Workflow finished successfully".to_string(); @@ -91,7 +101,7 @@ async fn persist_data( financials: &[TimeSeriesFinancialDto], state: &Arc, task_id: uuid::Uuid, -) -> Result<(), ProviderError> { +) -> Result<(), AppError> { // In a real implementation, we'd use tokio::try_join! to run these in parallel. 
if let Err(e) = client.upsert_company_profile(profile.clone()).await { state @@ -128,7 +138,7 @@ async fn publish_events( nats_client: &async_nats::Client, command: &FetchCompanyDataCommand, financials: &[TimeSeriesFinancialDto], -) -> Result<(), ProviderError> { +) -> Result<(), AppError> { let profile_event = CompanyProfilePersistedEvent { request_id: command.request_id, symbol: command.symbol.clone(), @@ -138,8 +148,7 @@ async fn publish_events( "events.data.company_profile_persisted", serde_json::to_vec(&profile_event).unwrap().into(), ) - .await - .map_err(|e| ProviderError::Internal(anyhow::anyhow!("Event publishing failed: {}", e)))?; + .await?; let years: std::collections::BTreeSet = financials .iter() @@ -155,8 +164,7 @@ async fn publish_events( "events.data.financials_persisted", serde_json::to_vec(&financials_event).unwrap().into(), ) - .await - .map_err(|e| ProviderError::Internal(anyhow::anyhow!("Event publishing failed: {}", e)))?; + .await?; Ok(()) }