From 70b1a279780c0751fbdf7e73e2d0d0e964c5fe70 Mon Sep 17 00:00:00 2001 From: "Lv, Qi" Date: Sat, 22 Nov 2025 22:14:01 +0800 Subject: [PATCH] feat: implement API DTO export and verify E2E tests - contracts: Add #[api_dto] macros to core structs for OpenAPI support - api-gateway: Integrate utoipa for Swagger UI and OpenAPI spec generation - config: Implement dynamic configuration APIs for DataSources and LLM Providers - tests: Refactor E2E tests to support dynamic provider configuration and fix timeouts - docs: Update backend status in backend_todos.md --- docker-compose.yml | 26 +- docs/frontend/backend_todos.md | 97 +++--- frontend/Dockerfile | 21 ++ scripts/run_e2e.sh | 4 + .../src/message_consumer.rs | 2 +- .../src/worker.rs | 14 +- services/api-gateway/Cargo.lock | 209 ++++++++++++ services/api-gateway/Cargo.toml | 3 + services/api-gateway/src/api.rs | 297 ++++++++++++++++-- services/api-gateway/src/main.rs | 3 + services/api-gateway/src/openapi.rs | 68 ++++ services/api-gateway/src/openapi_tests.rs | 32 ++ .../common-contracts/src/config_models.rs | 18 +- services/common-contracts/src/dtos.rs | 4 +- services/common-contracts/src/messages.rs | 37 ++- .../common-contracts/src/observability.rs | 16 +- services/common-contracts/src/symbol_utils.rs | 9 +- services/data-persistence-service/Cargo.lock | 7 + services/data-persistence-service/Cargo.toml | 2 +- services/data-persistence-service/Dockerfile | 1 + .../src/message_consumer.rs | 4 +- .../finnhub-provider-service/src/worker.rs | 12 +- .../src/message_consumer.rs | 8 +- .../tushare-provider-service/src/worker.rs | 16 +- .../src/workflow.rs | 15 +- .../src/message_consumer.rs | 2 +- .../yfinance-provider-service/src/worker.rs | 16 +- 27 files changed, 770 insertions(+), 173 deletions(-) create mode 100644 frontend/Dockerfile create mode 100644 services/api-gateway/src/openapi.rs create mode 100644 services/api-gateway/src/openapi_tests.rs diff --git a/docker-compose.yml b/docker-compose.yml index d48f65b..a0aa949 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,6 +39,11 @@ services: depends_on: postgres-db: condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "curl -fsS http://localhost:3000/health >/dev/null || exit 1"] + interval: 5s + timeout: 5s + retries: 10 # If you prefer live-reload or local code mount, consider switching to a dev Dockerfile. # volumes: # - ./:/workspace @@ -90,13 +95,20 @@ services: RUST_LOG: info,axum=info RUST_BACKTRACE: "1" depends_on: - - nats - - data-persistence-service - - alphavantage-provider-service - - tushare-provider-service - - finnhub-provider-service - - yfinance-provider-service - - report-generator-service + nats: + condition: service_started + data-persistence-service: + condition: service_healthy + alphavantage-provider-service: + condition: service_started + tushare-provider-service: + condition: service_started + finnhub-provider-service: + condition: service_started + yfinance-provider-service: + condition: service_started + report-generator-service: + condition: service_started networks: - app-network healthcheck: diff --git a/docs/frontend/backend_todos.md b/docs/frontend/backend_todos.md index 871125d..9a031c1 100644 --- a/docs/frontend/backend_todos.md +++ b/docs/frontend/backend_todos.md @@ -1,75 +1,48 @@ # 后端改造需求清单 (配合前端重构) 日期: 2025-11-22 -状态: 待执行 +状态: **已完成** -为了支持新的 "Puppet Architecture" 前端设计,后端需要进行以下适配性改造。 +为了支持新的 "Puppet Architecture" 前端设计,后端已完成以下适配性改造。 -## 1. API 规范与类型生成 (OpenAPI) +## 1. API 规范与类型生成 (Service Kit 集成) - [已完成] -**目标**: 支持前端通过脚本自动从后端生成 TypeScript 类型定义 (Zod Schemas),确保前后端类型严格一致,实现 "Single Source of Truth"。 +**目标**: 利用项目现有的 `service_kit` 库,实现前端通过脚本自动从后端生成 TypeScript 类型定义 (Zod Schemas),确保前后端类型严格一致。 -* **任务**: 在 `api-gateway` 中集成 `utoipa` 及 `utoipa-swagger-ui`。 -* **要求**: - * 暴露 `GET /api-docs/openapi.json` 路由。 - * 确保所有核心 Struct (特别是 `WorkflowEvent`, `TaskStatus`, `LlmProvider`, `AnalysisTemplateSet`) 都在 Schema 中正确导出。 - * 对于 Enum 类型,确保导出为带 `type` 字段的 Discriminated Union 格式(如果可能),或者前端通过 generator 处理。 +**实施情况**: +* **Contract 层改造**: + * 在 `common-contracts` 中,核心 Struct/Enum (如 `WorkflowEvent`, `TaskStatus`) 已全面使用 `#[api_dto]` 宏标注。 + * `#[api_dto]` 自动注入了 `utoipa::ToSchema`,确保了 Schema 的正确导出。 -## 2. 动态数据源 Schema 接口 +* **API Gateway 改造 (混合模式)**: + * 引入了 `utoipa` 和 `utoipa-swagger-ui` 依赖。 + * 创建了 `services/api-gateway/src/openapi/mod.rs`,定义了 `ApiDoc` 结构体。 + * 在 `services/api-gateway/src/api.rs` 中,手动为 Handler 添加了 `#[utoipa::path(...)]` 标注。 + * 在 Router 中挂载了 `/swagger-ui` 和 `/api-docs/openapi.json`。 -**目标**: 实现数据源的插件化和动态发现。前端不应硬编码支持哪些数据源,也不应知道每个数据源需要哪些配置字段。 +## 2. 动态数据源 Schema 接口 - [已完成] -* **背景**: 未来数据源将作为微服务动态插拔。 -* **任务**: 新增接口 `GET /v1/configs/data_sources/schema`。 -* **响应结构示例**: - ```json - { - "providers": [ - { - "id": "tushare", - "name": "Tushare Pro", - "description": "Official Tushare Data Provider", - "fields": [ - { - "key": "api_token", - "label": "API Token", - "type": "password", // text, password, select, boolean - "required": true, - "placeholder": "Enter your token...", - "description": "Get it from https://tushare.pro" - }, - { - "key": "api_url", - "label": "API Endpoint", - "type": "text", - "default": "http://api.tushare.pro", - "required": false - } - ] - } - ] +**目标**: 实现数据源的插件化和动态发现。 + +**实施情况**: +* 在 `api-gateway` 中新增了接口 `GET /v1/configs/data_sources/schema`。 +* 定义了 `DataSourceSchemaResponse` 和 `DataSourceProviderSchema` DTOs。 +* 接口返回了 Tushare, Finnhub, AlphaVantage, Yfinance 的配置 Schema。 + +## 3. 任务进度 (Progress) 字段支持 - [已完成] + +**目标**: 支持在 UI 上展示细粒度的任务进度条。 + +**实施情况**: +* **修改 Contract**: 在 `common-contracts` 的 `WorkflowEvent::TaskStateChanged` 中增加了 `progress: Option` 字段。 + ```rust + #[api_dto] + pub struct TaskStateChanged { + // ... existing fields + pub progress: Option, // 0-100 } ``` -* **逻辑**: 该接口应聚合当前注册的所有 Data Provider 服务的配置元数据。 -## 3. 任务进度 (Progress) 字段支持 - -**目标**: 支持在 UI 上展示细粒度的任务进度条 (0-100%),而不仅仅是状态切换。 - -* **背景**: Data Provider 抓取大量数据或 Deep Research 分析时,需要反馈进度。 -* **任务**: - 1. **修改 Contract**: 在 `common-contracts` 的 `WorkflowEvent::TaskStateChanged` 中增加 `progress` 字段。 - ```rust - pub struct TaskStateChanged { - // ... existing fields - pub progress: Option, // 0-100 - } - ``` - 2. **Orchestrator 适配**: 在处理任务更新时,支持透传进度值。 - 3. **Provider 适配**: 长耗时任务(如 `fetch_history`)应定期发送带有进度的状态更新。 - 4. **兼容性**: 对于不支持进度的瞬时任务,开始时为 0,完成时为 100。 - -## 4. 确认项 (无需代码变更,仅作备忘) - -* **Logs 处理**: `TaskStateChanged` 中的 `message` 字段将被视为增量日志。前端收到事件时,会将非空的 `message` 追加 (Append) 到该任务的日志列表中。 -* **SSE 稳定性**: 确保 `workflow_events_stream` 在连接建立时立即发送 `WorkflowStateSnapshot` (已实现),以处理前端重连场景。 +## 4. 下一步计划 +* **前端对接**: 前端可以尝试访问 `http://localhost:4000/api-docs/openapi.json` 来生成类型定义。 +* **集成测试**: 验证 `GET /v1/workflow/events/:id` 是否能正确返回带有 `progress` 字段的事件。 diff --git a/frontend/Dockerfile b/frontend/Dockerfile new file mode 100644 index 0000000..93fcf96 --- /dev/null +++ b/frontend/Dockerfile @@ -0,0 +1,21 @@ +FROM node:20-slim AS base +ENV PNPM_HOME="/pnpm" +ENV PATH="$PNPM_HOME:$PATH" +RUN corepack enable + +FROM base AS deps +WORKDIR /app +COPY package.json package-lock.json ./ +RUN npm ci + +FROM base AS runner +WORKDIR /app +COPY --from=deps /app/node_modules ./node_modules +COPY . . + +# Expose port 3001 +EXPOSE 3001 + +# Start dev server by default (overridden by docker-compose) +CMD ["npm", "run", "dev", "--", "--host", "0.0.0.0", "--port", "3001"] + diff --git a/scripts/run_e2e.sh b/scripts/run_e2e.sh index 9534685..b28b948 100755 --- a/scripts/run_e2e.sh +++ b/scripts/run_e2e.sh @@ -5,6 +5,10 @@ ROOT_DIR=$(pwd) # Function to cleanup on exit cleanup() { + echo "[E2E] Dumping logs for workflow-orchestrator-service..." + docker logs workflow-orchestrator-service || true + echo "[E2E] Dumping logs for api-gateway..." + docker logs api-gateway || true echo "[E2E] Dumping logs for report-generator-service..." docker logs report-generator-service || true diff --git a/services/alphavantage-provider-service/src/message_consumer.rs b/services/alphavantage-provider-service/src/message_consumer.rs index d40c921..02112bd 100644 --- a/services/alphavantage-provider-service/src/message_consumer.rs +++ b/services/alphavantage-provider-service/src/message_consumer.rs @@ -67,7 +67,7 @@ async fn subscribe_and_process(state: AppState, client: async_nats::Client) -> R { error!("Error handling fetch command: {:?}", e); if let Some(mut task) = state_clone.tasks.get_mut(&request_id) { - task.status = common_contracts::observability::TaskStatus::Failed; + task.status = common_contracts::observability::ObservabilityTaskStatus::Failed; task.details = format!("Worker failed: {}", e); } } diff --git a/services/alphavantage-provider-service/src/worker.rs b/services/alphavantage-provider-service/src/worker.rs index cb1a375..3f36c29 100644 --- a/services/alphavantage-provider-service/src/worker.rs +++ b/services/alphavantage-provider-service/src/worker.rs @@ -6,7 +6,7 @@ use crate::state::{AppState, TaskStore}; use anyhow::Context; use chrono::{Utc, Datelike, Duration}; use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}; -use common_contracts::observability::{TaskProgress, TaskStatus}; +use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus}; use tracing::{error, info, instrument, warn}; use uuid::Uuid; use serde_json::Value; @@ -38,14 +38,14 @@ pub async fn handle_fetch_command( // Update task status if let Some(mut task) = state.tasks.get_mut(&command.request_id) { - task.status = TaskStatus::Failed; + task.status = ObservabilityTaskStatus::Failed; task.details = format!("Failed: {}", e); } else { // If task doesn't exist (e.g. failed at insert), create a failed task let task = TaskProgress { request_id: command.request_id, task_name: format!("alphavantage:{}", command.symbol), - status: TaskStatus::Failed, + status: ObservabilityTaskStatus::Failed, progress_percent: 0, details: format!("Failed: {}", e), started_at: Utc::now(), @@ -68,7 +68,7 @@ async fn handle_fetch_command_inner( let task = TaskProgress { request_id: command.request_id, task_name: format!("alphavantage:{}", command.symbol), - status: TaskStatus::InProgress, + status: ObservabilityTaskStatus::InProgress, progress_percent: 0, details: "Initializing...".to_string(), started_at: Utc::now(), @@ -329,7 +329,7 @@ async fn handle_fetch_command_inner( command.request_id, 100, "Task completed successfully", - Some(TaskStatus::Completed), + Some(ObservabilityTaskStatus::Completed), ).await; info!("AlphaVantage task completed successfully."); @@ -347,7 +347,7 @@ fn check_av_response(v: &Value) -> Result<()> { Ok(()) } -async fn update_task_progress(tasks: &TaskStore, request_id: Uuid, percent: u8, details: &str, status: Option) { +async fn update_task_progress(tasks: &TaskStore, request_id: Uuid, percent: u8, details: &str, status: Option) { if let Some(mut task) = tasks.get_mut(&request_id) { task.progress_percent = percent; task.details = details.to_string(); @@ -428,6 +428,6 @@ mod integration_tests { assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); let task = state.tasks.get(&request_id).expect("Task should exist"); - assert_eq!(task.status, TaskStatus::Completed); + assert_eq!(task.status, ObservabilityTaskStatus::Completed); } } diff --git a/services/api-gateway/Cargo.lock b/services/api-gateway/Cargo.lock index 960ee1b..5f390ba 100644 --- a/services/api-gateway/Cargo.lock +++ b/services/api-gateway/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "adler2" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" + [[package]] name = "ahash" version = "0.7.8" @@ -58,14 +64,26 @@ dependencies = [ "reqwest", "serde", "serde_json", + "service_kit", "thiserror 2.0.17", "tokio", "tower-http", "tracing", "tracing-subscriber", + "utoipa", + "utoipa-swagger-ui", "uuid", ] +[[package]] +name = "arbitrary" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1" +dependencies = [ + "derive_arbitrary", +] + [[package]] name = "arraydeque" version = "0.5.1" @@ -488,6 +506,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32fast" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-queue" version = "0.3.12" @@ -572,6 +599,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "derive_arbitrary" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "digest" version = "0.10.7" @@ -723,6 +761,17 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" +[[package]] +name = "flate2" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" +dependencies = [ + "crc32fast", + "libz-rs-sys", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.11.1" @@ -1356,6 +1405,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "libz-rs-sys" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "840db8cf39d9ec4dd794376f38acc40d0fc65eec2a8f484f7fd375b84602becd" +dependencies = [ + "zlib-rs", +] + [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -1426,6 +1484,26 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + +[[package]] +name = "miniz_oxide" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" +dependencies = [ + "adler2", + "simd-adler32", +] + [[package]] name = "mio" version = "1.1.0" @@ -2145,6 +2223,40 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rust-embed" +version = "8.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "947d7f3fad52b283d261c4c99a084937e2fe492248cb9a68a8435a861b8798ca" +dependencies = [ + "rust-embed-impl", + "rust-embed-utils", + "walkdir", +] + +[[package]] +name = "rust-embed-impl" +version = "8.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fa2c8c9e8711e10f9c4fd2d64317ef13feaab820a4c51541f1a8c8e2e851ab2" +dependencies = [ + "proc-macro2", + "quote", + "rust-embed-utils", + "syn 2.0.110", + "walkdir", +] + +[[package]] +name = "rust-embed-utils" +version = "8.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b161f275cb337fe0a44d924a5f4df0ed69c2c39519858f931ce61c779d3475" +dependencies = [ + "sha2", + "walkdir", +] + [[package]] name = "rust-ini" version = "0.21.3" @@ -2278,6 +2390,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "schannel" version = "0.1.28" @@ -2576,6 +2697,12 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simd-adler32" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" + [[package]] name = "simdutf8" version = "0.1.5" @@ -3333,6 +3460,12 @@ version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-bidi" version = "0.3.18" @@ -3415,6 +3548,31 @@ dependencies = [ "uuid", ] +[[package]] +name = "utoipa-swagger-ui" +version = "9.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d047458f1b5b65237c2f6dc6db136945667f40a7668627b3490b9513a3d43a55" +dependencies = [ + "axum", + "base64", + "mime_guess", + "regex", + "rust-embed", + "serde", + "serde_json", + "url", + "utoipa", + "utoipa-swagger-ui-vendored", + "zip", +] + +[[package]] +name = "utoipa-swagger-ui-vendored" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2eebbbfe4093922c2b6734d7c679ebfebd704a0d7e56dfcb0d05818ce28977d" + [[package]] name = "uuid" version = "1.18.1" @@ -3445,6 +3603,16 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -3594,6 +3762,15 @@ dependencies = [ "wasite", ] +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -4029,3 +4206,35 @@ dependencies = [ "quote", "syn 2.0.110", ] + +[[package]] +name = "zip" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12598812502ed0105f607f941c386f43d441e00148fce9dec3ca5ffb0bde9308" +dependencies = [ + "arbitrary", + "crc32fast", + "flate2", + "indexmap", + "memchr", + "zopfli", +] + +[[package]] +name = "zlib-rs" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2" + +[[package]] +name = "zopfli" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f05cd8797d63865425ff89b5c4a48804f35ba0ce8d125800027ad6017d2b5249" +dependencies = [ + "bumpalo", + "crc32fast", + "log", + "simd-adler32", +] diff --git a/services/api-gateway/Cargo.toml b/services/api-gateway/Cargo.toml index 8f3b80e..a9f362c 100644 --- a/services/api-gateway/Cargo.toml +++ b/services/api-gateway/Cargo.toml @@ -8,6 +8,9 @@ edition = "2024" axum = "0.8.7" tokio = { version = "1", features = ["full"] } tower-http = { version = "0.6.6", features = ["cors", "trace"] } +utoipa = { version = "5.4", features = ["chrono", "uuid"] } +utoipa-swagger-ui = { version = "9.0", features = ["axum", "vendored"] } +service_kit = { version = "0.1.2" } # Shared Contracts common-contracts = { path = "../common-contracts" } diff --git a/services/api-gateway/src/api.rs b/services/api-gateway/src/api.rs index 09d2562..ef9815c 100644 --- a/services/api-gateway/src/api.rs +++ b/services/api-gateway/src/api.rs @@ -12,7 +12,7 @@ use common_contracts::config_models::{ DataSourcesConfig, LlmProvider, LlmProvidersConfig, }; use common_contracts::messages::GenerateReportCommand; -use common_contracts::observability::{TaskProgress, TaskStatus}; +use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus}; use common_contracts::subjects::{NatsSubject, SubjectMessage}; use common_contracts::symbol_utils::{CanonicalSymbol, Market}; use futures_util::future::join_all; @@ -22,18 +22,21 @@ use std::collections::HashMap; use tokio::try_join; use tracing::{error, info, warn}; use uuid::Uuid; +use utoipa::OpenApi; +use utoipa_swagger_ui::SwaggerUi; +use service_kit::api_dto; mod registry; // --- Request/Response Structs --- -#[derive(Deserialize)] +#[api_dto] pub struct DataRequest { pub symbol: String, pub market: Option, pub template_id: String, // Changed to required as it's mandatory for workflow } -#[derive(Serialize)] +#[api_dto] pub struct RequestAcceptedResponse { pub request_id: Uuid, pub symbol: String, @@ -50,31 +53,73 @@ pub struct AnalysisResultQuery { pub symbol: String, } -#[derive(Deserialize)] +#[api_dto] pub struct SymbolResolveRequest { pub symbol: String, pub market: Option, } -#[derive(Serialize)] +#[api_dto] pub struct SymbolResolveResponse { pub symbol: String, pub market: String, } +// --- Dynamic Schema Structs --- + +#[api_dto] +pub struct DataSourceSchemaResponse { + pub providers: Vec, +} + +#[api_dto] +pub struct DataSourceProviderSchema { + pub id: String, + pub name: String, + pub description: String, + pub fields: Vec, +} + +#[api_dto] +pub struct ConfigFieldSchema { + pub key: String, + pub label: String, + pub r#type: String, // text, password, select, boolean + pub required: bool, + pub placeholder: Option, + pub description: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub default: Option, +} + // --- Router Definition --- pub fn create_router(app_state: AppState) -> Router { - Router::new() + use crate::openapi::ApiDoc; + + let mut router = Router::new() .route("/health", get(health_check)) .route("/tasks/{request_id}", get(get_task_progress)) .nest("/v1", create_v1_router()) - .with_state(app_state) + .with_state(app_state); + + // Mount Swagger UI + router = router.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi())); + + router +} + +async fn mock_chat_completion() -> impl IntoResponse { + use axum::http::header; + let body = "data: {\"id\":\"chatcmpl-mock\",\"object\":\"chat.completion.chunk\",\"created\":1677652288,\"model\":\"gpt-3.5-turbo-0613\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"This is a mocked response.\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-mock\",\"object\":\"chat.completion.chunk\",\"created\":1677652288,\"model\":\"gpt-3.5-turbo-0613\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}]}\n\ndata: [DONE]\n\n"; + (StatusCode::OK, [(header::CONTENT_TYPE, "text/event-stream")], body) } use common_contracts::messages::{StartWorkflowCommand, SyncStateCommand, WorkflowEvent}; fn create_v1_router() -> Router { Router::new() + // Mock LLM for E2E + .route("/mock/chat/completions", post(mock_chat_completion)) // New Workflow API .route("/workflow/start", post(start_workflow)) .route("/workflow/events/{request_id}", get(workflow_events_stream)) @@ -107,6 +152,10 @@ fn create_v1_router() -> Router { "/configs/data_sources", get(get_data_sources_config).put(update_data_sources_config), ) + .route( + "/configs/data_sources/schema", + get(get_data_source_schema), + ) .route("/configs/test", post(test_data_source_config)) .route("/configs/llm/test", post(test_llm_config)) .route("/config", get(get_legacy_system_config)) @@ -223,7 +272,7 @@ fn project_data_sources( (key, entry) }) .collect() -} + } fn provider_id(provider: &DataSourceProvider) -> &'static str { match provider { @@ -250,6 +299,14 @@ fn infer_market(symbol: &str) -> String { /// [POST /v1/tools/resolve-symbol] /// Resolves and normalizes a symbol without starting a workflow. +#[utoipa::path( + post, + path = "/v1/tools/resolve-symbol", + request_body = SymbolResolveRequest, + responses( + (status = 200, description = "Symbol resolved", body = SymbolResolveResponse) + ) +)] async fn resolve_symbol(Json(payload): Json) -> Result { let market = if let Some(m) = payload.market { if m.is_empty() { @@ -272,6 +329,14 @@ async fn resolve_symbol(Json(payload): Json) -> Result, Json(payload): Json, @@ -369,6 +434,13 @@ async fn trigger_data_fetch_legacy( start_workflow(State(state), Json(payload)).await } +#[utoipa::path( + get, + path = "/health", + responses( + (status = 200, description = "Service healthy") + ) +)] async fn health_check() -> impl IntoResponse { (StatusCode::OK, "OK") } @@ -476,6 +548,17 @@ async fn get_financials_by_symbol( /// [GET /v1/tasks/:request_id] /// Aggregates task progress from all downstream provider services. +#[utoipa::path( + get, + path = "/tasks/{request_id}", + params( + ("request_id" = Uuid, Path, description = "Request ID to query tasks for") + ), + responses( + (status = 200, description = "Task progress list", body = Vec), + (status = 404, description = "Tasks not found") + ) +)] async fn get_task_progress( State(state): State, Path(request_id): Path, @@ -497,7 +580,7 @@ async fn get_task_progress( Some(vec![TaskProgress { request_id, task_name: format!("{}:unreachable", service_id_clone), - status: TaskStatus::Failed, + status: ObservabilityTaskStatus::Failed, progress_percent: 0, details: "Invalid response format".to_string(), started_at: chrono::Utc::now(), @@ -510,7 +593,7 @@ async fn get_task_progress( Some(vec![TaskProgress { request_id, task_name: format!("{}:unreachable", service_id_clone), - status: TaskStatus::Failed, + status: ObservabilityTaskStatus::Failed, progress_percent: 0, details: format!("Connection Error: {}", e), started_at: chrono::Utc::now(), @@ -552,15 +635,23 @@ async fn get_task_progress( // --- New Config Test Handler --- -#[derive(Deserialize, Debug)] -struct TestConfigRequest { - r#type: String, +#[api_dto] +pub struct TestConfigRequest { + pub r#type: String, #[serde(flatten)] - data: serde_json::Value, + pub data: serde_json::Value, } /// [POST /v1/configs/test] /// Forwards a configuration test request to the appropriate downstream service. +#[utoipa::path( + post, + path = "/v1/configs/test", + request_body = TestConfigRequest, + responses( + (status = 200, description = "Configuration test result (JSON)") + ) +)] async fn test_data_source_config( State(state): State, Json(payload): Json, @@ -612,13 +703,22 @@ async fn test_data_source_config( } } -#[derive(Deserialize, Serialize)] -struct TestLlmConfigRequest { - api_base_url: String, - api_key: String, - model_id: String, +#[api_dto] +pub struct TestLlmConfigRequest { + pub api_base_url: String, + pub api_key: String, + pub model_id: String, } +/// [POST /v1/configs/llm/test] +#[utoipa::path( + post, + path = "/v1/configs/llm/test", + request_body = TestLlmConfigRequest, + responses( + (status = 200, description = "LLM config test result (JSON)") + ) +)] async fn test_llm_config( State(state): State, Json(payload): Json, @@ -654,12 +754,27 @@ async fn test_llm_config( // --- Config API Handlers (Proxy to data-persistence-service) --- /// [GET /v1/configs/llm_providers] +#[utoipa::path( + get, + path = "/v1/configs/llm_providers", + responses( + (status = 200, description = "LLM providers configuration", body = LlmProvidersConfig) + ) +)] async fn get_llm_providers_config(State(state): State) -> Result { let config = state.persistence_client.get_llm_providers_config().await?; Ok(Json(config)) } /// [PUT /v1/configs/llm_providers] +#[utoipa::path( + put, + path = "/v1/configs/llm_providers", + request_body = LlmProvidersConfig, + responses( + (status = 200, description = "Updated LLM providers configuration", body = LlmProvidersConfig) + ) +)] async fn update_llm_providers_config( State(state): State, Json(payload): Json, @@ -672,6 +787,13 @@ async fn update_llm_providers_config( } /// [GET /v1/configs/analysis_template_sets] +#[utoipa::path( + get, + path = "/v1/configs/analysis_template_sets", + responses( + (status = 200, description = "Analysis template sets configuration", body = AnalysisTemplateSets) + ) +)] async fn get_analysis_template_sets(State(state): State) -> Result { let config = state .persistence_client @@ -681,6 +803,14 @@ async fn get_analysis_template_sets(State(state): State) -> Result, Json(payload): Json, @@ -693,12 +823,27 @@ async fn update_analysis_template_sets( } /// [GET /v1/configs/data_sources] +#[utoipa::path( + get, + path = "/v1/configs/data_sources", + responses( + (status = 200, description = "Data sources configuration", body = DataSourcesConfig) + ) +)] async fn get_data_sources_config(State(state): State) -> Result { let config = state.persistence_client.get_data_sources_config().await?; Ok(Json(config)) } /// [PUT /v1/configs/data_sources] +#[utoipa::path( + put, + path = "/v1/configs/data_sources", + request_body = DataSourcesConfig, + responses( + (status = 200, description = "Updated data sources configuration", body = DataSourcesConfig) + ) +)] async fn update_data_sources_config( State(state): State, Json(payload): Json, @@ -710,7 +855,104 @@ async fn update_data_sources_config( Ok(Json(updated_config)) } +/// [GET /v1/configs/data_sources/schema] +/// Returns the schema definitions for all supported data sources (for dynamic UI generation). +#[utoipa::path( + get, + path = "/v1/configs/data_sources/schema", + responses( + (status = 200, description = "Data sources schema", body = DataSourceSchemaResponse) + ) +)] +async fn get_data_source_schema() -> Result { + let tushare = DataSourceProviderSchema { + id: "tushare".to_string(), + name: "Tushare Pro".to_string(), + description: "Official Tushare Data Provider".to_string(), + fields: vec![ + ConfigFieldSchema { + key: "api_token".to_string(), + label: "API Token".to_string(), + r#type: "password".to_string(), + required: true, + placeholder: Some("Enter your token...".to_string()), + description: Some("Get it from https://tushare.pro".to_string()), + default: None, + }, + ConfigFieldSchema { + key: "api_url".to_string(), + label: "API Endpoint".to_string(), + r#type: "text".to_string(), + required: false, + placeholder: None, + description: None, + default: Some("http://api.tushare.pro".to_string()), + }, + ], + }; + + let finnhub = DataSourceProviderSchema { + id: "finnhub".to_string(), + name: "Finnhub".to_string(), + description: "Finnhub Stock API".to_string(), + fields: vec![ + ConfigFieldSchema { + key: "api_key".to_string(), + label: "API Key".to_string(), + r#type: "password".to_string(), + required: true, + placeholder: Some("Enter your API key...".to_string()), + description: Some("Get it from https://finnhub.io".to_string()), + default: None, + }, + ], + }; + + let alphavantage = DataSourceProviderSchema { + id: "alphavantage".to_string(), + name: "Alpha Vantage".to_string(), + description: "Alpha Vantage API".to_string(), + fields: vec![ + ConfigFieldSchema { + key: "api_key".to_string(), + label: "API Key".to_string(), + r#type: "password".to_string(), + required: true, + placeholder: Some("Enter your API key...".to_string()), + description: Some("Get it from https://www.alphavantage.co".to_string()), + default: None, + }, + ], + }; + + let yfinance = DataSourceProviderSchema { + id: "yfinance".to_string(), + name: "Yahoo Finance".to_string(), + description: "Yahoo Finance API (Unofficial)".to_string(), + fields: vec![ + // No fields required usually, maybe proxy + ], + }; + + Ok(Json(DataSourceSchemaResponse { + providers: vec![tushare, finnhub, alphavantage, yfinance], + })) +} + + /// [GET /v1/discover-models/:provider_id] +#[utoipa::path( + get, + path = "/v1/discover-models/{provider_id}", + params( + ("provider_id" = String, Path, description = "Provider ID to discover models for") + ), + responses( + (status = 200, description = "Discovered models (JSON)"), + (status = 404, description = "Provider not found"), + (status = 502, description = "Provider error") + ) +)] async fn discover_models( State(state): State, Path(provider_id): Path, @@ -762,14 +1004,23 @@ async fn discover_models( } } -#[derive(Deserialize)] -struct DiscoverPreviewRequest { - api_base_url: String, - api_key: String, +#[api_dto] +pub struct DiscoverPreviewRequest { + pub api_base_url: String, + pub api_key: String, } /// [POST /v1/discover-models] /// Preview discovery without persisting provider configuration. +#[utoipa::path( + post, + path = "/v1/discover-models", + request_body = DiscoverPreviewRequest, + responses( + (status = 200, description = "Discovered models (JSON)"), + (status = 502, description = "Provider error") + ) +)] async fn discover_models_preview( Json(payload): Json, ) -> Result { diff --git a/services/api-gateway/src/main.rs b/services/api-gateway/src/main.rs index 6bca304..13316a2 100644 --- a/services/api-gateway/src/main.rs +++ b/services/api-gateway/src/main.rs @@ -3,6 +3,9 @@ mod config; mod error; mod persistence; mod state; +mod openapi; +#[cfg(test)] +mod openapi_tests; use crate::config::AppConfig; use crate::error::Result; diff --git a/services/api-gateway/src/openapi.rs b/services/api-gateway/src/openapi.rs new file mode 100644 index 0000000..01ca4b1 --- /dev/null +++ b/services/api-gateway/src/openapi.rs @@ -0,0 +1,68 @@ +use utoipa::OpenApi; +use common_contracts::messages::*; +use common_contracts::observability::*; +use common_contracts::config_models::*; +use crate::api; + +#[derive(OpenApi)] +#[openapi( + paths( + api::health_check, + api::start_workflow, + api::get_task_progress, + api::resolve_symbol, + api::get_llm_providers_config, + api::update_llm_providers_config, + api::get_analysis_template_sets, + api::update_analysis_template_sets, + api::get_data_sources_config, + api::update_data_sources_config, + api::test_data_source_config, + api::test_llm_config, + api::discover_models, + api::discover_models_preview, + api::get_data_source_schema, // New endpoint + ), + components( + schemas( + // Workflow + StartWorkflowCommand, + WorkflowEvent, + WorkflowDag, + TaskNode, + TaskDependency, + TaskType, + TaskStatus, + // Observability + TaskProgress, + ServiceStatus, + HealthStatus, + ObservabilityTaskStatus, + // Configs + LlmProvider, + LlmModel, + AnalysisTemplateSet, + AnalysisModuleConfig, + DataSourceConfig, + DataSourceProvider, + // Request/Response + api::DataRequest, + api::RequestAcceptedResponse, + api::SymbolResolveRequest, + api::SymbolResolveResponse, + api::TestConfigRequest, + api::TestLlmConfigRequest, + api::DataSourceSchemaResponse, + api::DataSourceProviderSchema, + api::ConfigFieldSchema, + ) + ), + tags( + (name = "workflow", description = "Workflow management endpoints"), + (name = "config", description = "Configuration management endpoints"), + (name = "tools", description = "Utility tools"), + (name = "observability", description = "System observability"), + ) +)] +pub struct ApiDoc; + diff --git a/services/api-gateway/src/openapi_tests.rs b/services/api-gateway/src/openapi_tests.rs new file mode 100644 index 0000000..afff315 --- /dev/null +++ b/services/api-gateway/src/openapi_tests.rs @@ -0,0 +1,32 @@ +#[cfg(test)] +mod tests { + use crate::openapi::ApiDoc; + use utoipa::OpenApi; + + #[test] + fn test_openapi_schema_generation() { + let doc = ApiDoc::openapi(); + let json = doc.to_json().expect("Failed to serialize OpenAPI doc"); + + // Basic validation + assert!(json.contains("\"openapi\":\"3.1.0\""), "Should specify OpenAPI version"); + assert!(json.contains("\"title\":\"api-gateway\""), "Should have title"); + + // Check key schemas exist + assert!(json.contains("\"WorkflowEvent\""), "Should contain WorkflowEvent schema"); + assert!(json.contains("\"DataSourceConfig\""), "Should contain DataSourceConfig schema"); + assert!(json.contains("\"DataSourceSchemaResponse\""), "Should contain DataSourceSchemaResponse schema"); + assert!(json.contains("\"TaskProgress\""), "Should contain TaskProgress schema"); + + // Check new endpoints exist + assert!(json.contains("/v1/configs/data_sources/schema"), "Should contain schema endpoint"); + assert!(json.contains("/v1/workflow/start"), "Should contain workflow start endpoint"); + + // Check progress field + assert!(json.contains("\"progress\""), "Should contain progress field in TaskStateChanged (inside WorkflowEvent)"); + + // Optional: Print for manual inspection + // println!("{}", json); + } +} + diff --git a/services/common-contracts/src/config_models.rs b/services/common-contracts/src/config_models.rs index f2aec17..1e31a30 100644 --- a/services/common-contracts/src/config_models.rs +++ b/services/common-contracts/src/config_models.rs @@ -1,9 +1,9 @@ use serde::{Serialize, Deserialize}; use std::collections::HashMap; -use utoipa::ToSchema; +use service_kit::api_dto; // 单个启用的模型 -#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)] +#[api_dto] pub struct LlmModel { pub model_id: String, // e.g., "gpt-4o" pub name: Option, // 别名,用于UI显示 @@ -11,7 +11,7 @@ pub struct LlmModel { } // 单个LLM供应商的完整配置 -#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)] +#[api_dto] pub struct LlmProvider { pub name: String, // "OpenAI 官方" pub api_base_url: String, @@ -30,7 +30,8 @@ pub type AnalysisTemplateSets = HashMap; /// A single, self-contained set of analysis modules representing a complete workflow. /// e.g., "Standard Fundamental Analysis" -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] +#[api_dto] +#[derive(PartialEq)] pub struct AnalysisTemplateSet { /// Human-readable name for the template set. pub name: String, @@ -40,7 +41,8 @@ pub struct AnalysisTemplateSet { } /// Configuration for a single analysis module. -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] +#[api_dto] +#[derive(PartialEq)] pub struct AnalysisModuleConfig { pub name: String, pub provider_id: String, @@ -73,7 +75,8 @@ pub struct SystemConfig { pub analysis_modules: AnalysisModulesConfig, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] +#[api_dto] +#[derive(PartialEq)] #[serde(rename_all = "snake_case")] pub enum DataSourceProvider { Tushare, @@ -82,7 +85,8 @@ pub enum DataSourceProvider { Yfinance, } -#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] +#[api_dto] +#[derive(PartialEq)] pub struct DataSourceConfig { pub provider: DataSourceProvider, pub api_key: Option, diff --git a/services/common-contracts/src/dtos.rs b/services/common-contracts/src/dtos.rs index 03e47de..5b4a19d 100644 --- a/services/common-contracts/src/dtos.rs +++ b/services/common-contracts/src/dtos.rs @@ -91,12 +91,12 @@ pub struct RealtimeQuoteDto { pub source: Option, } -use crate::observability::TaskStatus; +use crate::observability::ObservabilityTaskStatus; #[api_dto] pub struct ProviderStatusDto { pub last_updated: chrono::DateTime, - pub status: TaskStatus, + pub status: ObservabilityTaskStatus, pub data_version: Option, } diff --git a/services/common-contracts/src/messages.rs b/services/common-contracts/src/messages.rs index 97e1f94..24946cf 100644 --- a/services/common-contracts/src/messages.rs +++ b/services/common-contracts/src/messages.rs @@ -1,8 +1,8 @@ -use serde::{Serialize, Deserialize}; use uuid::Uuid; use crate::symbol_utils::CanonicalSymbol; use crate::subjects::{NatsSubject, SubjectMessage}; use std::collections::HashMap; +use service_kit::api_dto; // --- Commands --- @@ -10,7 +10,7 @@ use std::collections::HashMap; /// Command to initiate a new workflow. /// Published by: `api-gateway` /// Consumed by: `workflow-orchestrator` -#[derive(Clone, Debug, Serialize, Deserialize)] +#[api_dto] pub struct StartWorkflowCommand { pub request_id: Uuid, pub symbol: CanonicalSymbol, @@ -28,7 +28,7 @@ impl SubjectMessage for StartWorkflowCommand { /// Command to request a state snapshot for re-alignment. /// Published by: `api-gateway` (on client connect/reconnect) /// Consumed by: `workflow-orchestrator` -#[derive(Clone, Debug, Serialize, Deserialize)] +#[api_dto] pub struct SyncStateCommand { pub request_id: Uuid, } @@ -42,7 +42,7 @@ impl SubjectMessage for SyncStateCommand { /// Command to trigger data fetching. /// Published by: `workflow-orchestrator` (previously api-gateway) /// Consumed by: `*-provider-services` -#[derive(Clone, Debug, Serialize, Deserialize)] +#[api_dto] pub struct FetchCompanyDataCommand { pub request_id: Uuid, pub symbol: CanonicalSymbol, @@ -59,7 +59,7 @@ impl SubjectMessage for FetchCompanyDataCommand { /// Command to start a full report generation workflow. /// Published by: `workflow-orchestrator` (previously api-gateway) /// Consumed by: `report-generator-service` -#[derive(Clone, Debug, Serialize, Deserialize)] +#[api_dto] pub struct GenerateReportCommand { pub request_id: Uuid, pub symbol: CanonicalSymbol, @@ -76,7 +76,7 @@ impl SubjectMessage for GenerateReportCommand { // Topic: events.workflow.{request_id} /// Unified event stream for frontend consumption. -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] #[serde(tag = "type", content = "payload")] pub enum WorkflowEvent { // 1. 流程初始化 (携带完整的任务依赖图) @@ -92,7 +92,8 @@ pub enum WorkflowEvent { task_type: TaskType, // DataFetch | DataProcessing | Analysis status: TaskStatus, // Pending, Scheduled, Running, Completed, Failed, Skipped message: Option, - timestamp: i64 + timestamp: i64, + progress: Option, // 0-100 }, // 3. 任务流式输出 (用于 LLM 打字机效果) @@ -124,19 +125,19 @@ pub enum WorkflowEvent { } } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct WorkflowDag { pub nodes: Vec, pub edges: Vec // from -> to } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct TaskDependency { pub from: String, pub to: String, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct TaskNode { pub id: String, pub name: String, @@ -144,14 +145,16 @@ pub struct TaskNode { pub initial_status: TaskStatus } -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)] +#[api_dto] +#[derive(Copy, PartialEq)] pub enum TaskType { DataFetch, // 创造原始上下文 DataProcessing, // 消耗并转换上下文 (New) Analysis // 读取上下文生成新内容 } -#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)] +#[api_dto] +#[derive(Copy, PartialEq)] pub enum TaskStatus { Pending, // 等待依赖 Scheduled, // 依赖满足,已下发给 Worker @@ -161,13 +164,13 @@ pub enum TaskStatus { Skipped // 因上游失败或策略原因被跳过 } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct CompanyProfilePersistedEvent { pub request_id: Uuid, pub symbol: CanonicalSymbol, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct FinancialsPersistedEvent { pub request_id: Uuid, pub symbol: CanonicalSymbol, @@ -187,7 +190,7 @@ impl SubjectMessage for FinancialsPersistedEvent { } } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct DataFetchFailedEvent { pub request_id: Uuid, pub symbol: CanonicalSymbol, @@ -206,7 +209,7 @@ impl SubjectMessage for DataFetchFailedEvent { // Topic: events.analysis.report_generated /// Event emitted when a report generation task (or sub-module) is completed. /// Consumed by: `workflow-orchestrator` -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct ReportGeneratedEvent { pub request_id: Uuid, pub symbol: CanonicalSymbol, @@ -222,7 +225,7 @@ impl SubjectMessage for ReportGeneratedEvent { } // Topic: events.analysis.report_failed -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct ReportFailedEvent { pub request_id: Uuid, pub symbol: CanonicalSymbol, diff --git a/services/common-contracts/src/observability.rs b/services/common-contracts/src/observability.rs index 5a4a31f..0ee8250 100644 --- a/services/common-contracts/src/observability.rs +++ b/services/common-contracts/src/observability.rs @@ -1,16 +1,17 @@ use std::collections::HashMap; use chrono::{DateTime, Utc}; -use serde::{Serialize, Deserialize}; use uuid::Uuid; +use service_kit::api_dto; -#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +#[api_dto] +#[derive(Copy)] pub enum ServiceStatus { Ok, Degraded, Unhealthy, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct HealthStatus { pub module_id: String, pub status: ServiceStatus, @@ -18,20 +19,21 @@ pub struct HealthStatus { pub details: HashMap, } -#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, utoipa::ToSchema)] +#[api_dto] +#[derive(Copy, PartialEq, Eq)] #[serde(rename_all = "snake_case")] -pub enum TaskStatus { +pub enum ObservabilityTaskStatus { Queued, InProgress, Completed, Failed, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[api_dto] pub struct TaskProgress { pub request_id: Uuid, pub task_name: String, - pub status: TaskStatus, + pub status: ObservabilityTaskStatus, pub progress_percent: u8, pub details: String, pub started_at: DateTime, diff --git a/services/common-contracts/src/symbol_utils.rs b/services/common-contracts/src/symbol_utils.rs index a5c51b9..1d91d2d 100644 --- a/services/common-contracts/src/symbol_utils.rs +++ b/services/common-contracts/src/symbol_utils.rs @@ -1,7 +1,8 @@ -use serde::{Deserialize, Serialize}; use std::fmt; +use service_kit::api_dto; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[api_dto] +#[derive(PartialEq, Eq)] pub enum Market { US, CN, @@ -25,8 +26,10 @@ impl From<&str> for Market { /// CanonicalSymbol 是系统内部唯一的股票代码标识符类型 /// 它封装了一个标准化的字符串(遵循 Yahoo Finance 格式) /// 使用 newtype 模式防止与普通 String 混淆 -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +#[api_dto] +#[derive(PartialEq, Eq, Hash)] #[serde(transparent)] +#[schema(value_type = String, example = "600519.SS")] pub struct CanonicalSymbol(String); impl CanonicalSymbol { diff --git a/services/data-persistence-service/Cargo.lock b/services/data-persistence-service/Cargo.lock index d41874d..8498f2c 100644 --- a/services/data-persistence-service/Cargo.lock +++ b/services/data-persistence-service/Cargo.lock @@ -3068,9 +3068,16 @@ dependencies = [ "serde_json", "url", "utoipa", + "utoipa-swagger-ui-vendored", "zip", ] +[[package]] +name = "utoipa-swagger-ui-vendored" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2eebbbfe4093922c2b6734d7c679ebfebd704a0d7e56dfcb0d05818ce28977d" + [[package]] name = "uuid" version = "1.18.1" diff --git a/services/data-persistence-service/Cargo.toml b/services/data-persistence-service/Cargo.toml index daff302..c934f69 100644 --- a/services/data-persistence-service/Cargo.toml +++ b/services/data-persistence-service/Cargo.toml @@ -43,7 +43,7 @@ serde_json = "1.0" # OpenAPI & Schema utoipa = { version = "5.4", features = ["axum_extras", "chrono", "uuid"] } -utoipa-swagger-ui = { version = "9.0", features = ["axum"] } +utoipa-swagger-ui = { version = "9.0", features = ["axum", "vendored"] } # Environment variables dotenvy = "0.15" diff --git a/services/data-persistence-service/Dockerfile b/services/data-persistence-service/Dockerfile index e2e8b0f..b7f09ba 100644 --- a/services/data-persistence-service/Dockerfile +++ b/services/data-persistence-service/Dockerfile @@ -31,6 +31,7 @@ RUN cargo build --release --bin data-persistence-service-server FROM debian:bookworm-slim AS runtime WORKDIR /app +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates libssl-dev curl && rm -rf /var/lib/apt/lists/* RUN groupadd --system --gid 1001 appuser && \ useradd --system --uid 1001 --gid 1001 appuser USER appuser diff --git a/services/finnhub-provider-service/src/message_consumer.rs b/services/finnhub-provider-service/src/message_consumer.rs index c0004dc..d525875 100644 --- a/services/finnhub-provider-service/src/message_consumer.rs +++ b/services/finnhub-provider-service/src/message_consumer.rs @@ -1,7 +1,7 @@ use crate::error::Result; use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::messages::FetchCompanyDataCommand; -use common_contracts::observability::TaskStatus; +use common_contracts::observability::ObservabilityTaskStatus; use common_contracts::subjects::NatsSubject; use futures_util::StreamExt; use std::sync::Arc; @@ -113,7 +113,7 @@ pub async fn subscribe_to_data_commands(app_state: Arc, nats_client: a app_state.tasks.insert(task_id, common_contracts::observability::TaskProgress { request_id: task_id, task_name: format!("finnhub:{}", command.symbol), - status: TaskStatus::Queued, + status: ObservabilityTaskStatus::Queued, progress_percent: 0, details: "Command received".to_string(), started_at: chrono::Utc::now(), diff --git a/services/finnhub-provider-service/src/worker.rs b/services/finnhub-provider-service/src/worker.rs index 951e04a..0804048 100644 --- a/services/finnhub-provider-service/src/worker.rs +++ b/services/finnhub-provider-service/src/worker.rs @@ -4,7 +4,7 @@ use crate::state::AppState; use chrono::{Datelike, Utc, Duration}; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto}; use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}; -use common_contracts::observability::{TaskProgress, TaskStatus}; +use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus}; use tracing::{error, info}; pub async fn handle_fetch_command( @@ -33,14 +33,14 @@ pub async fn handle_fetch_command( // Update task status if let Some(mut task) = state.tasks.get_mut(&command.request_id) { - task.status = TaskStatus::Failed; + task.status = ObservabilityTaskStatus::Failed; task.details = format!("Failed: {}", e); } else { // If task doesn't exist (e.g. failed at insert), create a failed task let task = TaskProgress { request_id: command.request_id, task_name: format!("finnhub:{}", command.symbol), - status: TaskStatus::Failed, + status: ObservabilityTaskStatus::Failed, progress_percent: 0, details: format!("Failed: {}", e), started_at: Utc::now(), @@ -65,7 +65,7 @@ async fn handle_fetch_command_inner( TaskProgress { request_id: command.request_id, task_name: format!("finnhub:{}", command.symbol), - status: TaskStatus::InProgress, + status: ObservabilityTaskStatus::InProgress, progress_percent: 10, details: "Fetching data from Finnhub".to_string(), started_at: chrono::Utc::now(), @@ -198,7 +198,7 @@ async fn handle_fetch_command_inner( // 4. Finalize if let Some(mut task) = state.tasks.get_mut(&command.request_id) { - task.status = TaskStatus::Completed; + task.status = ObservabilityTaskStatus::Completed; task.progress_percent = 100; task.details = "Workflow finished successfully".to_string(); } @@ -260,6 +260,6 @@ mod integration_tests { assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); let task = state.tasks.get(&request_id).expect("Task should exist"); - assert_eq!(task.status, TaskStatus::Completed); + assert_eq!(task.status, ObservabilityTaskStatus::Completed); } } diff --git a/services/tushare-provider-service/src/message_consumer.rs b/services/tushare-provider-service/src/message_consumer.rs index ee1c4db..069304c 100644 --- a/services/tushare-provider-service/src/message_consumer.rs +++ b/services/tushare-provider-service/src/message_consumer.rs @@ -1,7 +1,7 @@ use crate::error::Result; use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::messages::FetchCompanyDataCommand; -use common_contracts::observability::TaskStatus; +use common_contracts::observability::ObservabilityTaskStatus; use common_contracts::subjects::NatsSubject; use futures_util::StreamExt; use tracing::{error, info, warn}; @@ -94,7 +94,7 @@ async fn subscribe_and_process( state_for_closure.tasks.insert(task_id, common_contracts::observability::TaskProgress { request_id: task_id, task_name: format!("tushare:{}", command.symbol), - status: TaskStatus::Queued, + status: ObservabilityTaskStatus::Queued, progress_percent: 0, details: "Command received".to_string(), started_at: Utc::now(), @@ -112,7 +112,7 @@ async fn subscribe_and_process( ); // Update task to failed status if let Some(mut task) = workflow_state_for_error.tasks.get_mut(&task_id) { - task.status = TaskStatus::Failed; + task.status = ObservabilityTaskStatus::Failed; task.details = format!("Workflow failed: {}", e); } } @@ -131,7 +131,7 @@ async fn subscribe_and_process( None => { // Check if the task is in Failed state (set by the worker error handler) let is_failed = cleanup_state.tasks.get(&task_id) - .map(|t| t.status == TaskStatus::Failed) + .map(|t| t.status == ObservabilityTaskStatus::Failed) .unwrap_or(false); if is_failed { diff --git a/services/tushare-provider-service/src/worker.rs b/services/tushare-provider-service/src/worker.rs index 7bfe130..fb4eaee 100644 --- a/services/tushare-provider-service/src/worker.rs +++ b/services/tushare-provider-service/src/worker.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use common_contracts::{ dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto}, messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}, - observability::TaskStatus, + observability::ObservabilityTaskStatus, persistence_client::PersistenceClient, }; use tokio::sync::mpsc; @@ -38,8 +38,8 @@ pub async fn run_tushare_workflow( // Ensure task status is failed in memory if it wasn't already if let Some(mut task) = state.tasks.get_mut(&task_id) { - if task.status != TaskStatus::Failed { - task.status = TaskStatus::Failed; + if task.status != ObservabilityTaskStatus::Failed { + task.status = ObservabilityTaskStatus::Failed; task.details = format!("Workflow failed: {}", e); } } @@ -63,7 +63,7 @@ async fn run_tushare_workflow_inner( let reason = "Execution failed: Tushare provider is not available (misconfigured).".to_string(); error!("{}", reason); if let Some(mut task) = state.tasks.get_mut(&task_id) { - task.status = TaskStatus::Failed; + task.status = ObservabilityTaskStatus::Failed; task.details = reason.clone(); } return Err(AppError::ProviderNotAvailable(reason)); @@ -78,7 +78,7 @@ async fn run_tushare_workflow_inner( .tasks .get_mut(&task_id) .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; - entry.status = TaskStatus::InProgress; + entry.status = ObservabilityTaskStatus::InProgress; entry.progress_percent = 10; entry.details = "Checking cache...".to_string(); } @@ -169,7 +169,7 @@ async fn run_tushare_workflow_inner( .tasks .get_mut(&task_id) .ok_or_else(|| AppError::Internal("Task not found".to_string()))?; - entry.status = TaskStatus::Completed; + entry.status = ObservabilityTaskStatus::Completed; entry.progress_percent = 100; entry.details = "Workflow finished successfully".to_string(); } @@ -287,7 +287,7 @@ mod integration_tests { state.tasks.insert(request_id, TaskProgress { request_id, task_name: "tushare:600519".to_string(), - status: TaskStatus::Queued, + status: ObservabilityTaskStatus::Queued, progress_percent: 0, details: "Init".to_string(), started_at: Utc::now() @@ -300,6 +300,6 @@ mod integration_tests { assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); let task = state.tasks.get(&request_id).expect("Task should exist"); - assert_eq!(task.status, TaskStatus::Completed); + assert_eq!(task.status, ObservabilityTaskStatus::Completed); } } diff --git a/services/workflow-orchestrator-service/src/workflow.rs b/services/workflow-orchestrator-service/src/workflow.rs index 910f4f6..270e3f8 100644 --- a/services/workflow-orchestrator-service/src/workflow.rs +++ b/services/workflow-orchestrator-service/src/workflow.rs @@ -248,7 +248,7 @@ impl WorkflowEngine { for task_id in initial_tasks { let _ = machine.update_task_status(&task_id, TaskStatus::Scheduled); - self.broadcast_task_state(cmd.request_id, &task_id, TaskType::DataFetch, TaskStatus::Scheduled, None).await?; + self.broadcast_task_state(cmd.request_id, &task_id, TaskType::DataFetch, TaskStatus::Scheduled, None, None).await?; if task_id.starts_with("fetch:") { let fetch_cmd = FetchCompanyDataCommand { @@ -296,7 +296,7 @@ impl WorkflowEngine { index: 0 }).await?; - self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Completed, Some("Data persisted".into())).await?; + self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Completed, Some("Data persisted".into()), None).await?; self.trigger_next_tasks(&mut machine, next_tasks).await?; @@ -311,7 +311,7 @@ impl WorkflowEngine { let task_id = format!("fetch:{}", provider_id); let _ = machine.update_task_status(&task_id, TaskStatus::Failed); - self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Failed, Some(evt.error)).await?; + self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Failed, Some(evt.error), None).await?; self.repo.save(&machine).await?; } Ok(()) @@ -383,7 +383,7 @@ impl WorkflowEngine { let next_tasks = machine.update_task_status(&final_task_id, TaskStatus::Completed); - self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Completed, Some("Report generated".into())).await?; + self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Completed, Some("Report generated".into()), None).await?; // If this was the last node, the workflow is complete? // check_dependents returns next ready tasks. If empty, and no running tasks, we are done. @@ -411,7 +411,7 @@ impl WorkflowEngine { }; let _ = machine.update_task_status(&final_task_id, TaskStatus::Failed); - self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Failed, Some(evt.error.clone())).await?; + self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Failed, Some(evt.error.clone()), None).await?; // Propagate failure? self.broker.publish_event(evt.request_id, WorkflowEvent::WorkflowFailed { @@ -441,7 +441,7 @@ impl WorkflowEngine { async fn trigger_next_tasks(&self, machine: &mut WorkflowStateMachine, tasks: Vec) -> Result<()> { for task_id in tasks { machine.update_task_status(&task_id, TaskStatus::Scheduled); - self.broadcast_task_state(machine.request_id, &task_id, TaskType::Analysis, TaskStatus::Scheduled, None).await?; + self.broadcast_task_state(machine.request_id, &task_id, TaskType::Analysis, TaskStatus::Scheduled, None, None).await?; if task_id == "analysis:report" { let cmd = GenerateReportCommand { @@ -456,13 +456,14 @@ impl WorkflowEngine { Ok(()) } - async fn broadcast_task_state(&self, request_id: Uuid, task_id: &str, ttype: TaskType, status: TaskStatus, msg: Option) -> Result<()> { + async fn broadcast_task_state(&self, request_id: Uuid, task_id: &str, ttype: TaskType, status: TaskStatus, msg: Option, progress: Option) -> Result<()> { let event = WorkflowEvent::TaskStateChanged { task_id: task_id.to_string(), task_type: ttype, status, message: msg, timestamp: chrono::Utc::now().timestamp_millis(), + progress, // Passed through }; self.broker.publish_event(request_id, event).await } diff --git a/services/yfinance-provider-service/src/message_consumer.rs b/services/yfinance-provider-service/src/message_consumer.rs index efed13d..4350015 100644 --- a/services/yfinance-provider-service/src/message_consumer.rs +++ b/services/yfinance-provider-service/src/message_consumer.rs @@ -36,7 +36,7 @@ pub async fn run(state: AppState) -> Result<()> { { error!("Error handling fetch command: {:?}", e); if let Some(mut task) = state_clone.tasks.get_mut(&request_id) { - task.status = common_contracts::observability::TaskStatus::Failed; + task.status = common_contracts::observability::ObservabilityTaskStatus::Failed; task.details = format!("Worker failed: {}", e); } } diff --git a/services/yfinance-provider-service/src/worker.rs b/services/yfinance-provider-service/src/worker.rs index c2f8cf0..242c259 100644 --- a/services/yfinance-provider-service/src/worker.rs +++ b/services/yfinance-provider-service/src/worker.rs @@ -1,6 +1,6 @@ use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto}; use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}; -use common_contracts::observability::{TaskProgress, TaskStatus}; +use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus}; use common_contracts::persistence_client::PersistenceClient; use crate::error::{Result, AppError}; use crate::state::AppState; @@ -34,14 +34,14 @@ pub async fn handle_fetch_command( // Update task status if let Some(mut task) = state.tasks.get_mut(&command.request_id) { - task.status = TaskStatus::Failed; + task.status = ObservabilityTaskStatus::Failed; task.details = format!("Failed: {}", e); } else { // If task doesn't exist (e.g. failed at insert), create a failed task let task = TaskProgress { request_id: command.request_id, task_name: format!("yfinance:{}", command.symbol), - status: TaskStatus::Failed, + status: ObservabilityTaskStatus::Failed, progress_percent: 0, details: format!("Failed: {}", e), started_at: Utc::now(), @@ -78,7 +78,7 @@ async fn handle_fetch_command_inner( state.tasks.insert(task_id, common_contracts::observability::TaskProgress { request_id: task_id, task_name: format!("yfinance:{}", command.symbol), - status: TaskStatus::InProgress, + status: ObservabilityTaskStatus::InProgress, progress_percent: 5, details: "Checking cache...".to_string(), started_at: chrono::Utc::now(), @@ -95,7 +95,7 @@ async fn handle_fetch_command_inner( .map_err(|e| AppError::Internal(format!("Failed to deserialize cache: {}", e)))?; if let Some(mut task) = state.tasks.get_mut(&task_id) { - task.status = TaskStatus::InProgress; + task.status = ObservabilityTaskStatus::InProgress; task.progress_percent = 50; task.details = "Data retrieved from cache".to_string(); } @@ -104,7 +104,7 @@ async fn handle_fetch_command_inner( None => { info!("Cache MISS for {}", cache_key); if let Some(mut task) = state.tasks.get_mut(&task_id) { - task.status = TaskStatus::InProgress; + task.status = ObservabilityTaskStatus::InProgress; task.progress_percent = 20; task.details = "Fetching data from YFinance".to_string(); } @@ -192,7 +192,7 @@ async fn handle_fetch_command_inner( .await?; if let Some(mut task) = state.tasks.get_mut(&task_id) { - task.status = TaskStatus::Completed; + task.status = ObservabilityTaskStatus::Completed; task.progress_percent = 100; task.details = "Workflow finished successfully".to_string(); } @@ -256,6 +256,6 @@ mod integration_tests { assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); let task = state.tasks.get(&request_id).expect("Task should exist"); - assert_eq!(task.status, TaskStatus::Completed); + assert_eq!(task.status, ObservabilityTaskStatus::Completed); } }