Fundamental_Analysis/services/workflow-orchestrator-service/tests/rehydration_test.rs
Lv, Qi e9e4d0c1b3 chore: massive update covering recent refactoring and bug fixes
- fix: infinite message loop in workflow orchestrator
- feat: restore realtime LLM streaming from report generator to frontend
- refactor: major update to provider services (generic workers, workflow adapters)
- refactor: common contracts and message definitions updated
- feat: enhanced logging and observability in orchestrator
- docs: update project management tasks and status
- chore: dependency updates and config adjustments
2025-11-30 19:17:02 +08:00

134 lines
5.4 KiB
Rust

use anyhow::Result;
use common_contracts::messages::{SyncStateCommand, TaskType, WorkflowEvent};
use common_contracts::workflow_types::TaskStatus;
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use uuid::Uuid;
use workflow_orchestrator_service::dag_scheduler::DagScheduler;
use workflow_orchestrator_service::logging::LogBufferManager;
use workflow_orchestrator_service::state::AppState;
use workflow_orchestrator_service::workflow::WorkflowEngine;
use workflow_orchestrator_service::config::AppConfig;
use futures::stream::StreamExt;
// Note: This test requires a running NATS server.
// Set NATS_URL environment variable if needed, otherwise defaults to localhost:4222
/// Integration test: verifies that a client reconnecting mid-workflow
/// (e.g. after a browser page refresh) can rehydrate the full task state —
/// accumulated logs and concatenated stream content — by sending a
/// `SyncStateCommand`, which the engine answers with a
/// `WorkflowEvent::WorkflowStateSnapshot` published on the workflow's
/// NATS progress subject.
///
/// Requires a reachable NATS server (`NATS_URL`, default
/// `nats://localhost:4222`); the test is skipped (returns `Ok`) when no
/// broker is available so the suite stays green on machines without NATS.
#[tokio::test]
async fn test_workflow_rehydration_flow() -> Result<()> {
    // 1. Setup NATS. A single `match` both detects the unavailable-broker
    //    case (skip gracefully) and binds the client on success, instead of
    //    the redundant `is_err()` check followed by `?`.
    let nats_url =
        std::env::var("NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".to_string());
    let nats_client = match async_nats::connect(&nats_url).await {
        Ok(client) => client,
        Err(_) => {
            println!("Skipping test: NATS not available at {}", nats_url);
            return Ok(());
        }
    };

    // 2. Setup AppState with mocked dependencies. The persistence URL is a
    //    dummy endpoint; nothing in this test path should reach it.
    let config = AppConfig {
        nats_addr: nats_url.clone(),
        data_persistence_service_url: "http://localhost:3001".to_string(), // Mock URL
        workflow_data_path: "/tmp/workflow_data".to_string(),
        server_port: 0, // no HTTP listener needed for this test
    };
    let (log_tx, _) = tokio::sync::broadcast::channel(100);
    let log_manager = Arc::new(LogBufferManager::new("/tmp/workflow_logs"));
    let state = Arc::new(AppState::new(config, log_manager.clone(), log_tx).await?);
    let engine = WorkflowEngine::new(state.clone(), nats_client.clone());

    // 3. Construct a fake in-memory workflow containing a single analysis
    //    task, marked Running so the engine captures stream/log updates.
    let req_id = Uuid::new_v4();
    let task_id = "task:fake_analysis".to_string();
    let mut dag = DagScheduler::new(req_id, "init_commit".to_string());
    dag.add_node(
        task_id.clone(),
        Some("Fake Analysis".to_string()),
        TaskType::Analysis,
        "fake.routing".to_string(),
        json!({"some": "config"}),
    );
    dag.update_status(&task_id, TaskStatus::Running);
    state.workflows.insert(req_id, Arc::new(Mutex::new(dag)));

    // 4. Subscribe to workflow events, simulating a connected frontend.
    let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
    let mut sub = nats_client.subscribe(subject.clone()).await?;

    // 5. Simulate receiving stream data & logs. In production the
    //    MessageConsumer invokes these handlers on incoming NATS messages;
    //    calling the engine directly exercises the same DAG-update path.
    let log_msg = "[INFO] Starting deep analysis...".to_string();
    let content_part1 = "Analysis Part 1...".to_string();
    let content_part2 = "Analysis Part 2 [Done]".to_string();
    engine.handle_task_log(task_id.clone(), log_msg.clone(), req_id).await?;
    engine
        .handle_task_stream_update(task_id.clone(), content_part1.clone(), req_id)
        .await?;
    engine
        .handle_task_stream_update(task_id.clone(), content_part2.clone(), req_id)
        .await?;

    println!("State injected. Now simulating Page Refresh (SyncState)...");

    // 6. Simulate a page refresh: the frontend sends SyncStateCommand.
    //    We call the handler directly to test the logic, but verify its
    //    OUTPUT via the NATS subscription — what a real client observes.
    let sync_cmd = SyncStateCommand { request_id: req_id };
    engine.handle_sync_state(sync_cmd).await?;

    // 7. Verify a snapshot arrives on NATS. Other event types may
    //    interleave on the same subject, so loop until the snapshot shows
    //    up or the overall 2-second deadline (pinned once, outside the
    //    loop, so it bounds total wall time) expires.
    let mut snapshot_received = false;
    let timeout = tokio::time::sleep(Duration::from_secs(2));
    tokio::pin!(timeout);
    loop {
        tokio::select! {
            Some(msg) = sub.next() => {
                if let Ok(event) = serde_json::from_slice::<WorkflowEvent>(&msg.payload) {
                    match event {
                        WorkflowEvent::WorkflowStateSnapshot { task_states, .. } => {
                            println!("Received Snapshot!");
                            let ts = task_states
                                .get(&task_id)
                                .expect("Task state not found in snapshot");
                            // Logs must be replayed in full...
                            assert!(ts.logs.contains(&log_msg), "Snapshot missing logs");
                            // ...and stream chunks concatenated in arrival order.
                            let full_content = format!("{}{}", content_part1, content_part2);
                            assert_eq!(
                                ts.content.as_ref().unwrap(),
                                &full_content,
                                "Snapshot content mismatch"
                            );
                            println!("Snapshot verification passed!");
                            snapshot_received = true;
                            break;
                        }
                        other => println!("Ignored other event: {:?}", other),
                    }
                }
            }
            _ = &mut timeout => break, // deadline reached without a snapshot
        }
    }
    assert!(snapshot_received, "Did not receive WorkflowStateSnapshot within timeout");
    Ok(())
}