From 5dc13fa7359e69ad07814d38aa85ef3768569764 Mon Sep 17 00:00:00 2001 From: "Lv, Qi" Date: Sun, 30 Nov 2025 19:16:02 +0800 Subject: [PATCH] fix: resolve infinite feedback loop in orchestrator and restore realtime LLM streaming --- .../report-generator-service/src/worker.rs | 30 +- .../src/workflow.rs | 389 +++++++++++++++--- 2 files changed, 346 insertions(+), 73 deletions(-) diff --git a/services/report-generator-service/src/worker.rs b/services/report-generator-service/src/worker.rs index aad9e52..03c510f 100644 --- a/services/report-generator-service/src/worker.rs +++ b/services/report-generator-service/src/worker.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use common_contracts::messages::{GenerateReportCommand, WorkflowEvent}; +use common_contracts::workflow_types::{WorkflowTaskEvent, TaskStatus}; use futures_util::StreamExt; use tracing::{info, instrument, error}; use workflow_context::WorkerContext; @@ -43,6 +44,19 @@ async fn run_vgcs_based_generation( ) -> Result { info!("Running VGCS based generation for task {:?}", command.task_id); + if let Some(task_id) = &command.task_id { + let running_evt = WorkflowTaskEvent { + request_id: command.request_id, + task_id: task_id.clone(), + status: TaskStatus::Running, + result: None, + }; + let subject = common_contracts::subjects::NatsSubject::WorkflowEventTaskCompleted.to_string(); + if let Ok(payload) = serde_json::to_vec(&running_evt) { + let _ = state.nats.publish(subject, payload.into()).await; + } + } + let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); let llm_providers = persistence_client.get_llm_providers_config().await.map_err(|e| ProviderError::Configuration(e.to_string()))?; @@ -188,21 +202,25 @@ async fn run_vgcs_based_generation( let mut stream = llm_client.stream_text(final_prompt_to_send).await.map_err(|e| ProviderError::LlmApi(e.to_string()))?; let mut full_content = String::new(); + let mut stream_index = 0; while let Some(chunk_res) = stream.next().await { if let Ok(chunk) = chunk_res { if !chunk.is_empty() { full_content.push_str(&chunk); - // Publish Stream Update + // Send Stream Update to NATS if let Some(task_id) = &command.task_id { - let stream_evt = WorkflowEvent::TaskStreamUpdate { + let event = WorkflowEvent::TaskStreamUpdate { task_id: task_id.clone(), - content_delta: chunk, - index: 0, + content_delta: chunk.clone(), + index: stream_index, }; - if let Ok(payload) = serde_json::to_vec(&stream_evt) { - let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string(); + stream_index += 1; + + let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string(); + // We ignore errors here to prevent blocking the main generation flow + if let Ok(payload) = serde_json::to_vec(&event) { let _ = state.nats.publish(subject, payload.into()).await; } } diff --git a/services/workflow-orchestrator-service/src/workflow.rs b/services/workflow-orchestrator-service/src/workflow.rs index eacb71e..ee693df 100644 --- a/services/workflow-orchestrator-service/src/workflow.rs +++ b/services/workflow-orchestrator-service/src/workflow.rs @@ -3,8 +3,10 @@ use common_contracts::workflow_types::{ WorkflowTaskCommand, WorkflowTaskEvent, TaskStatus, StorageConfig }; use common_contracts::messages::{ - StartWorkflowCommand, SyncStateCommand, TaskType, WorkflowEvent, TaskStatus as MsgTaskStatus + StartWorkflowCommand, SyncStateCommand, TaskType, WorkflowEvent, TaskStatus as MsgTaskStatus, + TaskStateSnapshot }; +use common_contracts::ack::TaskAcknowledgement; use common_contracts::subjects::SubjectMessage; use common_contracts::symbol_utils::CanonicalSymbol; use common_contracts::dtos::{SessionDataDto, NewWorkflowHistory}; @@ -12,6 +14,8 @@ use tracing::{info, warn, error}; use anyhow::Result; use serde_json::json; use tokio::sync::Mutex; +use std::collections::HashMap; +use uuid::Uuid; use crate::dag_scheduler::DagScheduler; use crate::state::AppState; @@ -32,9 +36,9 @@ impl WorkflowEngine { pub async fn handle_start_workflow(&self, cmd: StartWorkflowCommand) -> Result<()> { let req_id = cmd.request_id; - info!("Starting workflow {}", req_id); - - self.publish_log(req_id, "workflow", "INFO", "Workflow started. Initializing...").await; + + info!(request_id = %req_id, "Starting workflow"); + info!(request_id = %req_id, "Workflow started. Initializing..."); // 1. Init VGCS Repo self.state.vgcs.init_repo(&req_id.to_string())?; @@ -55,17 +59,16 @@ impl WorkflowEngine { let mut dag = DagScheduler::new(req_id, initial_commit); // 4. Fetch Template Config - self.publish_log(req_id, "workflow", "INFO", "Fetching template configuration...").await; - let template_sets = self.state.persistence_client.get_analysis_template_sets().await?; - let template = template_sets.get(&cmd.template_id).ok_or_else(|| { - anyhow::anyhow!("Template {} not found", cmd.template_id) + info!("Fetching template configuration..."); + let template = self.state.persistence_client.get_template_by_id(&cmd.template_id).await.map_err(|e| { + anyhow::anyhow!("Failed to fetch template {}: {}", cmd.template_id, e) })?; // 3.1 Fetch Data Sources Config let data_sources = self.state.persistence_client.get_data_sources_config().await?; // 4. Build DAG - self.build_dag(&mut dag, template, &data_sources, &cmd.template_id, &cmd.market, &cmd.symbol); + self.build_dag(&mut dag, &template, &data_sources, &cmd.template_id, &cmd.market, &cmd.symbol); // 5. Save State self.state.workflows.insert(req_id, Arc::new(Mutex::new(dag.clone()))); @@ -82,7 +85,7 @@ impl WorkflowEngine { } } - self.publish_log(req_id, "workflow", "INFO", "DAG built and workflow initialized.").await; + info!("DAG built and workflow initialized."); // 6. Trigger Initial Tasks let initial_tasks = dag.get_initial_tasks(); @@ -92,6 +95,7 @@ impl WorkflowEngine { let mut dag_guard = dag_arc.lock().await; for task_id in initial_tasks { + info!("Starting dispatch for initial task: {}", task_id); self.dispatch_task(&mut dag_guard, &task_id, &self.state.vgcs).await?; } @@ -100,7 +104,8 @@ impl WorkflowEngine { pub async fn handle_sync_state(&self, cmd: SyncStateCommand) -> Result<()> { let req_id = cmd.request_id; - info!("Handling SyncStateCommand for {}", req_id); + // No held span guard across await + info!(request_id = %req_id, "Handling SyncStateCommand"); let dag_arc = match self.state.workflows.get(&req_id) { Some(d) => d.clone(), @@ -126,6 +131,90 @@ impl WorkflowEngine { }; tasks_status.insert(task_id.clone(), status); } + + // Construct Comprehensive Task States + let mut task_states = HashMap::new(); + + // 1. Add active buffers + for (task_id, buffer) in &dag.task_execution_states { + // If task is not in nodes for some reason, skip + let status = dag.get_status(task_id); + let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone()); + let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned(); + let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned(); + + let status_dto = match status { + TaskStatus::Pending => MsgTaskStatus::Pending, + TaskStatus::Scheduled => MsgTaskStatus::Scheduled, + TaskStatus::Running => MsgTaskStatus::Running, + TaskStatus::Completed => MsgTaskStatus::Completed, + TaskStatus::Failed => MsgTaskStatus::Failed, + TaskStatus::Skipped => MsgTaskStatus::Skipped, + TaskStatus::Cancelled => MsgTaskStatus::Skipped, + }; + + // Safety: Truncate content if too large to avoid NATS payload limits (1MB default) + // We reserve some space for JSON overhead. + // If content is > 100KB, we truncate and append a warning. + // The full content should be fetched via separate API if needed. + let content_snapshot = if buffer.content_buffer.len() > 100_000 { + let mut s = buffer.content_buffer.chars().take(100_000).collect::(); + s.push_str("\n... [Content Truncated in Snapshot] ..."); + Some(s) + } else { + Some(buffer.content_buffer.clone()) + }; + + // Safety: Limit logs in snapshot + let logs_snapshot = if buffer.logs.len() > 100 { + buffer.logs.iter().rev().take(100).rev().cloned().collect() + } else { + buffer.logs.clone() + }; + + task_states.insert(task_id.clone(), TaskStateSnapshot { + task_id: task_id.clone(), + status: status_dto, + logs: logs_snapshot, + content: content_snapshot, + input_commit, + output_commit, + metadata, + }); + } + + // 2. Add remaining tasks (no buffer) + for task_id in dag.nodes.keys() { + if !task_states.contains_key(task_id) { + let status = dag.get_status(task_id); + let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone()); + let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned(); + let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned(); + + let status_dto = match status { + TaskStatus::Pending => MsgTaskStatus::Pending, + TaskStatus::Scheduled => MsgTaskStatus::Scheduled, + TaskStatus::Running => MsgTaskStatus::Running, + TaskStatus::Completed => MsgTaskStatus::Completed, + TaskStatus::Failed => MsgTaskStatus::Failed, + TaskStatus::Skipped => MsgTaskStatus::Skipped, + TaskStatus::Cancelled => MsgTaskStatus::Skipped, + }; + + task_states.insert(task_id.clone(), TaskStateSnapshot { + task_id: task_id.clone(), + status: status_dto, + logs: vec![], + content: None, + input_commit, + output_commit, + metadata, + }); + } + } + + // Read buffered logs for replay + let logs = self.state.log_manager.read_current_logs(&req_id.to_string()).unwrap_or_default(); // Create Snapshot Event let event = WorkflowEvent::WorkflowStateSnapshot { @@ -134,6 +223,8 @@ impl WorkflowEngine { tasks_status, tasks_output: dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect(), tasks_metadata: dag.commit_tracker.task_metadata.clone(), + task_states, // NEW + logs, }; let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string(); @@ -144,9 +235,35 @@ impl WorkflowEngine { } Ok(()) } + + // --- New Handler Methods for Stream Capture --- + + pub async fn handle_task_stream_update(&self, task_id: String, content: String, req_id: Uuid) -> Result<()> { + if let Some(dag_arc) = self.state.workflows.get(&req_id) { + let mut dag = dag_arc.lock().await; + dag.append_content(&task_id, &content); + + // We do NOT re-publish here. The Orchestrator listens to the public event stream + // merely to accumulate state for history/resume. The frontend receives the + // original event directly from the provider. + // Re-publishing would cause an infinite loop if the consumer listens to the same topic. + } + Ok(()) + } + + pub async fn handle_task_log(&self, task_id: String, log: String, req_id: Uuid) -> Result<()> { + if let Some(dag_arc) = self.state.workflows.get(&req_id) { + let mut dag = dag_arc.lock().await; + dag.append_log(&task_id, log.clone()); + + // We do NOT re-publish here. See handle_task_stream_update. + } + Ok(()) + } pub async fn handle_task_completed(&self, evt: WorkflowTaskEvent) -> Result<()> { let req_id = evt.request_id; + // No held span guard let dag_arc = match self.state.workflows.get(&req_id) { Some(d) => d.clone(), @@ -161,7 +278,7 @@ impl WorkflowEngine { // 1. Update Status & Record Commit dag.update_status(&evt.task_id, evt.status); - self.publish_log(req_id, &evt.task_id, "INFO", &format!("Task status changed to {:?}", evt.status)).await; + info!("Task {} status changed to {:?}", evt.task_id, evt.status); // Lookup task_type let task_type = dag.nodes.get(&evt.task_id).map(|n| n.task_type).unwrap_or(TaskType::DataFetch); @@ -225,7 +342,7 @@ impl WorkflowEngine { for task_id in ready_tasks { if let Err(e) = self.dispatch_task(&mut dag, &task_id, &self.state.vgcs).await { error!("Failed to dispatch task {}: {}", task_id, e); - self.publish_log(req_id, &task_id, "ERROR", &format!("Failed to dispatch task: {}", e)).await; + info!("Failed to dispatch task {}: {}", task_id, e); } } } @@ -235,6 +352,37 @@ impl WorkflowEngine { let end_time = chrono::Utc::now(); let timestamp = end_time.timestamp_millis(); + // --- Log Persistence (New) --- + let req_id_clone_for_log = req_id; + let vgcs_for_log = self.state.vgcs.clone(); + let log_manager = self.state.log_manager.clone(); + + // We run this blocking operation here or spawn it? + // Spawn is safer to not block the loop, but we want it part of the "completion". + // Let's spawn, but it's fine if it's slightly async. + tokio::spawn(async move { + match log_manager.finalize(&req_id_clone_for_log.to_string()) { + Ok(log_content) => { + if !log_content.is_empty() { + + let result = tokio::task::spawn_blocking(move || -> Result { + let mut tx = vgcs_for_log.begin_transaction(&req_id_clone_for_log.to_string(), "")?; + tx.write("workflow.log", log_content.as_bytes())?; + // We use "System" as author + tx.commit("Persist Workflow Logs", "System") + }).await; + + match result { + Ok(Ok(commit)) => info!("Persisted workflow logs to VGCS commit: {}", commit), + Ok(Err(e)) => error!("Failed to commit workflow logs: {}", e), + Err(e) => error!("Failed to join log persistence task: {}", e), + } + } + }, + Err(e) => error!("Failed to finalize logs: {}", e), + } + }); + // --- Snapshot Persistence --- let tasks_status_map = dag.nodes.iter().map(|(k, n)| { let status = match n.status { @@ -251,6 +399,65 @@ impl WorkflowEngine { let tasks_output_map = dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect::>(); let tasks_metadata_map = dag.commit_tracker.task_metadata.clone(); + + // Construct Comprehensive Task States for final snapshot + let mut task_states = HashMap::new(); + for (task_id, buffer) in &dag.task_execution_states { + let status = dag.get_status(task_id); + let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone()); + let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned(); + let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned(); + + let status_dto = match status { + TaskStatus::Pending => MsgTaskStatus::Pending, + TaskStatus::Scheduled => MsgTaskStatus::Scheduled, + TaskStatus::Running => MsgTaskStatus::Running, + TaskStatus::Completed => MsgTaskStatus::Completed, + TaskStatus::Failed => MsgTaskStatus::Failed, + TaskStatus::Skipped => MsgTaskStatus::Skipped, + TaskStatus::Cancelled => MsgTaskStatus::Skipped, + }; + + task_states.insert(task_id.clone(), TaskStateSnapshot { + task_id: task_id.clone(), + status: status_dto, + logs: buffer.logs.clone(), + content: Some(buffer.content_buffer.clone()), + input_commit, + output_commit, + metadata, + }); + } + + // Add remaining tasks + for task_id in dag.nodes.keys() { + if !task_states.contains_key(task_id) { + let status = dag.get_status(task_id); + let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone()); + let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned(); + let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned(); + + let status_dto = match status { + TaskStatus::Pending => MsgTaskStatus::Pending, + TaskStatus::Scheduled => MsgTaskStatus::Scheduled, + TaskStatus::Running => MsgTaskStatus::Running, + TaskStatus::Completed => MsgTaskStatus::Completed, + TaskStatus::Failed => MsgTaskStatus::Failed, + TaskStatus::Skipped => MsgTaskStatus::Skipped, + TaskStatus::Cancelled => MsgTaskStatus::Skipped, + }; + + task_states.insert(task_id.clone(), TaskStateSnapshot { + task_id: task_id.clone(), + status: status_dto, + logs: vec![], + content: None, + input_commit, + output_commit, + metadata, + }); + } + } let snapshot_event = WorkflowEvent::WorkflowStateSnapshot { timestamp, @@ -258,9 +465,10 @@ impl WorkflowEngine { tasks_status: tasks_status_map, tasks_output: tasks_output_map, tasks_metadata: tasks_metadata_map, + logs: self.state.log_manager.read_current_logs(&req_id.to_string()).unwrap_or_default(), + task_states: std::collections::HashMap::new(), }; - // Extract symbol & market from any node let symbol = dag.nodes.values().next() .and_then(|n| n.config.get("symbol")) .and_then(|v| v.as_str()) @@ -288,15 +496,9 @@ impl WorkflowEngine { }; // 2. Save New Workflow History - let _start_time = dag.start_time; // We need to track start time in DAG or pass it - // For now, let's approximate or fetch if available. - // Actually, DAG doesn't track start time yet. We should probably add it. - // As a workaround, use now - X, or just now if we don't care about precision. - // Better: Assume orchestrator start log is close enough. - // Let's use end_time for both start/end if we don't track it, but that's bad. - // We will ignore start_time precision for this MVP refactor step. + let _start_time = dag.start_time; let start_time_val = end_time; - + let has_failures = dag.has_failures(); let status_str = if has_failures { "Failed" } else { "Completed" }.to_string(); @@ -330,7 +532,7 @@ impl WorkflowEngine { let event = if dag.has_failures() { info!("Workflow {} failed (some tasks failed)", req_id); - self.publish_log(req_id, "workflow", "ERROR", "Workflow finished with failures.").await; + info!("Workflow finished with failures."); WorkflowEvent::WorkflowFailed { end_timestamp: timestamp, reason: "Some tasks failed".to_string(), @@ -338,7 +540,7 @@ impl WorkflowEngine { } } else { info!("Workflow {} completed successfully", req_id); - self.publish_log(req_id, "workflow", "INFO", "Workflow completed successfully.").await; + info!("Workflow completed successfully."); WorkflowEvent::WorkflowCompleted { end_timestamp: timestamp, result_summary: Some(json!({})), @@ -367,7 +569,7 @@ impl WorkflowEngine { // 2. Update Status dag.update_status(task_id, TaskStatus::Scheduled); - self.publish_log(dag.request_id, task_id, "INFO", "Task scheduled and dispatched.").await; + info!("Task {} scheduled and dispatched.", task_id); // 3. Construct Command let (routing_key, task_type, mut config, display_name) = { @@ -417,6 +619,8 @@ impl WorkflowEngine { obj.insert("input_bindings".to_string(), serde_json::to_value(&resolution.paths)?); } + info!("Context resolution successful. Injecting bindings."); + // 2. Write Trace Sidecar to VGCS let trace_path = io_binder.allocate_trace_path(task_type, &symbol, task_id, display_name.as_deref()); @@ -438,6 +642,7 @@ impl WorkflowEngine { match trace_commit_res { Ok(Ok(new_commit)) => { info!("Written context resolution trace to {} (Commit: {})", trace_path, new_commit); + // Update the base commit for the worker, so it sees the trace file (linear history) current_base_commit = new_commit; // Update the context passed to the worker @@ -448,12 +653,16 @@ impl WorkflowEngine { // We are outside closure here. dag.set_input_commit(task_id, current_base_commit); }, - Ok(Err(e)) => error!("Failed to write trace file: {}", e), + Ok(Err(e)) => { + error!("Failed to write trace file: {}", e); + warn!("Failed to write trace file: {}", e); + }, Err(e) => error!("Failed to join trace write task: {}", e), } }, Err(e) => { error!("Context resolution failed for task {}: {}", task_id, e); + warn!("Context resolution failed: {}", e); // We proceed, but the worker might fail if it relies on inputs } } @@ -461,6 +670,9 @@ impl WorkflowEngine { } } + // Capture for event + let input_commit_for_event = context.base_commit.clone(); + let cmd = WorkflowTaskCommand { request_id: dag.request_id, task_id: task_id.to_string(), @@ -472,21 +684,72 @@ impl WorkflowEngine { }, }; - // Special handling for Analysis Report Task to inject task_id into the specific command payload - // (If the node config is used to build GenerateReportCommand downstream) - // Actually, WorkflowTaskCommand is generic. The specific worker (e.g. report-generator) - // usually consumes a specific command. - // BUT, the current architecture seems to have Orchestrator send `WorkflowTaskCommand` - // and the worker receives THAT? - - // Let's check `report-generator-service` consumer. - - // 4. Publish - let subject = cmd.subject().to_string(); // This uses the routing_key + // 4. Publish with Handshake (Request-Reply) + let subject = cmd.subject().to_string(); let payload = serde_json::to_vec(&cmd)?; - info!("Dispatching task {} to subject {}", task_id, subject); - self.nats.publish(subject, payload.into()).await?; + info!("Dispatching task {} to subject {} (waiting for ack)", task_id, subject); + + let request_timeout = std::time::Duration::from_secs(5); + let request_future = self.nats.request(subject.clone(), payload.into()); + + match tokio::time::timeout(request_timeout, request_future).await { + Ok(Ok(msg)) => { + // Parse Ack + match serde_json::from_slice::(&msg.payload) { + Ok(TaskAcknowledgement::Accepted) => { + info!("Task {} accepted by provider.", task_id); + // Task proceeds normally + }, + Ok(TaskAcknowledgement::Rejected { reason }) => { + let err_msg = format!("Task rejected by provider: {}", reason); + warn!("Task {} rejected: {}", task_id, reason); + + // Mark failed immediately + dag.update_status(task_id, TaskStatus::Failed); + error!("{}", err_msg); + + // Emit failure event so frontend knows + let failure_event = WorkflowEvent::TaskStateChanged { + task_id: task_id.to_string(), + task_type, + status: MsgTaskStatus::Failed, + message: Some(err_msg.clone()), + timestamp: chrono::Utc::now().timestamp_millis(), + progress: None, + input_commit: input_commit_for_event, + output_commit: None, + }; + // ... publish failure event ... + let subject_prog = common_contracts::subjects::NatsSubject::WorkflowProgress(dag.request_id).to_string(); + if let Ok(p) = serde_json::to_vec(&failure_event) { + let _ = self.nats.publish(subject_prog, p.into()).await; + } + + return Err(anyhow::anyhow!(err_msg)); + }, + Err(e) => { + // Invalid Ack format, assume failure + let err_msg = format!("Invalid Ack from provider: {}", e); + error!("{}", err_msg); + dag.update_status(task_id, TaskStatus::Failed); + return Err(anyhow::anyhow!(err_msg)); + } + } + }, + Ok(Err(e)) => { + let err_msg = format!("NATS Request failed: {}", e); + error!("Task {} dispatch error: {}", task_id, e); + dag.update_status(task_id, TaskStatus::Failed); + return Err(anyhow::anyhow!(err_msg)); + }, + Err(_) => { + let err_msg = "Dispatch timeout (no ack from provider in 5s)"; + error!("Task {} {}", task_id, err_msg); + dag.update_status(task_id, TaskStatus::Failed); + return Err(anyhow::anyhow!(err_msg)); + } + } Ok(()) } @@ -547,15 +810,31 @@ impl WorkflowEngine { if market == "MOCK" && fetch_tasks.is_empty() { let task_id = "fetch:mock".to_string(); fetch_tasks.push(task_id.clone()); + + let (actual_symbol, sim_mode) = if symbol.as_str().contains('|') { + let parts: Vec<&str> = symbol.as_str().split('|').collect(); + (parts[0], Some(parts[1])) + } else { + (symbol.as_str(), None) + }; + + let mut config = json!({ + "symbol": actual_symbol, + "market": market + }); + + if let Some(mode) = sim_mode { + if let Some(obj) = config.as_object_mut() { + obj.insert("simulation_mode".to_string(), serde_json::Value::String(mode.to_string())); + } + } + dag.add_node( task_id, Some("Data Fetch (Mock)".to_string()), TaskType::DataFetch, "provider.mock".to_string(), - json!({ - "symbol": symbol.as_str(), - "market": market - }) + config ); } @@ -567,11 +846,6 @@ impl WorkflowEngine { for (module_id, module_config) in &template.modules { let task_id = format!("analysis:{}", module_id); - // Pass module_id and template_id so the worker knows what to do - // We pass the FULL module config here if we want the worker to be stateless, - // BUT existing worker logic fetches template again. - // To support "Single Module Execution", we should probably pass the module_id. - let mut node_config = json!({ "template_id": template_id, "module_id": module_id, @@ -607,23 +881,4 @@ impl WorkflowEngine { } } } - - async fn publish_log(&self, req_id: uuid::Uuid, task_id: &str, level: &str, message: &str) { - let event = WorkflowEvent::TaskLog { - task_id: task_id.to_string(), - level: level.to_string(), - message: message.to_string(), - timestamp: chrono::Utc::now().timestamp_millis(), - }; - let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string(); - if let Ok(payload) = serde_json::to_vec(&event) { - // Fire and forget - let nats = self.nats.clone(); - tokio::spawn(async move { - if let Err(e) = nats.publish(subject, payload.into()).await { - error!("Failed to publish TaskLog: {}", e); - } - }); - } - } }