Fundamental_Analysis/services/common-contracts/src/messages.rs
Lv, Qi 0cb31e363e Refactor E2E tests and improve error handling in Orchestrator
- Fix `simple_test_analysis` template in E2E test setup to align with Orchestrator's data fetch logic.
- Implement and verify additional E2E scenarios:
    - Scenario C: Partial Provider Failure (verified error propagation fix in Orchestrator).
    - Scenario D: Invalid Symbol input.
    - Scenario E: Analysis Module failure.
- Update `WorkflowStateMachine::handle_report_failed` to scope error broadcasting to the specific failed task, instead of either failing effectively silently or broadcasting too broadly.
- Update testing strategy documentation to reflect completed Phase 4 testing.
- Skip Scenario B (Orchestrator Restart) as persistence is not yet implemented (decision made to defer persistence).
2025-11-21 20:44:32 +08:00

240 lines
6.9 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use serde::{Serialize, Deserialize};
use uuid::Uuid;
use crate::symbol_utils::CanonicalSymbol;
use crate::subjects::{NatsSubject, SubjectMessage};
use std::collections::HashMap;
// --- Commands ---
// Topic: workflow.commands.start
/// Request to kick off a new workflow run.
///
/// * Published by: `api-gateway`
/// * Consumed by: `workflow-orchestrator`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StartWorkflowCommand {
    /// Identifier of the request this command belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol the workflow is started for.
    pub symbol: CanonicalSymbol,
    /// Market the symbol belongs to, as a free-form string.
    pub market: String,
    /// Identifier of the template to use for this workflow.
    pub template_id: String,
}
impl SubjectMessage for StartWorkflowCommand {
    /// Maps this command to `NatsSubject::WorkflowCommandStart`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::WorkflowCommandStart
    }
}
// Topic: workflow.commands.sync_state
/// Request for a state snapshot so a client can re-align with the workflow.
///
/// * Published by: `api-gateway` (on client connect/reconnect)
/// * Consumed by: `workflow-orchestrator`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncStateCommand {
    /// Identifier of the request whose state should be synchronised.
    pub request_id: Uuid,
}
impl SubjectMessage for SyncStateCommand {
    /// Maps this command to `NatsSubject::WorkflowCommandSyncState`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::WorkflowCommandSyncState
    }
}
/// Request to start fetching company data.
///
/// * Published by: `workflow-orchestrator` (previously `api-gateway`)
/// * Consumed by: `*-provider-services`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FetchCompanyDataCommand {
    /// Identifier of the request this command belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol to fetch data for.
    pub symbol: CanonicalSymbol,
    /// Market the symbol belongs to, as a free-form string.
    pub market: String,
    /// Optional trigger for analysis.
    pub template_id: Option<String>,
}
impl SubjectMessage for FetchCompanyDataCommand {
    /// Maps this command to `NatsSubject::DataFetchCommands`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::DataFetchCommands
    }
}
/// Request to start a full report generation workflow.
///
/// * Published by: `workflow-orchestrator` (previously `api-gateway`)
/// * Consumed by: `report-generator-service`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GenerateReportCommand {
    /// Identifier of the request this command belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol the report is generated for.
    pub symbol: CanonicalSymbol,
    /// Identifier of the template to generate the report from.
    pub template_id: String,
}
impl SubjectMessage for GenerateReportCommand {
    /// Maps this command to `NatsSubject::AnalysisCommandGenerateReport`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::AnalysisCommandGenerateReport
    }
}
// --- Events ---
// Topic: events.workflow.{request_id}
/// Unified event stream for frontend consumption.
///
/// Serialized by serde in adjacently tagged form: the variant name goes under
/// the `"type"` key and the variant's fields under the `"payload"` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
pub enum WorkflowEvent {
    /// 1. Workflow initialization (carries the complete task dependency graph).
    WorkflowStarted {
        timestamp: i64,
        /// Defines every task and its dependencies; the frontend can use it
        /// to draw a flow chart or a progress bar.
        task_graph: WorkflowDag,
    },

    /// 2. Task state change (the core event).
    TaskStateChanged {
        /// e.g., "fetch:tushare", "process:clean_financials", "module:swot_analysis"
        task_id: String,
        /// DataFetch | DataProcessing | Analysis
        task_type: TaskType,
        /// Pending, Scheduled, Running, Completed, Failed, Skipped
        status: TaskStatus,
        message: Option<String>,
        timestamp: i64,
    },

    /// 3. Streaming task output (used for the LLM "typewriter" effect).
    TaskStreamUpdate {
        task_id: String,
        content_delta: String,
        index: u32,
    },

    /// 4. The workflow as a whole has finished successfully.
    WorkflowCompleted {
        result_summary: serde_json::Value,
        end_timestamp: i64,
    },

    /// 4 (failure case). The workflow as a whole has ended with a failure.
    WorkflowFailed {
        reason: String,
        is_fatal: bool,
        end_timestamp: i64,
    },

    /// 5. State snapshot (for reconnect / lost-message recovery).
    ///
    /// The Orchestrator sends this event when the frontend reconnects or
    /// explicitly sends a `SyncStateCommand`.
    WorkflowStateSnapshot {
        timestamp: i64,
        task_graph: WorkflowDag,
        /// Latest status of every task, keyed by a `String`.
        tasks_status: HashMap<String, TaskStatus>,
        /// (Optional) key output summary of completed tasks.
        tasks_output: HashMap<String, Option<String>>,
    },
}
/// Task graph of a workflow: a list of task nodes plus the dependency edges
/// between them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkflowDag {
    /// The tasks that make up the graph.
    pub nodes: Vec<TaskNode>,
    /// Dependency edges, each pointing from -> to.
    pub edges: Vec<TaskDependency>,
}
/// A single directed edge (`from` -> `to`) of a [`WorkflowDag`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskDependency {
    /// Source end of the edge.
    pub from: String,
    /// Target end of the edge.
    pub to: String,
}
/// A single node of a [`WorkflowDag`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskNode {
    /// Identifier of the task.
    pub id: String,
    /// Name of the task.
    pub name: String,
    /// Category of the task; a raw identifier because `type` is a Rust keyword.
    pub r#type: TaskType,
    /// Status recorded for the task on this node.
    pub initial_status: TaskStatus,
}
/// Category of a workflow task.
///
/// Being a fieldless enum, equality is total, so `Eq` and `Hash` are derived
/// alongside `PartialEq`; this lets the type be used as a `HashMap`/`HashSet`
/// key and inside other types that derive `Eq`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum TaskType {
    /// Creates the raw context.
    DataFetch,
    /// Consumes and transforms context (new).
    DataProcessing,
    /// Reads context to generate new content.
    Analysis,
}
/// Lifecycle state of a workflow task.
///
/// Being a fieldless enum, equality is total, so `Eq` and `Hash` are derived
/// alongside `PartialEq`; this lets the type be used as a `HashMap`/`HashSet`
/// key and inside other types that derive `Eq`.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum TaskStatus {
    /// Waiting on dependencies.
    Pending,
    /// Dependencies satisfied; already dispatched to a worker.
    Scheduled,
    /// A worker is executing the task.
    Running,
    /// Execution succeeded.
    Completed,
    /// Execution failed.
    Failed,
    /// Skipped because of an upstream failure or for policy reasons.
    Skipped,
}
/// Event payload carrying a request id and a symbol.
///
/// NOTE(review): presumably emitted once a company profile has been persisted
/// (inferred from the type name); no `SubjectMessage` impl for it is visible
/// in this section of the file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompanyProfilePersistedEvent {
    /// Identifier of the request this event belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol the event refers to.
    pub symbol: CanonicalSymbol,
}
/// Event payload associated with `NatsSubject::DataFinancialsPersisted`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FinancialsPersistedEvent {
    /// Identifier of the request this event belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol the event refers to.
    pub symbol: CanonicalSymbol,
    /// Years that were updated.
    pub years_updated: Vec<u16>,
    /// Pass-through for the analysis trigger.
    pub template_id: Option<String>,
    /// Identity fix: provider ID.
    ///
    /// The original note calls this mandatory, but on the wire it is optional:
    /// the field is an `Option` and `#[serde(default)]` yields `None` when the
    /// key is missing from the payload.
    #[serde(default)]
    pub provider_id: Option<String>,
    /// Output pass-through: optional data preview/summary. Defaults to `None`
    /// when missing from the payload.
    #[serde(default)]
    pub data_summary: Option<String>,
}
impl SubjectMessage for FinancialsPersistedEvent {
    /// Maps this event to `NatsSubject::DataFinancialsPersisted`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::DataFinancialsPersisted
    }
}
/// Event payload associated with `NatsSubject::DataFetchFailed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataFetchFailedEvent {
    /// Identifier of the request this event belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol the event refers to.
    pub symbol: CanonicalSymbol,
    /// Error description as a free-form string.
    pub error: String,
    /// Identity fix: provider ID.
    ///
    /// The original note calls this mandatory, but on the wire it is optional:
    /// the field is an `Option` and `#[serde(default)]` yields `None` when the
    /// key is missing from the payload.
    #[serde(default)]
    pub provider_id: Option<String>,
}
impl SubjectMessage for DataFetchFailedEvent {
    /// Maps this event to `NatsSubject::DataFetchFailed`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::DataFetchFailed
    }
}
// Topic: events.analysis.report_generated
/// Emitted once a report generation task (or one of its sub-modules) has
/// completed.
///
/// * Consumed by: `workflow-orchestrator`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReportGeneratedEvent {
    /// Identifier of the request this event belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol the report is about.
    pub symbol: CanonicalSymbol,
    /// Which part of the analysis finished.
    pub module_id: String,
    /// Optional short preview.
    pub content_snapshot: Option<String>,
    /// Optional model identifier.
    pub model_id: Option<String>,
}
impl SubjectMessage for ReportGeneratedEvent {
    /// Maps this event to `NatsSubject::AnalysisReportGenerated`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::AnalysisReportGenerated
    }
}
// Topic: events.analysis.report_failed
/// Event payload associated with `NatsSubject::AnalysisReportFailed`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReportFailedEvent {
    /// Identifier of the request this event belongs to.
    pub request_id: Uuid,
    /// Canonical form of the symbol the report is about.
    pub symbol: CanonicalSymbol,
    /// Identifier of the module this event refers to.
    pub module_id: String,
    /// Error description as a free-form string.
    pub error: String,
}
impl SubjectMessage for ReportFailedEvent {
    /// Maps this event to `NatsSubject::AnalysisReportFailed`.
    fn subject(&self) -> NatsSubject {
        NatsSubject::AnalysisReportFailed
    }
}