use anyhow::Result; use tempfile::TempDir; use workflow_context::{Vgcs, ContextStore, Transaction}; use common_contracts::messages::TaskType; use common_contracts::workflow_types::TaskStatus; use serde_json::json; use uuid::Uuid; use workflow_orchestrator_service::dag_scheduler::DagScheduler; #[test] fn test_scenario_a_happy_path() -> Result<()> { // Scenario A: Happy Path (A -> B) // 1. Setup let temp_dir = TempDir::new()?; let vgcs = Vgcs::new(temp_dir.path()); let req_id = Uuid::new_v4(); let req_id_str = req_id.to_string(); vgcs.init_repo(&req_id_str)?; // Initial Commit let mut tx = vgcs.begin_transaction(&req_id_str, "")?; let init_commit = Box::new(tx).commit("Init", "system")?; // 2. Build DAG let mut dag = DagScheduler::new(req_id, init_commit.clone()); dag.add_node("A".to_string(), None, TaskType::DataFetch, "key.a".into(), json!({})); dag.add_node("B".to_string(), None, TaskType::Analysis, "key.b".into(), json!({})); dag.add_dependency("A", "B"); // 3. Run Task A // Dispatch A (In real engine: Resolve Context -> Send NATS) let ctx_a = dag.resolve_context("A", &vgcs)?; assert_eq!(ctx_a.base_commit.as_ref().unwrap(), &init_commit); // Execute A (Worker Logic) let mut tx = vgcs.begin_transaction(&req_id_str, &init_commit)?; tx.write("data_a.json", b"{\"val\": 1}")?; let commit_a = Box::new(tx).commit("Task A Result", "worker")?; // Complete A dag.record_result("A", Some(commit_a.clone())); dag.update_status("A", TaskStatus::Completed); // 4. Run Task B // Check Ready let ready = dag.get_ready_downstream_tasks("A"); assert_eq!(ready, vec!["B"]); // Resolve Context B (Should be Commit A) let ctx_b = dag.resolve_context("B", &vgcs)?; assert_eq!(ctx_b.base_commit.as_ref().unwrap(), &commit_a); // Execute B let mut tx = vgcs.begin_transaction(&req_id_str, &commit_a)?; tx.write("report.md", b"# Report")?; let commit_b = Box::new(tx).commit("Task B Result", "worker")?; // Complete B dag.record_result("B", Some(commit_b.clone())); dag.update_status("B", TaskStatus::Completed); // 5. Verify Final State // Orchestrator would snapshot here. We check file existence. let files = vgcs.list_dir(&req_id_str, &commit_b, "")?; let names: Vec = files.iter().map(|f| f.name.clone()).collect(); assert!(names.contains(&"data_a.json".to_string())); assert!(names.contains(&"report.md".to_string())); Ok(()) } #[test] fn test_scenario_c_partial_failure() -> Result<()> { // Scenario C: Parallel Tasks (A, B) -> C. A fails. // 1. Setup let temp_dir = TempDir::new()?; let vgcs = Vgcs::new(temp_dir.path()); let req_id = Uuid::new_v4(); let req_id_str = req_id.to_string(); vgcs.init_repo(&req_id_str)?; let mut tx = vgcs.begin_transaction(&req_id_str, "")?; let init_commit = Box::new(tx).commit("Init", "system")?; // 2. DAG: A, B independent. C depends on BOTH. let mut dag = DagScheduler::new(req_id, init_commit.clone()); dag.add_node("A".to_string(), None, TaskType::DataFetch, "key.a".into(), json!({})); dag.add_node("B".to_string(), None, TaskType::DataFetch, "key.b".into(), json!({})); dag.add_node("C".to_string(), None, TaskType::Analysis, "key.c".into(), json!({})); dag.add_dependency("A", "C"); dag.add_dependency("B", "C"); // 3. Run A -> Failed dag.update_status("A", TaskStatus::Failed); // A produced no commit. // 4. Run B -> Success let mut tx = vgcs.begin_transaction(&req_id_str, &init_commit)?; tx.write("data_b.json", b"{}")?; let commit_b = Box::new(tx).commit("Task B", "worker")?; dag.record_result("B", Some(commit_b.clone())); dag.update_status("B", TaskStatus::Completed); // 5. Check C // C should NOT be ready because A is failed (not Completed). // is_ready checks: reverse_deps.all(|d| status == Completed) // A is Failed. // Triggering readiness check from B completion let ready_from_b = dag.get_ready_downstream_tasks("B"); // Updated logic: Failed dependencies DO allow downstream to proceed (perhaps to handle failure or skip) assert_eq!(ready_from_b, vec!["C"]); // Triggering readiness check from A completion (Failed) // Orchestrator logic for failure usually doesn't trigger downstream positive flow. assert_eq!(dag.nodes["C"].status, TaskStatus::Pending); Ok(()) } #[test] fn test_scenario_e_module_logic_check() -> Result<()> { // Scenario E: Parallel Branch Merge // A -> B // A -> C // B, C -> D // Verify 3-way merge logic in D let temp_dir = TempDir::new()?; let vgcs = Vgcs::new(temp_dir.path()); let req_id = Uuid::new_v4(); let req_id_str = req_id.to_string(); vgcs.init_repo(&req_id_str)?; let mut tx = vgcs.begin_transaction(&req_id_str, "")?; let init_commit = Box::new(tx).commit("Init", "system")?; let mut dag = DagScheduler::new(req_id, init_commit.clone()); dag.add_node("A".to_string(), None, TaskType::DataFetch, "key.a".into(), json!({})); dag.add_node("B".to_string(), None, TaskType::Analysis, "key.b".into(), json!({})); dag.add_node("C".to_string(), None, TaskType::Analysis, "key.c".into(), json!({})); dag.add_node("D".to_string(), None, TaskType::Analysis, "key.d".into(), json!({})); dag.add_dependency("A", "B"); dag.add_dependency("A", "C"); dag.add_dependency("B", "D"); dag.add_dependency("C", "D"); // Run A let mut tx = vgcs.begin_transaction(&req_id_str, &init_commit)?; tx.write("common.json", b"base")?; let commit_a = Box::new(tx).commit("A", "worker")?; dag.record_result("A", Some(commit_a.clone())); dag.update_status("A", TaskStatus::Completed); // Run B (Modify common, add b) let ctx_b = dag.resolve_context("B", &vgcs)?; let mut tx = vgcs.begin_transaction(&req_id_str, ctx_b.base_commit.as_ref().unwrap())?; tx.write("file_b.txt", b"B")?; let commit_b = Box::new(tx).commit("B", "worker")?; dag.record_result("B", Some(commit_b.clone())); dag.update_status("B", TaskStatus::Completed); // Run C (Modify common, add c) let ctx_c = dag.resolve_context("C", &vgcs)?; let mut tx = vgcs.begin_transaction(&req_id_str, ctx_c.base_commit.as_ref().unwrap())?; tx.write("file_c.txt", b"C")?; let commit_c = Box::new(tx).commit("C", "worker")?; dag.record_result("C", Some(commit_c.clone())); dag.update_status("C", TaskStatus::Completed); // Run D (Should Merge B and C) let ctx_d = dag.resolve_context("D", &vgcs)?; let merge_commit = ctx_d.base_commit.unwrap(); // Verify Merge let files = vgcs.list_dir(&req_id_str, &merge_commit, "")?; let names: Vec = files.iter().map(|f| f.name.clone()).collect(); assert!(names.contains(&"common.json".to_string())); assert!(names.contains(&"file_b.txt".to_string())); assert!(names.contains(&"file_c.txt".to_string())); Ok(()) }