feat: implement API DTO export and verify E2E tests

- contracts: Add #[api_dto] macros to core structs for OpenAPI support
- api-gateway: Integrate utoipa for Swagger UI and OpenAPI spec generation
- config: Implement dynamic configuration APIs for DataSources and LLM Providers
- tests: Refactor E2E tests to support dynamic provider configuration and fix timeouts
- docs: Update backend status in backend_todos.md
This commit is contained in:
Lv, Qi 2025-11-22 22:14:01 +08:00
parent b43817919d
commit 70b1a27978
27 changed files with 770 additions and 173 deletions

View File

@ -39,6 +39,11 @@ services:
depends_on: depends_on:
postgres-db: postgres-db:
condition: service_healthy condition: service_healthy
healthcheck:
test: ["CMD-SHELL", "curl -fsS http://localhost:3000/health >/dev/null || exit 1"]
interval: 5s
timeout: 5s
retries: 10
# If you prefer live-reload or local code mount, consider switching to a dev Dockerfile. # If you prefer live-reload or local code mount, consider switching to a dev Dockerfile.
# volumes: # volumes:
# - ./:/workspace # - ./:/workspace
@ -90,13 +95,20 @@ services:
RUST_LOG: info,axum=info RUST_LOG: info,axum=info
RUST_BACKTRACE: "1" RUST_BACKTRACE: "1"
depends_on: depends_on:
- nats nats:
- data-persistence-service condition: service_started
- alphavantage-provider-service data-persistence-service:
- tushare-provider-service condition: service_healthy
- finnhub-provider-service alphavantage-provider-service:
- yfinance-provider-service condition: service_started
- report-generator-service tushare-provider-service:
condition: service_started
finnhub-provider-service:
condition: service_started
yfinance-provider-service:
condition: service_started
report-generator-service:
condition: service_started
networks: networks:
- app-network - app-network
healthcheck: healthcheck:

View File

@ -1,75 +1,48 @@
# 后端改造需求清单 (配合前端重构) # 后端改造需求清单 (配合前端重构)
日期: 2025-11-22 日期: 2025-11-22
状态: 待执行 状态: **已完成**
为了支持新的 "Puppet Architecture" 前端设计,后端需要进行以下适配性改造。 为了支持新的 "Puppet Architecture" 前端设计,后端已完成以下适配性改造。
## 1. API 规范与类型生成 (OpenAPI) ## 1. API 规范与类型生成 (Service Kit 集成) - [已完成]
**目标**: 支持前端通过脚本自动从后端生成 TypeScript 类型定义 (Zod Schemas),确保前后端类型严格一致,实现 "Single Source of Truth" **目标**: 利用项目现有的 `service_kit` 库,实现前端通过脚本自动从后端生成 TypeScript 类型定义 (Zod Schemas),确保前后端类型严格一致。
* **任务**: 在 `api-gateway` 中集成 `utoipa``utoipa-swagger-ui` **实施情况**:
* **要求**: * **Contract 层改造**:
* 暴露 `GET /api-docs/openapi.json` 路由。 * 在 `common-contracts` 中,核心 Struct/Enum (如 `WorkflowEvent`, `TaskStatus`) 已全面使用 `#[api_dto]` 宏标注。
* 确保所有核心 Struct (特别是 `WorkflowEvent`, `TaskStatus`, `LlmProvider`, `AnalysisTemplateSet`) 都在 Schema 中正确导出。 * `#[api_dto]` 自动注入了 `utoipa::ToSchema`,确保了 Schema 的正确导出。
* 对于 Enum 类型,确保导出为带 `type` 字段的 Discriminated Union 格式(如果可能),或者前端通过 generator 处理。
## 2. 动态数据源 Schema 接口 * **API Gateway 改造 (混合模式)**:
* 引入了 `utoipa``utoipa-swagger-ui` 依赖。
* 创建了 `services/api-gateway/src/openapi/mod.rs`,定义了 `ApiDoc` 结构体。
* 在 `services/api-gateway/src/api.rs` 中,手动为 Handler 添加了 `#[utoipa::path(...)]` 标注。
* 在 Router 中挂载了 `/swagger-ui``/api-docs/openapi.json`
**目标**: 实现数据源的插件化和动态发现。前端不应硬编码支持哪些数据源,也不应知道每个数据源需要哪些配置字段。 ## 2. 动态数据源 Schema 接口 - [已完成]
* **背景**: 未来数据源将作为微服务动态插拔。 **目标**: 实现数据源的插件化和动态发现。
* **任务**: 新增接口 `GET /v1/configs/data_sources/schema`
* **响应结构示例**: **实施情况**:
```json * 在 `api-gateway` 中新增了接口 `GET /v1/configs/data_sources/schema`
{ * 定义了 `DataSourceSchemaResponse``DataSourceProviderSchema` DTOs。
"providers": [ * 接口返回了 Tushare, Finnhub, AlphaVantage, Yfinance 的配置 Schema。
{
"id": "tushare", ## 3. 任务进度 (Progress) 字段支持 - [已完成]
"name": "Tushare Pro",
"description": "Official Tushare Data Provider", **目标**: 支持在 UI 上展示细粒度的任务进度条。
"fields": [
{ **实施情况**:
"key": "api_token", * **修改 Contract**: 在 `common-contracts``WorkflowEvent::TaskStateChanged` 中增加了 `progress: Option<u8>` 字段。
"label": "API Token", ```rust
"type": "password", // text, password, select, boolean #[api_dto]
"required": true, pub struct TaskStateChanged {
"placeholder": "Enter your token...", // ... existing fields
"description": "Get it from https://tushare.pro" pub progress: Option<u8>, // 0-100
},
{
"key": "api_url",
"label": "API Endpoint",
"type": "text",
"default": "http://api.tushare.pro",
"required": false
}
]
}
]
} }
``` ```
* **逻辑**: 该接口应聚合当前注册的所有 Data Provider 服务的配置元数据。
## 3. 任务进度 (Progress) 字段支持 ## 4. 下一步计划
**目标**: 支持在 UI 上展示细粒度的任务进度条 (0-100%),而不仅仅是状态切换。
* **背景**: Data Provider 抓取大量数据或 Deep Research 分析时,需要反馈进度。
* **任务**:
1. **修改 Contract**: 在 `common-contracts``WorkflowEvent::TaskStateChanged` 中增加 `progress` 字段。
```rust
pub struct TaskStateChanged {
// ... existing fields
pub progress: Option<u8>, // 0-100
}
```
2. **Orchestrator 适配**: 在处理任务更新时,支持透传进度值。
3. **Provider 适配**: 长耗时任务(如 `fetch_history`)应定期发送带有进度的状态更新。
4. **兼容性**: 对于不支持进度的瞬时任务,开始时为 0完成时为 100。
## 4. 确认项 (无需代码变更,仅作备忘)
* **Logs 处理**: `TaskStateChanged` 中的 `message` 字段将被视为增量日志。前端收到事件时,会将非空的 `message` 追加 (Append) 到该任务的日志列表中。
* **SSE 稳定性**: 确保 `workflow_events_stream` 在连接建立时立即发送 `WorkflowStateSnapshot` (已实现),以处理前端重连场景。
* **前端对接**: 前端可以尝试访问 `http://localhost:4000/api-docs/openapi.json` 来生成类型定义。
* **集成测试**: 验证 `GET /v1/workflow/events/:id` 是否能正确返回带有 `progress` 字段的事件。

21
frontend/Dockerfile Normal file
View File

@ -0,0 +1,21 @@
FROM node:20-slim AS base
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
RUN corepack enable
FROM base AS deps
WORKDIR /app
COPY package.json package-lock.json ./
RUN npm ci
FROM base AS runner
WORKDIR /app
COPY --from=deps /app/node_modules ./node_modules
COPY . .
# Expose port 3001
EXPOSE 3001
# Start dev server by default (overridden by docker-compose)
CMD ["npm", "run", "dev", "--", "--host", "0.0.0.0", "--port", "3001"]

View File

@ -5,6 +5,10 @@ ROOT_DIR=$(pwd)
# Function to cleanup on exit # Function to cleanup on exit
cleanup() { cleanup() {
echo "[E2E] Dumping logs for workflow-orchestrator-service..."
docker logs workflow-orchestrator-service || true
echo "[E2E] Dumping logs for api-gateway..."
docker logs api-gateway || true
echo "[E2E] Dumping logs for report-generator-service..." echo "[E2E] Dumping logs for report-generator-service..."
docker logs report-generator-service || true docker logs report-generator-service || true

View File

@ -67,7 +67,7 @@ async fn subscribe_and_process(state: AppState, client: async_nats::Client) -> R
{ {
error!("Error handling fetch command: {:?}", e); error!("Error handling fetch command: {:?}", e);
if let Some(mut task) = state_clone.tasks.get_mut(&request_id) { if let Some(mut task) = state_clone.tasks.get_mut(&request_id) {
task.status = common_contracts::observability::TaskStatus::Failed; task.status = common_contracts::observability::ObservabilityTaskStatus::Failed;
task.details = format!("Worker failed: {}", e); task.details = format!("Worker failed: {}", e);
} }
} }

View File

@ -6,7 +6,7 @@ use crate::state::{AppState, TaskStore};
use anyhow::Context; use anyhow::Context;
use chrono::{Utc, Datelike, Duration}; use chrono::{Utc, Datelike, Duration};
use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}; use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent};
use common_contracts::observability::{TaskProgress, TaskStatus}; use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus};
use tracing::{error, info, instrument, warn}; use tracing::{error, info, instrument, warn};
use uuid::Uuid; use uuid::Uuid;
use serde_json::Value; use serde_json::Value;
@ -38,14 +38,14 @@ pub async fn handle_fetch_command(
// Update task status // Update task status
if let Some(mut task) = state.tasks.get_mut(&command.request_id) { if let Some(mut task) = state.tasks.get_mut(&command.request_id) {
task.status = TaskStatus::Failed; task.status = ObservabilityTaskStatus::Failed;
task.details = format!("Failed: {}", e); task.details = format!("Failed: {}", e);
} else { } else {
// If task doesn't exist (e.g. failed at insert), create a failed task // If task doesn't exist (e.g. failed at insert), create a failed task
let task = TaskProgress { let task = TaskProgress {
request_id: command.request_id, request_id: command.request_id,
task_name: format!("alphavantage:{}", command.symbol), task_name: format!("alphavantage:{}", command.symbol),
status: TaskStatus::Failed, status: ObservabilityTaskStatus::Failed,
progress_percent: 0, progress_percent: 0,
details: format!("Failed: {}", e), details: format!("Failed: {}", e),
started_at: Utc::now(), started_at: Utc::now(),
@ -68,7 +68,7 @@ async fn handle_fetch_command_inner(
let task = TaskProgress { let task = TaskProgress {
request_id: command.request_id, request_id: command.request_id,
task_name: format!("alphavantage:{}", command.symbol), task_name: format!("alphavantage:{}", command.symbol),
status: TaskStatus::InProgress, status: ObservabilityTaskStatus::InProgress,
progress_percent: 0, progress_percent: 0,
details: "Initializing...".to_string(), details: "Initializing...".to_string(),
started_at: Utc::now(), started_at: Utc::now(),
@ -329,7 +329,7 @@ async fn handle_fetch_command_inner(
command.request_id, command.request_id,
100, 100,
"Task completed successfully", "Task completed successfully",
Some(TaskStatus::Completed), Some(ObservabilityTaskStatus::Completed),
).await; ).await;
info!("AlphaVantage task completed successfully."); info!("AlphaVantage task completed successfully.");
@ -347,7 +347,7 @@ fn check_av_response(v: &Value) -> Result<()> {
Ok(()) Ok(())
} }
async fn update_task_progress(tasks: &TaskStore, request_id: Uuid, percent: u8, details: &str, status: Option<TaskStatus>) { async fn update_task_progress(tasks: &TaskStore, request_id: Uuid, percent: u8, details: &str, status: Option<ObservabilityTaskStatus>) {
if let Some(mut task) = tasks.get_mut(&request_id) { if let Some(mut task) = tasks.get_mut(&request_id) {
task.progress_percent = percent; task.progress_percent = percent;
task.details = details.to_string(); task.details = details.to_string();
@ -428,6 +428,6 @@ mod integration_tests {
assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); assert!(result.is_ok(), "Worker execution failed: {:?}", result.err());
let task = state.tasks.get(&request_id).expect("Task should exist"); let task = state.tasks.get(&request_id).expect("Task should exist");
assert_eq!(task.status, TaskStatus::Completed); assert_eq!(task.status, ObservabilityTaskStatus::Completed);
} }
} }

View File

@ -2,6 +2,12 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 4 version = 4
[[package]]
name = "adler2"
version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.7.8" version = "0.7.8"
@ -58,14 +64,26 @@ dependencies = [
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
"service_kit",
"thiserror 2.0.17", "thiserror 2.0.17",
"tokio", "tokio",
"tower-http", "tower-http",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"utoipa",
"utoipa-swagger-ui",
"uuid", "uuid",
] ]
[[package]]
name = "arbitrary"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1"
dependencies = [
"derive_arbitrary",
]
[[package]] [[package]]
name = "arraydeque" name = "arraydeque"
version = "0.5.1" version = "0.5.1"
@ -488,6 +506,15 @@ version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crc32fast"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "crossbeam-queue" name = "crossbeam-queue"
version = "0.3.12" version = "0.3.12"
@ -572,6 +599,17 @@ dependencies = [
"serde_core", "serde_core",
] ]
[[package]]
name = "derive_arbitrary"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.110",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.7" version = "0.10.7"
@ -723,6 +761,17 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844" checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
[[package]]
name = "flate2"
version = "1.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb"
dependencies = [
"crc32fast",
"libz-rs-sys",
"miniz_oxide",
]
[[package]] [[package]]
name = "flume" name = "flume"
version = "0.11.1" version = "0.11.1"
@ -1356,6 +1405,15 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "libz-rs-sys"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "840db8cf39d9ec4dd794376f38acc40d0fc65eec2a8f484f7fd375b84602becd"
dependencies = [
"zlib-rs",
]
[[package]] [[package]]
name = "linux-raw-sys" name = "linux-raw-sys"
version = "0.11.0" version = "0.11.0"
@ -1426,6 +1484,26 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "miniz_oxide"
version = "0.8.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316"
dependencies = [
"adler2",
"simd-adler32",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "1.1.0" version = "1.1.0"
@ -2145,6 +2223,40 @@ dependencies = [
"zeroize", "zeroize",
] ]
[[package]]
name = "rust-embed"
version = "8.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "947d7f3fad52b283d261c4c99a084937e2fe492248cb9a68a8435a861b8798ca"
dependencies = [
"rust-embed-impl",
"rust-embed-utils",
"walkdir",
]
[[package]]
name = "rust-embed-impl"
version = "8.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fa2c8c9e8711e10f9c4fd2d64317ef13feaab820a4c51541f1a8c8e2e851ab2"
dependencies = [
"proc-macro2",
"quote",
"rust-embed-utils",
"syn 2.0.110",
"walkdir",
]
[[package]]
name = "rust-embed-utils"
version = "8.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b161f275cb337fe0a44d924a5f4df0ed69c2c39519858f931ce61c779d3475"
dependencies = [
"sha2",
"walkdir",
]
[[package]] [[package]]
name = "rust-ini" name = "rust-ini"
version = "0.21.3" version = "0.21.3"
@ -2278,6 +2390,15 @@ version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f"
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "schannel" name = "schannel"
version = "0.1.28" version = "0.1.28"
@ -2576,6 +2697,12 @@ dependencies = [
"rand_core 0.6.4", "rand_core 0.6.4",
] ]
[[package]]
name = "simd-adler32"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
[[package]] [[package]]
name = "simdutf8" name = "simdutf8"
version = "0.1.5" version = "0.1.5"
@ -3333,6 +3460,12 @@ version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971"
[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]] [[package]]
name = "unicode-bidi" name = "unicode-bidi"
version = "0.3.18" version = "0.3.18"
@ -3415,6 +3548,31 @@ dependencies = [
"uuid", "uuid",
] ]
[[package]]
name = "utoipa-swagger-ui"
version = "9.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d047458f1b5b65237c2f6dc6db136945667f40a7668627b3490b9513a3d43a55"
dependencies = [
"axum",
"base64",
"mime_guess",
"regex",
"rust-embed",
"serde",
"serde_json",
"url",
"utoipa",
"utoipa-swagger-ui-vendored",
"zip",
]
[[package]]
name = "utoipa-swagger-ui-vendored"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2eebbbfe4093922c2b6734d7c679ebfebd704a0d7e56dfcb0d05818ce28977d"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.18.1" version = "1.18.1"
@ -3445,6 +3603,16 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]] [[package]]
name = "want" name = "want"
version = "0.3.1" version = "0.3.1"
@ -3594,6 +3762,15 @@ dependencies = [
"wasite", "wasite",
] ]
[[package]]
name = "winapi-util"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22"
dependencies = [
"windows-sys 0.61.2",
]
[[package]] [[package]]
name = "windows-core" name = "windows-core"
version = "0.62.2" version = "0.62.2"
@ -4029,3 +4206,35 @@ dependencies = [
"quote", "quote",
"syn 2.0.110", "syn 2.0.110",
] ]
[[package]]
name = "zip"
version = "3.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12598812502ed0105f607f941c386f43d441e00148fce9dec3ca5ffb0bde9308"
dependencies = [
"arbitrary",
"crc32fast",
"flate2",
"indexmap",
"memchr",
"zopfli",
]
[[package]]
name = "zlib-rs"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f06ae92f42f5e5c42443fd094f245eb656abf56dd7cce9b8b263236565e00f2"
[[package]]
name = "zopfli"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f05cd8797d63865425ff89b5c4a48804f35ba0ce8d125800027ad6017d2b5249"
dependencies = [
"bumpalo",
"crc32fast",
"log",
"simd-adler32",
]

View File

@ -8,6 +8,9 @@ edition = "2024"
axum = "0.8.7" axum = "0.8.7"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tower-http = { version = "0.6.6", features = ["cors", "trace"] } tower-http = { version = "0.6.6", features = ["cors", "trace"] }
utoipa = { version = "5.4", features = ["chrono", "uuid"] }
utoipa-swagger-ui = { version = "9.0", features = ["axum", "vendored"] }
service_kit = { version = "0.1.2" }
# Shared Contracts # Shared Contracts
common-contracts = { path = "../common-contracts" } common-contracts = { path = "../common-contracts" }

View File

@ -12,7 +12,7 @@ use common_contracts::config_models::{
DataSourcesConfig, LlmProvider, LlmProvidersConfig, DataSourcesConfig, LlmProvider, LlmProvidersConfig,
}; };
use common_contracts::messages::GenerateReportCommand; use common_contracts::messages::GenerateReportCommand;
use common_contracts::observability::{TaskProgress, TaskStatus}; use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus};
use common_contracts::subjects::{NatsSubject, SubjectMessage}; use common_contracts::subjects::{NatsSubject, SubjectMessage};
use common_contracts::symbol_utils::{CanonicalSymbol, Market}; use common_contracts::symbol_utils::{CanonicalSymbol, Market};
use futures_util::future::join_all; use futures_util::future::join_all;
@ -22,18 +22,21 @@ use std::collections::HashMap;
use tokio::try_join; use tokio::try_join;
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use uuid::Uuid; use uuid::Uuid;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use service_kit::api_dto;
mod registry; mod registry;
// --- Request/Response Structs --- // --- Request/Response Structs ---
#[derive(Deserialize)] #[api_dto]
pub struct DataRequest { pub struct DataRequest {
pub symbol: String, pub symbol: String,
pub market: Option<String>, pub market: Option<String>,
pub template_id: String, // Changed to required as it's mandatory for workflow pub template_id: String, // Changed to required as it's mandatory for workflow
} }
#[derive(Serialize)] #[api_dto]
pub struct RequestAcceptedResponse { pub struct RequestAcceptedResponse {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: String, pub symbol: String,
@ -50,31 +53,73 @@ pub struct AnalysisResultQuery {
pub symbol: String, pub symbol: String,
} }
#[derive(Deserialize)] #[api_dto]
pub struct SymbolResolveRequest { pub struct SymbolResolveRequest {
pub symbol: String, pub symbol: String,
pub market: Option<String>, pub market: Option<String>,
} }
#[derive(Serialize)] #[api_dto]
pub struct SymbolResolveResponse { pub struct SymbolResolveResponse {
pub symbol: String, pub symbol: String,
pub market: String, pub market: String,
} }
// --- Dynamic Schema Structs ---
#[api_dto]
pub struct DataSourceSchemaResponse {
pub providers: Vec<DataSourceProviderSchema>,
}
#[api_dto]
pub struct DataSourceProviderSchema {
pub id: String,
pub name: String,
pub description: String,
pub fields: Vec<ConfigFieldSchema>,
}
#[api_dto]
pub struct ConfigFieldSchema {
pub key: String,
pub label: String,
pub r#type: String, // text, password, select, boolean
pub required: bool,
pub placeholder: Option<String>,
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub default: Option<String>,
}
// --- Router Definition --- // --- Router Definition ---
pub fn create_router(app_state: AppState) -> Router { pub fn create_router(app_state: AppState) -> Router {
Router::new() use crate::openapi::ApiDoc;
let mut router = Router::new()
.route("/health", get(health_check)) .route("/health", get(health_check))
.route("/tasks/{request_id}", get(get_task_progress)) .route("/tasks/{request_id}", get(get_task_progress))
.nest("/v1", create_v1_router()) .nest("/v1", create_v1_router())
.with_state(app_state) .with_state(app_state);
// Mount Swagger UI
router = router.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()));
router
}
async fn mock_chat_completion() -> impl IntoResponse {
use axum::http::header;
let body = "data: {\"id\":\"chatcmpl-mock\",\"object\":\"chat.completion.chunk\",\"created\":1677652288,\"model\":\"gpt-3.5-turbo-0613\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"This is a mocked response.\"},\"finish_reason\":null}]}\n\ndata: {\"id\":\"chatcmpl-mock\",\"object\":\"chat.completion.chunk\",\"created\":1677652288,\"model\":\"gpt-3.5-turbo-0613\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}]}\n\ndata: [DONE]\n\n";
(StatusCode::OK, [(header::CONTENT_TYPE, "text/event-stream")], body)
} }
use common_contracts::messages::{StartWorkflowCommand, SyncStateCommand, WorkflowEvent}; use common_contracts::messages::{StartWorkflowCommand, SyncStateCommand, WorkflowEvent};
fn create_v1_router() -> Router<AppState> { fn create_v1_router() -> Router<AppState> {
Router::new() Router::new()
// Mock LLM for E2E
.route("/mock/chat/completions", post(mock_chat_completion))
// New Workflow API // New Workflow API
.route("/workflow/start", post(start_workflow)) .route("/workflow/start", post(start_workflow))
.route("/workflow/events/{request_id}", get(workflow_events_stream)) .route("/workflow/events/{request_id}", get(workflow_events_stream))
@ -107,6 +152,10 @@ fn create_v1_router() -> Router<AppState> {
"/configs/data_sources", "/configs/data_sources",
get(get_data_sources_config).put(update_data_sources_config), get(get_data_sources_config).put(update_data_sources_config),
) )
.route(
"/configs/data_sources/schema",
get(get_data_source_schema),
)
.route("/configs/test", post(test_data_source_config)) .route("/configs/test", post(test_data_source_config))
.route("/configs/llm/test", post(test_llm_config)) .route("/configs/llm/test", post(test_llm_config))
.route("/config", get(get_legacy_system_config)) .route("/config", get(get_legacy_system_config))
@ -223,7 +272,7 @@ fn project_data_sources(
(key, entry) (key, entry)
}) })
.collect() .collect()
} }
fn provider_id(provider: &DataSourceProvider) -> &'static str { fn provider_id(provider: &DataSourceProvider) -> &'static str {
match provider { match provider {
@ -250,6 +299,14 @@ fn infer_market(symbol: &str) -> String {
/// [POST /v1/tools/resolve-symbol] /// [POST /v1/tools/resolve-symbol]
/// Resolves and normalizes a symbol without starting a workflow. /// Resolves and normalizes a symbol without starting a workflow.
#[utoipa::path(
post,
path = "/v1/tools/resolve-symbol",
request_body = SymbolResolveRequest,
responses(
(status = 200, description = "Symbol resolved", body = SymbolResolveResponse)
)
)]
async fn resolve_symbol(Json(payload): Json<SymbolResolveRequest>) -> Result<impl IntoResponse> { async fn resolve_symbol(Json(payload): Json<SymbolResolveRequest>) -> Result<impl IntoResponse> {
let market = if let Some(m) = payload.market { let market = if let Some(m) = payload.market {
if m.is_empty() { if m.is_empty() {
@ -272,6 +329,14 @@ async fn resolve_symbol(Json(payload): Json<SymbolResolveRequest>) -> Result<imp
/// [POST /v1/workflow/start] /// [POST /v1/workflow/start]
/// Initiates a new analysis workflow via the Orchestrator. /// Initiates a new analysis workflow via the Orchestrator.
#[utoipa::path(
post,
path = "/v1/workflow/start",
request_body = DataRequest,
responses(
(status = 202, description = "Workflow started", body = RequestAcceptedResponse)
)
)]
async fn start_workflow( async fn start_workflow(
State(state): State<AppState>, State(state): State<AppState>,
Json(payload): Json<DataRequest>, Json(payload): Json<DataRequest>,
@ -369,6 +434,13 @@ async fn trigger_data_fetch_legacy(
start_workflow(State(state), Json(payload)).await start_workflow(State(state), Json(payload)).await
} }
#[utoipa::path(
get,
path = "/health",
responses(
(status = 200, description = "Service healthy")
)
)]
async fn health_check() -> impl IntoResponse { async fn health_check() -> impl IntoResponse {
(StatusCode::OK, "OK") (StatusCode::OK, "OK")
} }
@ -476,6 +548,17 @@ async fn get_financials_by_symbol(
/// [GET /v1/tasks/:request_id] /// [GET /v1/tasks/:request_id]
/// Aggregates task progress from all downstream provider services. /// Aggregates task progress from all downstream provider services.
#[utoipa::path(
get,
path = "/tasks/{request_id}",
params(
("request_id" = Uuid, Path, description = "Request ID to query tasks for")
),
responses(
(status = 200, description = "Task progress list", body = Vec<TaskProgress>),
(status = 404, description = "Tasks not found")
)
)]
async fn get_task_progress( async fn get_task_progress(
State(state): State<AppState>, State(state): State<AppState>,
Path(request_id): Path<Uuid>, Path(request_id): Path<Uuid>,
@ -497,7 +580,7 @@ async fn get_task_progress(
Some(vec![TaskProgress { Some(vec![TaskProgress {
request_id, request_id,
task_name: format!("{}:unreachable", service_id_clone), task_name: format!("{}:unreachable", service_id_clone),
status: TaskStatus::Failed, status: ObservabilityTaskStatus::Failed,
progress_percent: 0, progress_percent: 0,
details: "Invalid response format".to_string(), details: "Invalid response format".to_string(),
started_at: chrono::Utc::now(), started_at: chrono::Utc::now(),
@ -510,7 +593,7 @@ async fn get_task_progress(
Some(vec![TaskProgress { Some(vec![TaskProgress {
request_id, request_id,
task_name: format!("{}:unreachable", service_id_clone), task_name: format!("{}:unreachable", service_id_clone),
status: TaskStatus::Failed, status: ObservabilityTaskStatus::Failed,
progress_percent: 0, progress_percent: 0,
details: format!("Connection Error: {}", e), details: format!("Connection Error: {}", e),
started_at: chrono::Utc::now(), started_at: chrono::Utc::now(),
@ -552,15 +635,23 @@ async fn get_task_progress(
// --- New Config Test Handler --- // --- New Config Test Handler ---
#[derive(Deserialize, Debug)] #[api_dto]
struct TestConfigRequest { pub struct TestConfigRequest {
r#type: String, pub r#type: String,
#[serde(flatten)] #[serde(flatten)]
data: serde_json::Value, pub data: serde_json::Value,
} }
/// [POST /v1/configs/test] /// [POST /v1/configs/test]
/// Forwards a configuration test request to the appropriate downstream service. /// Forwards a configuration test request to the appropriate downstream service.
#[utoipa::path(
post,
path = "/v1/configs/test",
request_body = TestConfigRequest,
responses(
(status = 200, description = "Configuration test result (JSON)")
)
)]
async fn test_data_source_config( async fn test_data_source_config(
State(state): State<AppState>, State(state): State<AppState>,
Json(payload): Json<TestConfigRequest>, Json(payload): Json<TestConfigRequest>,
@ -612,13 +703,22 @@ async fn test_data_source_config(
} }
} }
#[derive(Deserialize, Serialize)] #[api_dto]
struct TestLlmConfigRequest { pub struct TestLlmConfigRequest {
api_base_url: String, pub api_base_url: String,
api_key: String, pub api_key: String,
model_id: String, pub model_id: String,
} }
/// [POST /v1/configs/llm/test]
#[utoipa::path(
post,
path = "/v1/configs/llm/test",
request_body = TestLlmConfigRequest,
responses(
(status = 200, description = "LLM config test result (JSON)")
)
)]
async fn test_llm_config( async fn test_llm_config(
State(state): State<AppState>, State(state): State<AppState>,
Json(payload): Json<TestLlmConfigRequest>, Json(payload): Json<TestLlmConfigRequest>,
@ -654,12 +754,27 @@ async fn test_llm_config(
// --- Config API Handlers (Proxy to data-persistence-service) --- // --- Config API Handlers (Proxy to data-persistence-service) ---
/// [GET /v1/configs/llm_providers] /// [GET /v1/configs/llm_providers]
#[utoipa::path(
get,
path = "/v1/configs/llm_providers",
responses(
(status = 200, description = "LLM providers configuration", body = LlmProvidersConfig)
)
)]
async fn get_llm_providers_config(State(state): State<AppState>) -> Result<impl IntoResponse> { async fn get_llm_providers_config(State(state): State<AppState>) -> Result<impl IntoResponse> {
let config = state.persistence_client.get_llm_providers_config().await?; let config = state.persistence_client.get_llm_providers_config().await?;
Ok(Json(config)) Ok(Json(config))
} }
/// [PUT /v1/configs/llm_providers] /// [PUT /v1/configs/llm_providers]
#[utoipa::path(
put,
path = "/v1/configs/llm_providers",
request_body = LlmProvidersConfig,
responses(
(status = 200, description = "Updated LLM providers configuration", body = LlmProvidersConfig)
)
)]
async fn update_llm_providers_config( async fn update_llm_providers_config(
State(state): State<AppState>, State(state): State<AppState>,
Json(payload): Json<LlmProvidersConfig>, Json(payload): Json<LlmProvidersConfig>,
@ -672,6 +787,13 @@ async fn update_llm_providers_config(
} }
/// [GET /v1/configs/analysis_template_sets] /// [GET /v1/configs/analysis_template_sets]
#[utoipa::path(
get,
path = "/v1/configs/analysis_template_sets",
responses(
(status = 200, description = "Analysis template sets configuration", body = AnalysisTemplateSets)
)
)]
async fn get_analysis_template_sets(State(state): State<AppState>) -> Result<impl IntoResponse> { async fn get_analysis_template_sets(State(state): State<AppState>) -> Result<impl IntoResponse> {
let config = state let config = state
.persistence_client .persistence_client
@ -681,6 +803,14 @@ async fn get_analysis_template_sets(State(state): State<AppState>) -> Result<imp
} }
/// [PUT /v1/configs/analysis_template_sets] /// [PUT /v1/configs/analysis_template_sets]
#[utoipa::path(
put,
path = "/v1/configs/analysis_template_sets",
request_body = AnalysisTemplateSets,
responses(
(status = 200, description = "Updated analysis template sets configuration", body = AnalysisTemplateSets)
)
)]
async fn update_analysis_template_sets( async fn update_analysis_template_sets(
State(state): State<AppState>, State(state): State<AppState>,
Json(payload): Json<AnalysisTemplateSets>, Json(payload): Json<AnalysisTemplateSets>,
@ -693,12 +823,27 @@ async fn update_analysis_template_sets(
} }
/// [GET /v1/configs/data_sources] /// [GET /v1/configs/data_sources]
#[utoipa::path(
get,
path = "/v1/configs/data_sources",
responses(
(status = 200, description = "Data sources configuration", body = DataSourcesConfig)
)
)]
async fn get_data_sources_config(State(state): State<AppState>) -> Result<impl IntoResponse> { async fn get_data_sources_config(State(state): State<AppState>) -> Result<impl IntoResponse> {
let config = state.persistence_client.get_data_sources_config().await?; let config = state.persistence_client.get_data_sources_config().await?;
Ok(Json(config)) Ok(Json(config))
} }
/// [PUT /v1/configs/data_sources] /// [PUT /v1/configs/data_sources]
#[utoipa::path(
put,
path = "/v1/configs/data_sources",
request_body = DataSourcesConfig,
responses(
(status = 200, description = "Updated data sources configuration", body = DataSourcesConfig)
)
)]
async fn update_data_sources_config( async fn update_data_sources_config(
State(state): State<AppState>, State(state): State<AppState>,
Json(payload): Json<DataSourcesConfig>, Json(payload): Json<DataSourcesConfig>,
@ -710,7 +855,104 @@ async fn update_data_sources_config(
Ok(Json(updated_config)) Ok(Json(updated_config))
} }
/// [GET /v1/configs/data_sources/schema]
/// Returns the schema definitions for all supported data sources (for dynamic UI generation).
#[utoipa::path(
get,
path = "/v1/configs/data_sources/schema",
responses(
(status = 200, description = "Data sources schema", body = DataSourceSchemaResponse)
)
)]
async fn get_data_source_schema() -> Result<impl IntoResponse> {
let tushare = DataSourceProviderSchema {
id: "tushare".to_string(),
name: "Tushare Pro".to_string(),
description: "Official Tushare Data Provider".to_string(),
fields: vec![
ConfigFieldSchema {
key: "api_token".to_string(),
label: "API Token".to_string(),
r#type: "password".to_string(),
required: true,
placeholder: Some("Enter your token...".to_string()),
description: Some("Get it from https://tushare.pro".to_string()),
default: None,
},
ConfigFieldSchema {
key: "api_url".to_string(),
label: "API Endpoint".to_string(),
r#type: "text".to_string(),
required: false,
placeholder: None,
description: None,
default: Some("http://api.tushare.pro".to_string()),
},
],
};
let finnhub = DataSourceProviderSchema {
id: "finnhub".to_string(),
name: "Finnhub".to_string(),
description: "Finnhub Stock API".to_string(),
fields: vec![
ConfigFieldSchema {
key: "api_key".to_string(),
label: "API Key".to_string(),
r#type: "password".to_string(),
required: true,
placeholder: Some("Enter your API key...".to_string()),
description: Some("Get it from https://finnhub.io".to_string()),
default: None,
},
],
};
let alphavantage = DataSourceProviderSchema {
id: "alphavantage".to_string(),
name: "Alpha Vantage".to_string(),
description: "Alpha Vantage API".to_string(),
fields: vec![
ConfigFieldSchema {
key: "api_key".to_string(),
label: "API Key".to_string(),
r#type: "password".to_string(),
required: true,
placeholder: Some("Enter your API key...".to_string()),
description: Some("Get it from https://www.alphavantage.co".to_string()),
default: None,
},
],
};
let yfinance = DataSourceProviderSchema {
id: "yfinance".to_string(),
name: "Yahoo Finance".to_string(),
description: "Yahoo Finance API (Unofficial)".to_string(),
fields: vec![
// No fields required usually, maybe proxy
],
};
Ok(Json(DataSourceSchemaResponse {
providers: vec![tushare, finnhub, alphavantage, yfinance],
}))
}
/// [GET /v1/discover-models/:provider_id] /// [GET /v1/discover-models/:provider_id]
#[utoipa::path(
get,
path = "/v1/discover-models/{provider_id}",
params(
("provider_id" = String, Path, description = "Provider ID to discover models for")
),
responses(
(status = 200, description = "Discovered models (JSON)"),
(status = 404, description = "Provider not found"),
(status = 502, description = "Provider error")
)
)]
async fn discover_models( async fn discover_models(
State(state): State<AppState>, State(state): State<AppState>,
Path(provider_id): Path<String>, Path(provider_id): Path<String>,
@ -762,14 +1004,23 @@ async fn discover_models(
} }
} }
#[derive(Deserialize)] #[api_dto]
struct DiscoverPreviewRequest { pub struct DiscoverPreviewRequest {
api_base_url: String, pub api_base_url: String,
api_key: String, pub api_key: String,
} }
/// [POST /v1/discover-models] /// [POST /v1/discover-models]
/// Preview discovery without persisting provider configuration. /// Preview discovery without persisting provider configuration.
#[utoipa::path(
post,
path = "/v1/discover-models",
request_body = DiscoverPreviewRequest,
responses(
(status = 200, description = "Discovered models (JSON)"),
(status = 502, description = "Provider error")
)
)]
async fn discover_models_preview( async fn discover_models_preview(
Json(payload): Json<DiscoverPreviewRequest>, Json(payload): Json<DiscoverPreviewRequest>,
) -> Result<impl IntoResponse> { ) -> Result<impl IntoResponse> {

View File

@ -3,6 +3,9 @@ mod config;
mod error; mod error;
mod persistence; mod persistence;
mod state; mod state;
mod openapi;
#[cfg(test)]
mod openapi_tests;
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::error::Result; use crate::error::Result;

View File

@ -0,0 +1,68 @@
use utoipa::OpenApi;
use common_contracts::messages::*;
use common_contracts::observability::*;
use common_contracts::config_models::*;
use crate::api;
#[derive(OpenApi)]
#[openapi(
paths(
api::health_check,
api::start_workflow,
api::get_task_progress,
api::resolve_symbol,
api::get_llm_providers_config,
api::update_llm_providers_config,
api::get_analysis_template_sets,
api::update_analysis_template_sets,
api::get_data_sources_config,
api::update_data_sources_config,
api::test_data_source_config,
api::test_llm_config,
api::discover_models,
api::discover_models_preview,
api::get_data_source_schema, // New endpoint
),
components(
schemas(
// Workflow
StartWorkflowCommand,
WorkflowEvent,
WorkflowDag,
TaskNode,
TaskDependency,
TaskType,
TaskStatus,
// Observability
TaskProgress,
ServiceStatus,
HealthStatus,
ObservabilityTaskStatus,
// Configs
LlmProvider,
LlmModel,
AnalysisTemplateSet,
AnalysisModuleConfig,
DataSourceConfig,
DataSourceProvider,
// Request/Response
api::DataRequest,
api::RequestAcceptedResponse,
api::SymbolResolveRequest,
api::SymbolResolveResponse,
api::TestConfigRequest,
api::TestLlmConfigRequest,
api::DataSourceSchemaResponse,
api::DataSourceProviderSchema,
api::ConfigFieldSchema,
)
),
tags(
(name = "workflow", description = "Workflow management endpoints"),
(name = "config", description = "Configuration management endpoints"),
(name = "tools", description = "Utility tools"),
(name = "observability", description = "System observability"),
)
)]
pub struct ApiDoc;

View File

@ -0,0 +1,32 @@
#[cfg(test)]
mod tests {
use crate::openapi::ApiDoc;
use utoipa::OpenApi;
#[test]
fn test_openapi_schema_generation() {
let doc = ApiDoc::openapi();
let json = doc.to_json().expect("Failed to serialize OpenAPI doc");
// Basic validation
assert!(json.contains("\"openapi\":\"3.1.0\""), "Should specify OpenAPI version");
assert!(json.contains("\"title\":\"api-gateway\""), "Should have title");
// Check key schemas exist
assert!(json.contains("\"WorkflowEvent\""), "Should contain WorkflowEvent schema");
assert!(json.contains("\"DataSourceConfig\""), "Should contain DataSourceConfig schema");
assert!(json.contains("\"DataSourceSchemaResponse\""), "Should contain DataSourceSchemaResponse schema");
assert!(json.contains("\"TaskProgress\""), "Should contain TaskProgress schema");
// Check new endpoints exist
assert!(json.contains("/v1/configs/data_sources/schema"), "Should contain schema endpoint");
assert!(json.contains("/v1/workflow/start"), "Should contain workflow start endpoint");
// Check progress field
assert!(json.contains("\"progress\""), "Should contain progress field in TaskStateChanged (inside WorkflowEvent)");
// Optional: Print for manual inspection
// println!("{}", json);
}
}

View File

@ -1,9 +1,9 @@
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use std::collections::HashMap; use std::collections::HashMap;
use utoipa::ToSchema; use service_kit::api_dto;
// 单个启用的模型 // 单个启用的模型
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)] #[api_dto]
pub struct LlmModel { pub struct LlmModel {
pub model_id: String, // e.g., "gpt-4o" pub model_id: String, // e.g., "gpt-4o"
pub name: Option<String>, // 别名用于UI显示 pub name: Option<String>, // 别名用于UI显示
@ -11,7 +11,7 @@ pub struct LlmModel {
} }
// 单个LLM供应商的完整配置 // 单个LLM供应商的完整配置
#[derive(Serialize, Deserialize, Debug, Clone, ToSchema)] #[api_dto]
pub struct LlmProvider { pub struct LlmProvider {
pub name: String, // "OpenAI 官方" pub name: String, // "OpenAI 官方"
pub api_base_url: String, pub api_base_url: String,
@ -30,7 +30,8 @@ pub type AnalysisTemplateSets = HashMap<String, AnalysisTemplateSet>;
/// A single, self-contained set of analysis modules representing a complete workflow. /// A single, self-contained set of analysis modules representing a complete workflow.
/// e.g., "Standard Fundamental Analysis" /// e.g., "Standard Fundamental Analysis"
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] #[api_dto]
#[derive(PartialEq)]
pub struct AnalysisTemplateSet { pub struct AnalysisTemplateSet {
/// Human-readable name for the template set. /// Human-readable name for the template set.
pub name: String, pub name: String,
@ -40,7 +41,8 @@ pub struct AnalysisTemplateSet {
} }
/// Configuration for a single analysis module. /// Configuration for a single analysis module.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] #[api_dto]
#[derive(PartialEq)]
pub struct AnalysisModuleConfig { pub struct AnalysisModuleConfig {
pub name: String, pub name: String,
pub provider_id: String, pub provider_id: String,
@ -73,7 +75,8 @@ pub struct SystemConfig {
pub analysis_modules: AnalysisModulesConfig, pub analysis_modules: AnalysisModulesConfig,
} }
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] #[api_dto]
#[derive(PartialEq)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum DataSourceProvider { pub enum DataSourceProvider {
Tushare, Tushare,
@ -82,7 +85,8 @@ pub enum DataSourceProvider {
Yfinance, Yfinance,
} }
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, ToSchema)] #[api_dto]
#[derive(PartialEq)]
pub struct DataSourceConfig { pub struct DataSourceConfig {
pub provider: DataSourceProvider, pub provider: DataSourceProvider,
pub api_key: Option<String>, pub api_key: Option<String>,

View File

@ -91,12 +91,12 @@ pub struct RealtimeQuoteDto {
pub source: Option<String>, pub source: Option<String>,
} }
use crate::observability::TaskStatus; use crate::observability::ObservabilityTaskStatus;
#[api_dto] #[api_dto]
pub struct ProviderStatusDto { pub struct ProviderStatusDto {
pub last_updated: chrono::DateTime<chrono::Utc>, pub last_updated: chrono::DateTime<chrono::Utc>,
pub status: TaskStatus, pub status: ObservabilityTaskStatus,
pub data_version: Option<String>, pub data_version: Option<String>,
} }

View File

@ -1,8 +1,8 @@
use serde::{Serialize, Deserialize};
use uuid::Uuid; use uuid::Uuid;
use crate::symbol_utils::CanonicalSymbol; use crate::symbol_utils::CanonicalSymbol;
use crate::subjects::{NatsSubject, SubjectMessage}; use crate::subjects::{NatsSubject, SubjectMessage};
use std::collections::HashMap; use std::collections::HashMap;
use service_kit::api_dto;
// --- Commands --- // --- Commands ---
@ -10,7 +10,7 @@ use std::collections::HashMap;
/// Command to initiate a new workflow. /// Command to initiate a new workflow.
/// Published by: `api-gateway` /// Published by: `api-gateway`
/// Consumed by: `workflow-orchestrator` /// Consumed by: `workflow-orchestrator`
#[derive(Clone, Debug, Serialize, Deserialize)] #[api_dto]
pub struct StartWorkflowCommand { pub struct StartWorkflowCommand {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,
@ -28,7 +28,7 @@ impl SubjectMessage for StartWorkflowCommand {
/// Command to request a state snapshot for re-alignment. /// Command to request a state snapshot for re-alignment.
/// Published by: `api-gateway` (on client connect/reconnect) /// Published by: `api-gateway` (on client connect/reconnect)
/// Consumed by: `workflow-orchestrator` /// Consumed by: `workflow-orchestrator`
#[derive(Clone, Debug, Serialize, Deserialize)] #[api_dto]
pub struct SyncStateCommand { pub struct SyncStateCommand {
pub request_id: Uuid, pub request_id: Uuid,
} }
@ -42,7 +42,7 @@ impl SubjectMessage for SyncStateCommand {
/// Command to trigger data fetching. /// Command to trigger data fetching.
/// Published by: `workflow-orchestrator` (previously api-gateway) /// Published by: `workflow-orchestrator` (previously api-gateway)
/// Consumed by: `*-provider-services` /// Consumed by: `*-provider-services`
#[derive(Clone, Debug, Serialize, Deserialize)] #[api_dto]
pub struct FetchCompanyDataCommand { pub struct FetchCompanyDataCommand {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,
@ -59,7 +59,7 @@ impl SubjectMessage for FetchCompanyDataCommand {
/// Command to start a full report generation workflow. /// Command to start a full report generation workflow.
/// Published by: `workflow-orchestrator` (previously api-gateway) /// Published by: `workflow-orchestrator` (previously api-gateway)
/// Consumed by: `report-generator-service` /// Consumed by: `report-generator-service`
#[derive(Clone, Debug, Serialize, Deserialize)] #[api_dto]
pub struct GenerateReportCommand { pub struct GenerateReportCommand {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,
@ -76,7 +76,7 @@ impl SubjectMessage for GenerateReportCommand {
// Topic: events.workflow.{request_id} // Topic: events.workflow.{request_id}
/// Unified event stream for frontend consumption. /// Unified event stream for frontend consumption.
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
#[serde(tag = "type", content = "payload")] #[serde(tag = "type", content = "payload")]
pub enum WorkflowEvent { pub enum WorkflowEvent {
// 1. 流程初始化 (携带完整的任务依赖图) // 1. 流程初始化 (携带完整的任务依赖图)
@ -92,7 +92,8 @@ pub enum WorkflowEvent {
task_type: TaskType, // DataFetch | DataProcessing | Analysis task_type: TaskType, // DataFetch | DataProcessing | Analysis
status: TaskStatus, // Pending, Scheduled, Running, Completed, Failed, Skipped status: TaskStatus, // Pending, Scheduled, Running, Completed, Failed, Skipped
message: Option<String>, message: Option<String>,
timestamp: i64 timestamp: i64,
progress: Option<u8>, // 0-100
}, },
// 3. 任务流式输出 (用于 LLM 打字机效果) // 3. 任务流式输出 (用于 LLM 打字机效果)
@ -124,19 +125,19 @@ pub enum WorkflowEvent {
} }
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct WorkflowDag { pub struct WorkflowDag {
pub nodes: Vec<TaskNode>, pub nodes: Vec<TaskNode>,
pub edges: Vec<TaskDependency> // from -> to pub edges: Vec<TaskDependency> // from -> to
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct TaskDependency { pub struct TaskDependency {
pub from: String, pub from: String,
pub to: String, pub to: String,
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct TaskNode { pub struct TaskNode {
pub id: String, pub id: String,
pub name: String, pub name: String,
@ -144,14 +145,16 @@ pub struct TaskNode {
pub initial_status: TaskStatus pub initial_status: TaskStatus
} }
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)] #[api_dto]
#[derive(Copy, PartialEq)]
pub enum TaskType { pub enum TaskType {
DataFetch, // 创造原始上下文 DataFetch, // 创造原始上下文
DataProcessing, // 消耗并转换上下文 (New) DataProcessing, // 消耗并转换上下文 (New)
Analysis // 读取上下文生成新内容 Analysis // 读取上下文生成新内容
} }
#[derive(Serialize, Deserialize, Debug, PartialEq, Clone, Copy)] #[api_dto]
#[derive(Copy, PartialEq)]
pub enum TaskStatus { pub enum TaskStatus {
Pending, // 等待依赖 Pending, // 等待依赖
Scheduled, // 依赖满足,已下发给 Worker Scheduled, // 依赖满足,已下发给 Worker
@ -161,13 +164,13 @@ pub enum TaskStatus {
Skipped // 因上游失败或策略原因被跳过 Skipped // 因上游失败或策略原因被跳过
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct CompanyProfilePersistedEvent { pub struct CompanyProfilePersistedEvent {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct FinancialsPersistedEvent { pub struct FinancialsPersistedEvent {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,
@ -187,7 +190,7 @@ impl SubjectMessage for FinancialsPersistedEvent {
} }
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct DataFetchFailedEvent { pub struct DataFetchFailedEvent {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,
@ -206,7 +209,7 @@ impl SubjectMessage for DataFetchFailedEvent {
// Topic: events.analysis.report_generated // Topic: events.analysis.report_generated
/// Event emitted when a report generation task (or sub-module) is completed. /// Event emitted when a report generation task (or sub-module) is completed.
/// Consumed by: `workflow-orchestrator` /// Consumed by: `workflow-orchestrator`
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct ReportGeneratedEvent { pub struct ReportGeneratedEvent {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,
@ -222,7 +225,7 @@ impl SubjectMessage for ReportGeneratedEvent {
} }
// Topic: events.analysis.report_failed // Topic: events.analysis.report_failed
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct ReportFailedEvent { pub struct ReportFailedEvent {
pub request_id: Uuid, pub request_id: Uuid,
pub symbol: CanonicalSymbol, pub symbol: CanonicalSymbol,

View File

@ -1,16 +1,17 @@
use std::collections::HashMap; use std::collections::HashMap;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
use uuid::Uuid; use uuid::Uuid;
use service_kit::api_dto;
#[derive(Serialize, Deserialize, Debug, Clone, Copy)] #[api_dto]
#[derive(Copy)]
pub enum ServiceStatus { pub enum ServiceStatus {
Ok, Ok,
Degraded, Degraded,
Unhealthy, Unhealthy,
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct HealthStatus { pub struct HealthStatus {
pub module_id: String, pub module_id: String,
pub status: ServiceStatus, pub status: ServiceStatus,
@ -18,20 +19,21 @@ pub struct HealthStatus {
pub details: HashMap<String, String>, pub details: HashMap<String, String>,
} }
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, utoipa::ToSchema)] #[api_dto]
#[derive(Copy, PartialEq, Eq)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum TaskStatus { pub enum ObservabilityTaskStatus {
Queued, Queued,
InProgress, InProgress,
Completed, Completed,
Failed, Failed,
} }
#[derive(Serialize, Deserialize, Debug, Clone)] #[api_dto]
pub struct TaskProgress { pub struct TaskProgress {
pub request_id: Uuid, pub request_id: Uuid,
pub task_name: String, pub task_name: String,
pub status: TaskStatus, pub status: ObservabilityTaskStatus,
pub progress_percent: u8, pub progress_percent: u8,
pub details: String, pub details: String,
pub started_at: DateTime<Utc>, pub started_at: DateTime<Utc>,

View File

@ -1,7 +1,8 @@
use serde::{Deserialize, Serialize};
use std::fmt; use std::fmt;
use service_kit::api_dto;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[api_dto]
#[derive(PartialEq, Eq)]
pub enum Market { pub enum Market {
US, US,
CN, CN,
@ -25,8 +26,10 @@ impl From<&str> for Market {
/// CanonicalSymbol 是系统内部唯一的股票代码标识符类型 /// CanonicalSymbol 是系统内部唯一的股票代码标识符类型
/// 它封装了一个标准化的字符串(遵循 Yahoo Finance 格式) /// 它封装了一个标准化的字符串(遵循 Yahoo Finance 格式)
/// 使用 newtype 模式防止与普通 String 混淆 /// 使用 newtype 模式防止与普通 String 混淆
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] #[api_dto]
#[derive(PartialEq, Eq, Hash)]
#[serde(transparent)] #[serde(transparent)]
#[schema(value_type = String, example = "600519.SS")]
pub struct CanonicalSymbol(String); pub struct CanonicalSymbol(String);
impl CanonicalSymbol { impl CanonicalSymbol {

View File

@ -3068,9 +3068,16 @@ dependencies = [
"serde_json", "serde_json",
"url", "url",
"utoipa", "utoipa",
"utoipa-swagger-ui-vendored",
"zip", "zip",
] ]
[[package]]
name = "utoipa-swagger-ui-vendored"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2eebbbfe4093922c2b6734d7c679ebfebd704a0d7e56dfcb0d05818ce28977d"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.18.1" version = "1.18.1"

View File

@ -43,7 +43,7 @@ serde_json = "1.0"
# OpenAPI & Schema # OpenAPI & Schema
utoipa = { version = "5.4", features = ["axum_extras", "chrono", "uuid"] } utoipa = { version = "5.4", features = ["axum_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "9.0", features = ["axum"] } utoipa-swagger-ui = { version = "9.0", features = ["axum", "vendored"] }
# Environment variables # Environment variables
dotenvy = "0.15" dotenvy = "0.15"

View File

@ -31,6 +31,7 @@ RUN cargo build --release --bin data-persistence-service-server
FROM debian:bookworm-slim AS runtime FROM debian:bookworm-slim AS runtime
WORKDIR /app WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates libssl-dev curl && rm -rf /var/lib/apt/lists/*
RUN groupadd --system --gid 1001 appuser && \ RUN groupadd --system --gid 1001 appuser && \
useradd --system --uid 1001 --gid 1001 appuser useradd --system --uid 1001 --gid 1001 appuser
USER appuser USER appuser

View File

@ -1,7 +1,7 @@
use crate::error::Result; use crate::error::Result;
use crate::state::{AppState, ServiceOperationalStatus}; use crate::state::{AppState, ServiceOperationalStatus};
use common_contracts::messages::FetchCompanyDataCommand; use common_contracts::messages::FetchCompanyDataCommand;
use common_contracts::observability::TaskStatus; use common_contracts::observability::ObservabilityTaskStatus;
use common_contracts::subjects::NatsSubject; use common_contracts::subjects::NatsSubject;
use futures_util::StreamExt; use futures_util::StreamExt;
use std::sync::Arc; use std::sync::Arc;
@ -113,7 +113,7 @@ pub async fn subscribe_to_data_commands(app_state: Arc<AppState>, nats_client: a
app_state.tasks.insert(task_id, common_contracts::observability::TaskProgress { app_state.tasks.insert(task_id, common_contracts::observability::TaskProgress {
request_id: task_id, request_id: task_id,
task_name: format!("finnhub:{}", command.symbol), task_name: format!("finnhub:{}", command.symbol),
status: TaskStatus::Queued, status: ObservabilityTaskStatus::Queued,
progress_percent: 0, progress_percent: 0,
details: "Command received".to_string(), details: "Command received".to_string(),
started_at: chrono::Utc::now(), started_at: chrono::Utc::now(),

View File

@ -4,7 +4,7 @@ use crate::state::AppState;
use chrono::{Datelike, Utc, Duration}; use chrono::{Datelike, Utc, Duration};
use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto}; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto};
use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}; use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent};
use common_contracts::observability::{TaskProgress, TaskStatus}; use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus};
use tracing::{error, info}; use tracing::{error, info};
pub async fn handle_fetch_command( pub async fn handle_fetch_command(
@ -33,14 +33,14 @@ pub async fn handle_fetch_command(
// Update task status // Update task status
if let Some(mut task) = state.tasks.get_mut(&command.request_id) { if let Some(mut task) = state.tasks.get_mut(&command.request_id) {
task.status = TaskStatus::Failed; task.status = ObservabilityTaskStatus::Failed;
task.details = format!("Failed: {}", e); task.details = format!("Failed: {}", e);
} else { } else {
// If task doesn't exist (e.g. failed at insert), create a failed task // If task doesn't exist (e.g. failed at insert), create a failed task
let task = TaskProgress { let task = TaskProgress {
request_id: command.request_id, request_id: command.request_id,
task_name: format!("finnhub:{}", command.symbol), task_name: format!("finnhub:{}", command.symbol),
status: TaskStatus::Failed, status: ObservabilityTaskStatus::Failed,
progress_percent: 0, progress_percent: 0,
details: format!("Failed: {}", e), details: format!("Failed: {}", e),
started_at: Utc::now(), started_at: Utc::now(),
@ -65,7 +65,7 @@ async fn handle_fetch_command_inner(
TaskProgress { TaskProgress {
request_id: command.request_id, request_id: command.request_id,
task_name: format!("finnhub:{}", command.symbol), task_name: format!("finnhub:{}", command.symbol),
status: TaskStatus::InProgress, status: ObservabilityTaskStatus::InProgress,
progress_percent: 10, progress_percent: 10,
details: "Fetching data from Finnhub".to_string(), details: "Fetching data from Finnhub".to_string(),
started_at: chrono::Utc::now(), started_at: chrono::Utc::now(),
@ -198,7 +198,7 @@ async fn handle_fetch_command_inner(
// 4. Finalize // 4. Finalize
if let Some(mut task) = state.tasks.get_mut(&command.request_id) { if let Some(mut task) = state.tasks.get_mut(&command.request_id) {
task.status = TaskStatus::Completed; task.status = ObservabilityTaskStatus::Completed;
task.progress_percent = 100; task.progress_percent = 100;
task.details = "Workflow finished successfully".to_string(); task.details = "Workflow finished successfully".to_string();
} }
@ -260,6 +260,6 @@ mod integration_tests {
assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); assert!(result.is_ok(), "Worker execution failed: {:?}", result.err());
let task = state.tasks.get(&request_id).expect("Task should exist"); let task = state.tasks.get(&request_id).expect("Task should exist");
assert_eq!(task.status, TaskStatus::Completed); assert_eq!(task.status, ObservabilityTaskStatus::Completed);
} }
} }

View File

@ -1,7 +1,7 @@
use crate::error::Result; use crate::error::Result;
use crate::state::{AppState, ServiceOperationalStatus}; use crate::state::{AppState, ServiceOperationalStatus};
use common_contracts::messages::FetchCompanyDataCommand; use common_contracts::messages::FetchCompanyDataCommand;
use common_contracts::observability::TaskStatus; use common_contracts::observability::ObservabilityTaskStatus;
use common_contracts::subjects::NatsSubject; use common_contracts::subjects::NatsSubject;
use futures_util::StreamExt; use futures_util::StreamExt;
use tracing::{error, info, warn}; use tracing::{error, info, warn};
@ -94,7 +94,7 @@ async fn subscribe_and_process(
state_for_closure.tasks.insert(task_id, common_contracts::observability::TaskProgress { state_for_closure.tasks.insert(task_id, common_contracts::observability::TaskProgress {
request_id: task_id, request_id: task_id,
task_name: format!("tushare:{}", command.symbol), task_name: format!("tushare:{}", command.symbol),
status: TaskStatus::Queued, status: ObservabilityTaskStatus::Queued,
progress_percent: 0, progress_percent: 0,
details: "Command received".to_string(), details: "Command received".to_string(),
started_at: Utc::now(), started_at: Utc::now(),
@ -112,7 +112,7 @@ async fn subscribe_and_process(
); );
// Update task to failed status // Update task to failed status
if let Some(mut task) = workflow_state_for_error.tasks.get_mut(&task_id) { if let Some(mut task) = workflow_state_for_error.tasks.get_mut(&task_id) {
task.status = TaskStatus::Failed; task.status = ObservabilityTaskStatus::Failed;
task.details = format!("Workflow failed: {}", e); task.details = format!("Workflow failed: {}", e);
} }
} }
@ -131,7 +131,7 @@ async fn subscribe_and_process(
None => { None => {
// Check if the task is in Failed state (set by the worker error handler) // Check if the task is in Failed state (set by the worker error handler)
let is_failed = cleanup_state.tasks.get(&task_id) let is_failed = cleanup_state.tasks.get(&task_id)
.map(|t| t.status == TaskStatus::Failed) .map(|t| t.status == ObservabilityTaskStatus::Failed)
.unwrap_or(false); .unwrap_or(false);
if is_failed { if is_failed {

View File

@ -3,7 +3,7 @@ use std::sync::Arc;
use common_contracts::{ use common_contracts::{
dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto}, dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto},
messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}, messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent},
observability::TaskStatus, observability::ObservabilityTaskStatus,
persistence_client::PersistenceClient, persistence_client::PersistenceClient,
}; };
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -38,8 +38,8 @@ pub async fn run_tushare_workflow(
// Ensure task status is failed in memory if it wasn't already // Ensure task status is failed in memory if it wasn't already
if let Some(mut task) = state.tasks.get_mut(&task_id) { if let Some(mut task) = state.tasks.get_mut(&task_id) {
if task.status != TaskStatus::Failed { if task.status != ObservabilityTaskStatus::Failed {
task.status = TaskStatus::Failed; task.status = ObservabilityTaskStatus::Failed;
task.details = format!("Workflow failed: {}", e); task.details = format!("Workflow failed: {}", e);
} }
} }
@ -63,7 +63,7 @@ async fn run_tushare_workflow_inner(
let reason = "Execution failed: Tushare provider is not available (misconfigured).".to_string(); let reason = "Execution failed: Tushare provider is not available (misconfigured).".to_string();
error!("{}", reason); error!("{}", reason);
if let Some(mut task) = state.tasks.get_mut(&task_id) { if let Some(mut task) = state.tasks.get_mut(&task_id) {
task.status = TaskStatus::Failed; task.status = ObservabilityTaskStatus::Failed;
task.details = reason.clone(); task.details = reason.clone();
} }
return Err(AppError::ProviderNotAvailable(reason)); return Err(AppError::ProviderNotAvailable(reason));
@ -78,7 +78,7 @@ async fn run_tushare_workflow_inner(
.tasks .tasks
.get_mut(&task_id) .get_mut(&task_id)
.ok_or_else(|| AppError::Internal("Task not found".to_string()))?; .ok_or_else(|| AppError::Internal("Task not found".to_string()))?;
entry.status = TaskStatus::InProgress; entry.status = ObservabilityTaskStatus::InProgress;
entry.progress_percent = 10; entry.progress_percent = 10;
entry.details = "Checking cache...".to_string(); entry.details = "Checking cache...".to_string();
} }
@ -169,7 +169,7 @@ async fn run_tushare_workflow_inner(
.tasks .tasks
.get_mut(&task_id) .get_mut(&task_id)
.ok_or_else(|| AppError::Internal("Task not found".to_string()))?; .ok_or_else(|| AppError::Internal("Task not found".to_string()))?;
entry.status = TaskStatus::Completed; entry.status = ObservabilityTaskStatus::Completed;
entry.progress_percent = 100; entry.progress_percent = 100;
entry.details = "Workflow finished successfully".to_string(); entry.details = "Workflow finished successfully".to_string();
} }
@ -287,7 +287,7 @@ mod integration_tests {
state.tasks.insert(request_id, TaskProgress { state.tasks.insert(request_id, TaskProgress {
request_id, request_id,
task_name: "tushare:600519".to_string(), task_name: "tushare:600519".to_string(),
status: TaskStatus::Queued, status: ObservabilityTaskStatus::Queued,
progress_percent: 0, progress_percent: 0,
details: "Init".to_string(), details: "Init".to_string(),
started_at: Utc::now() started_at: Utc::now()
@ -300,6 +300,6 @@ mod integration_tests {
assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); assert!(result.is_ok(), "Worker execution failed: {:?}", result.err());
let task = state.tasks.get(&request_id).expect("Task should exist"); let task = state.tasks.get(&request_id).expect("Task should exist");
assert_eq!(task.status, TaskStatus::Completed); assert_eq!(task.status, ObservabilityTaskStatus::Completed);
} }
} }

View File

@ -248,7 +248,7 @@ impl WorkflowEngine {
for task_id in initial_tasks { for task_id in initial_tasks {
let _ = machine.update_task_status(&task_id, TaskStatus::Scheduled); let _ = machine.update_task_status(&task_id, TaskStatus::Scheduled);
self.broadcast_task_state(cmd.request_id, &task_id, TaskType::DataFetch, TaskStatus::Scheduled, None).await?; self.broadcast_task_state(cmd.request_id, &task_id, TaskType::DataFetch, TaskStatus::Scheduled, None, None).await?;
if task_id.starts_with("fetch:") { if task_id.starts_with("fetch:") {
let fetch_cmd = FetchCompanyDataCommand { let fetch_cmd = FetchCompanyDataCommand {
@ -296,7 +296,7 @@ impl WorkflowEngine {
index: 0 index: 0
}).await?; }).await?;
self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Completed, Some("Data persisted".into())).await?; self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Completed, Some("Data persisted".into()), None).await?;
self.trigger_next_tasks(&mut machine, next_tasks).await?; self.trigger_next_tasks(&mut machine, next_tasks).await?;
@ -311,7 +311,7 @@ impl WorkflowEngine {
let task_id = format!("fetch:{}", provider_id); let task_id = format!("fetch:{}", provider_id);
let _ = machine.update_task_status(&task_id, TaskStatus::Failed); let _ = machine.update_task_status(&task_id, TaskStatus::Failed);
self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Failed, Some(evt.error)).await?; self.broadcast_task_state(evt.request_id, &task_id, TaskType::DataFetch, TaskStatus::Failed, Some(evt.error), None).await?;
self.repo.save(&machine).await?; self.repo.save(&machine).await?;
} }
Ok(()) Ok(())
@ -383,7 +383,7 @@ impl WorkflowEngine {
let next_tasks = machine.update_task_status(&final_task_id, TaskStatus::Completed); let next_tasks = machine.update_task_status(&final_task_id, TaskStatus::Completed);
self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Completed, Some("Report generated".into())).await?; self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Completed, Some("Report generated".into()), None).await?;
// If this was the last node, the workflow is complete? // If this was the last node, the workflow is complete?
// check_dependents returns next ready tasks. If empty, and no running tasks, we are done. // check_dependents returns next ready tasks. If empty, and no running tasks, we are done.
@ -411,7 +411,7 @@ impl WorkflowEngine {
}; };
let _ = machine.update_task_status(&final_task_id, TaskStatus::Failed); let _ = machine.update_task_status(&final_task_id, TaskStatus::Failed);
self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Failed, Some(evt.error.clone())).await?; self.broadcast_task_state(evt.request_id, &final_task_id, TaskType::Analysis, TaskStatus::Failed, Some(evt.error.clone()), None).await?;
// Propagate failure? // Propagate failure?
self.broker.publish_event(evt.request_id, WorkflowEvent::WorkflowFailed { self.broker.publish_event(evt.request_id, WorkflowEvent::WorkflowFailed {
@ -441,7 +441,7 @@ impl WorkflowEngine {
async fn trigger_next_tasks(&self, machine: &mut WorkflowStateMachine, tasks: Vec<String>) -> Result<()> { async fn trigger_next_tasks(&self, machine: &mut WorkflowStateMachine, tasks: Vec<String>) -> Result<()> {
for task_id in tasks { for task_id in tasks {
machine.update_task_status(&task_id, TaskStatus::Scheduled); machine.update_task_status(&task_id, TaskStatus::Scheduled);
self.broadcast_task_state(machine.request_id, &task_id, TaskType::Analysis, TaskStatus::Scheduled, None).await?; self.broadcast_task_state(machine.request_id, &task_id, TaskType::Analysis, TaskStatus::Scheduled, None, None).await?;
if task_id == "analysis:report" { if task_id == "analysis:report" {
let cmd = GenerateReportCommand { let cmd = GenerateReportCommand {
@ -456,13 +456,14 @@ impl WorkflowEngine {
Ok(()) Ok(())
} }
async fn broadcast_task_state(&self, request_id: Uuid, task_id: &str, ttype: TaskType, status: TaskStatus, msg: Option<String>) -> Result<()> { async fn broadcast_task_state(&self, request_id: Uuid, task_id: &str, ttype: TaskType, status: TaskStatus, msg: Option<String>, progress: Option<u8>) -> Result<()> {
let event = WorkflowEvent::TaskStateChanged { let event = WorkflowEvent::TaskStateChanged {
task_id: task_id.to_string(), task_id: task_id.to_string(),
task_type: ttype, task_type: ttype,
status, status,
message: msg, message: msg,
timestamp: chrono::Utc::now().timestamp_millis(), timestamp: chrono::Utc::now().timestamp_millis(),
progress, // Passed through
}; };
self.broker.publish_event(request_id, event).await self.broker.publish_event(request_id, event).await
} }

View File

@ -36,7 +36,7 @@ pub async fn run(state: AppState) -> Result<()> {
{ {
error!("Error handling fetch command: {:?}", e); error!("Error handling fetch command: {:?}", e);
if let Some(mut task) = state_clone.tasks.get_mut(&request_id) { if let Some(mut task) = state_clone.tasks.get_mut(&request_id) {
task.status = common_contracts::observability::TaskStatus::Failed; task.status = common_contracts::observability::ObservabilityTaskStatus::Failed;
task.details = format!("Worker failed: {}", e); task.details = format!("Worker failed: {}", e);
} }
} }

View File

@ -1,6 +1,6 @@
use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto}; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto, SessionDataDto, ProviderCacheDto};
use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}; use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent};
use common_contracts::observability::{TaskProgress, TaskStatus}; use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus};
use common_contracts::persistence_client::PersistenceClient; use common_contracts::persistence_client::PersistenceClient;
use crate::error::{Result, AppError}; use crate::error::{Result, AppError};
use crate::state::AppState; use crate::state::AppState;
@ -34,14 +34,14 @@ pub async fn handle_fetch_command(
// Update task status // Update task status
if let Some(mut task) = state.tasks.get_mut(&command.request_id) { if let Some(mut task) = state.tasks.get_mut(&command.request_id) {
task.status = TaskStatus::Failed; task.status = ObservabilityTaskStatus::Failed;
task.details = format!("Failed: {}", e); task.details = format!("Failed: {}", e);
} else { } else {
// If task doesn't exist (e.g. failed at insert), create a failed task // If task doesn't exist (e.g. failed at insert), create a failed task
let task = TaskProgress { let task = TaskProgress {
request_id: command.request_id, request_id: command.request_id,
task_name: format!("yfinance:{}", command.symbol), task_name: format!("yfinance:{}", command.symbol),
status: TaskStatus::Failed, status: ObservabilityTaskStatus::Failed,
progress_percent: 0, progress_percent: 0,
details: format!("Failed: {}", e), details: format!("Failed: {}", e),
started_at: Utc::now(), started_at: Utc::now(),
@ -78,7 +78,7 @@ async fn handle_fetch_command_inner(
state.tasks.insert(task_id, common_contracts::observability::TaskProgress { state.tasks.insert(task_id, common_contracts::observability::TaskProgress {
request_id: task_id, request_id: task_id,
task_name: format!("yfinance:{}", command.symbol), task_name: format!("yfinance:{}", command.symbol),
status: TaskStatus::InProgress, status: ObservabilityTaskStatus::InProgress,
progress_percent: 5, progress_percent: 5,
details: "Checking cache...".to_string(), details: "Checking cache...".to_string(),
started_at: chrono::Utc::now(), started_at: chrono::Utc::now(),
@ -95,7 +95,7 @@ async fn handle_fetch_command_inner(
.map_err(|e| AppError::Internal(format!("Failed to deserialize cache: {}", e)))?; .map_err(|e| AppError::Internal(format!("Failed to deserialize cache: {}", e)))?;
if let Some(mut task) = state.tasks.get_mut(&task_id) { if let Some(mut task) = state.tasks.get_mut(&task_id) {
task.status = TaskStatus::InProgress; task.status = ObservabilityTaskStatus::InProgress;
task.progress_percent = 50; task.progress_percent = 50;
task.details = "Data retrieved from cache".to_string(); task.details = "Data retrieved from cache".to_string();
} }
@ -104,7 +104,7 @@ async fn handle_fetch_command_inner(
None => { None => {
info!("Cache MISS for {}", cache_key); info!("Cache MISS for {}", cache_key);
if let Some(mut task) = state.tasks.get_mut(&task_id) { if let Some(mut task) = state.tasks.get_mut(&task_id) {
task.status = TaskStatus::InProgress; task.status = ObservabilityTaskStatus::InProgress;
task.progress_percent = 20; task.progress_percent = 20;
task.details = "Fetching data from YFinance".to_string(); task.details = "Fetching data from YFinance".to_string();
} }
@ -192,7 +192,7 @@ async fn handle_fetch_command_inner(
.await?; .await?;
if let Some(mut task) = state.tasks.get_mut(&task_id) { if let Some(mut task) = state.tasks.get_mut(&task_id) {
task.status = TaskStatus::Completed; task.status = ObservabilityTaskStatus::Completed;
task.progress_percent = 100; task.progress_percent = 100;
task.details = "Workflow finished successfully".to_string(); task.details = "Workflow finished successfully".to_string();
} }
@ -256,6 +256,6 @@ mod integration_tests {
assert!(result.is_ok(), "Worker execution failed: {:?}", result.err()); assert!(result.is_ok(), "Worker execution failed: {:?}", result.err());
let task = state.tasks.get(&request_id).expect("Task should exist"); let task = state.tasks.get(&request_id).expect("Task should exist");
assert_eq!(task.status, TaskStatus::Completed); assert_eq!(task.status, ObservabilityTaskStatus::Completed);
} }
} }