fix: resolve infinite feedback loop in orchestrator and restore realtime LLM streaming
This commit is contained in:
parent
70b30b39d8
commit
5dc13fa735
@ -2,6 +2,7 @@ use std::collections::HashMap;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use common_contracts::messages::{GenerateReportCommand, WorkflowEvent};
|
use common_contracts::messages::{GenerateReportCommand, WorkflowEvent};
|
||||||
|
use common_contracts::workflow_types::{WorkflowTaskEvent, TaskStatus};
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
use tracing::{info, instrument, error};
|
use tracing::{info, instrument, error};
|
||||||
use workflow_context::WorkerContext;
|
use workflow_context::WorkerContext;
|
||||||
@ -43,6 +44,19 @@ async fn run_vgcs_based_generation(
|
|||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
info!("Running VGCS based generation for task {:?}", command.task_id);
|
info!("Running VGCS based generation for task {:?}", command.task_id);
|
||||||
|
|
||||||
|
if let Some(task_id) = &command.task_id {
|
||||||
|
let running_evt = WorkflowTaskEvent {
|
||||||
|
request_id: command.request_id,
|
||||||
|
task_id: task_id.clone(),
|
||||||
|
status: TaskStatus::Running,
|
||||||
|
result: None,
|
||||||
|
};
|
||||||
|
let subject = common_contracts::subjects::NatsSubject::WorkflowEventTaskCompleted.to_string();
|
||||||
|
if let Ok(payload) = serde_json::to_vec(&running_evt) {
|
||||||
|
let _ = state.nats.publish(subject, payload.into()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone());
|
let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone());
|
||||||
let llm_providers = persistence_client.get_llm_providers_config().await.map_err(|e| ProviderError::Configuration(e.to_string()))?;
|
let llm_providers = persistence_client.get_llm_providers_config().await.map_err(|e| ProviderError::Configuration(e.to_string()))?;
|
||||||
|
|
||||||
@ -188,21 +202,25 @@ async fn run_vgcs_based_generation(
|
|||||||
let mut stream = llm_client.stream_text(final_prompt_to_send).await.map_err(|e| ProviderError::LlmApi(e.to_string()))?;
|
let mut stream = llm_client.stream_text(final_prompt_to_send).await.map_err(|e| ProviderError::LlmApi(e.to_string()))?;
|
||||||
|
|
||||||
let mut full_content = String::new();
|
let mut full_content = String::new();
|
||||||
|
let mut stream_index = 0;
|
||||||
|
|
||||||
while let Some(chunk_res) = stream.next().await {
|
while let Some(chunk_res) = stream.next().await {
|
||||||
if let Ok(chunk) = chunk_res {
|
if let Ok(chunk) = chunk_res {
|
||||||
if !chunk.is_empty() {
|
if !chunk.is_empty() {
|
||||||
full_content.push_str(&chunk);
|
full_content.push_str(&chunk);
|
||||||
|
|
||||||
// Publish Stream Update
|
// Send Stream Update to NATS
|
||||||
if let Some(task_id) = &command.task_id {
|
if let Some(task_id) = &command.task_id {
|
||||||
let stream_evt = WorkflowEvent::TaskStreamUpdate {
|
let event = WorkflowEvent::TaskStreamUpdate {
|
||||||
task_id: task_id.clone(),
|
task_id: task_id.clone(),
|
||||||
content_delta: chunk,
|
content_delta: chunk.clone(),
|
||||||
index: 0,
|
index: stream_index,
|
||||||
};
|
};
|
||||||
if let Ok(payload) = serde_json::to_vec(&stream_evt) {
|
stream_index += 1;
|
||||||
|
|
||||||
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
|
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
|
||||||
|
// We ignore errors here to prevent blocking the main generation flow
|
||||||
|
if let Ok(payload) = serde_json::to_vec(&event) {
|
||||||
let _ = state.nats.publish(subject, payload.into()).await;
|
let _ = state.nats.publish(subject, payload.into()).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -3,8 +3,10 @@ use common_contracts::workflow_types::{
|
|||||||
WorkflowTaskCommand, WorkflowTaskEvent, TaskStatus, StorageConfig
|
WorkflowTaskCommand, WorkflowTaskEvent, TaskStatus, StorageConfig
|
||||||
};
|
};
|
||||||
use common_contracts::messages::{
|
use common_contracts::messages::{
|
||||||
StartWorkflowCommand, SyncStateCommand, TaskType, WorkflowEvent, TaskStatus as MsgTaskStatus
|
StartWorkflowCommand, SyncStateCommand, TaskType, WorkflowEvent, TaskStatus as MsgTaskStatus,
|
||||||
|
TaskStateSnapshot
|
||||||
};
|
};
|
||||||
|
use common_contracts::ack::TaskAcknowledgement;
|
||||||
use common_contracts::subjects::SubjectMessage;
|
use common_contracts::subjects::SubjectMessage;
|
||||||
use common_contracts::symbol_utils::CanonicalSymbol;
|
use common_contracts::symbol_utils::CanonicalSymbol;
|
||||||
use common_contracts::dtos::{SessionDataDto, NewWorkflowHistory};
|
use common_contracts::dtos::{SessionDataDto, NewWorkflowHistory};
|
||||||
@ -12,6 +14,8 @@ use tracing::{info, warn, error};
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::dag_scheduler::DagScheduler;
|
use crate::dag_scheduler::DagScheduler;
|
||||||
use crate::state::AppState;
|
use crate::state::AppState;
|
||||||
@ -32,9 +36,9 @@ impl WorkflowEngine {
|
|||||||
|
|
||||||
pub async fn handle_start_workflow(&self, cmd: StartWorkflowCommand) -> Result<()> {
|
pub async fn handle_start_workflow(&self, cmd: StartWorkflowCommand) -> Result<()> {
|
||||||
let req_id = cmd.request_id;
|
let req_id = cmd.request_id;
|
||||||
info!("Starting workflow {}", req_id);
|
|
||||||
|
|
||||||
self.publish_log(req_id, "workflow", "INFO", "Workflow started. Initializing...").await;
|
info!(request_id = %req_id, "Starting workflow");
|
||||||
|
info!(request_id = %req_id, "Workflow started. Initializing...");
|
||||||
|
|
||||||
// 1. Init VGCS Repo
|
// 1. Init VGCS Repo
|
||||||
self.state.vgcs.init_repo(&req_id.to_string())?;
|
self.state.vgcs.init_repo(&req_id.to_string())?;
|
||||||
@ -55,17 +59,16 @@ impl WorkflowEngine {
|
|||||||
let mut dag = DagScheduler::new(req_id, initial_commit);
|
let mut dag = DagScheduler::new(req_id, initial_commit);
|
||||||
|
|
||||||
// 4. Fetch Template Config
|
// 4. Fetch Template Config
|
||||||
self.publish_log(req_id, "workflow", "INFO", "Fetching template configuration...").await;
|
info!("Fetching template configuration...");
|
||||||
let template_sets = self.state.persistence_client.get_analysis_template_sets().await?;
|
let template = self.state.persistence_client.get_template_by_id(&cmd.template_id).await.map_err(|e| {
|
||||||
let template = template_sets.get(&cmd.template_id).ok_or_else(|| {
|
anyhow::anyhow!("Failed to fetch template {}: {}", cmd.template_id, e)
|
||||||
anyhow::anyhow!("Template {} not found", cmd.template_id)
|
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// 3.1 Fetch Data Sources Config
|
// 3.1 Fetch Data Sources Config
|
||||||
let data_sources = self.state.persistence_client.get_data_sources_config().await?;
|
let data_sources = self.state.persistence_client.get_data_sources_config().await?;
|
||||||
|
|
||||||
// 4. Build DAG
|
// 4. Build DAG
|
||||||
self.build_dag(&mut dag, template, &data_sources, &cmd.template_id, &cmd.market, &cmd.symbol);
|
self.build_dag(&mut dag, &template, &data_sources, &cmd.template_id, &cmd.market, &cmd.symbol);
|
||||||
|
|
||||||
// 5. Save State
|
// 5. Save State
|
||||||
self.state.workflows.insert(req_id, Arc::new(Mutex::new(dag.clone())));
|
self.state.workflows.insert(req_id, Arc::new(Mutex::new(dag.clone())));
|
||||||
@ -82,7 +85,7 @@ impl WorkflowEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.publish_log(req_id, "workflow", "INFO", "DAG built and workflow initialized.").await;
|
info!("DAG built and workflow initialized.");
|
||||||
|
|
||||||
// 6. Trigger Initial Tasks
|
// 6. Trigger Initial Tasks
|
||||||
let initial_tasks = dag.get_initial_tasks();
|
let initial_tasks = dag.get_initial_tasks();
|
||||||
@ -92,6 +95,7 @@ impl WorkflowEngine {
|
|||||||
let mut dag_guard = dag_arc.lock().await;
|
let mut dag_guard = dag_arc.lock().await;
|
||||||
|
|
||||||
for task_id in initial_tasks {
|
for task_id in initial_tasks {
|
||||||
|
info!("Starting dispatch for initial task: {}", task_id);
|
||||||
self.dispatch_task(&mut dag_guard, &task_id, &self.state.vgcs).await?;
|
self.dispatch_task(&mut dag_guard, &task_id, &self.state.vgcs).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -100,7 +104,8 @@ impl WorkflowEngine {
|
|||||||
|
|
||||||
pub async fn handle_sync_state(&self, cmd: SyncStateCommand) -> Result<()> {
|
pub async fn handle_sync_state(&self, cmd: SyncStateCommand) -> Result<()> {
|
||||||
let req_id = cmd.request_id;
|
let req_id = cmd.request_id;
|
||||||
info!("Handling SyncStateCommand for {}", req_id);
|
// No held span guard across await
|
||||||
|
info!(request_id = %req_id, "Handling SyncStateCommand");
|
||||||
|
|
||||||
let dag_arc = match self.state.workflows.get(&req_id) {
|
let dag_arc = match self.state.workflows.get(&req_id) {
|
||||||
Some(d) => d.clone(),
|
Some(d) => d.clone(),
|
||||||
@ -127,6 +132,90 @@ impl WorkflowEngine {
|
|||||||
tasks_status.insert(task_id.clone(), status);
|
tasks_status.insert(task_id.clone(), status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Construct Comprehensive Task States
|
||||||
|
let mut task_states = HashMap::new();
|
||||||
|
|
||||||
|
// 1. Add active buffers
|
||||||
|
for (task_id, buffer) in &dag.task_execution_states {
|
||||||
|
// If task is not in nodes for some reason, skip
|
||||||
|
let status = dag.get_status(task_id);
|
||||||
|
let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone());
|
||||||
|
let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned();
|
||||||
|
let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned();
|
||||||
|
|
||||||
|
let status_dto = match status {
|
||||||
|
TaskStatus::Pending => MsgTaskStatus::Pending,
|
||||||
|
TaskStatus::Scheduled => MsgTaskStatus::Scheduled,
|
||||||
|
TaskStatus::Running => MsgTaskStatus::Running,
|
||||||
|
TaskStatus::Completed => MsgTaskStatus::Completed,
|
||||||
|
TaskStatus::Failed => MsgTaskStatus::Failed,
|
||||||
|
TaskStatus::Skipped => MsgTaskStatus::Skipped,
|
||||||
|
TaskStatus::Cancelled => MsgTaskStatus::Skipped,
|
||||||
|
};
|
||||||
|
|
||||||
|
// Safety: Truncate content if too large to avoid NATS payload limits (1MB default)
|
||||||
|
// We reserve some space for JSON overhead.
|
||||||
|
// If content is > 100KB, we truncate and append a warning.
|
||||||
|
// The full content should be fetched via separate API if needed.
|
||||||
|
let content_snapshot = if buffer.content_buffer.len() > 100_000 {
|
||||||
|
let mut s = buffer.content_buffer.chars().take(100_000).collect::<String>();
|
||||||
|
s.push_str("\n... [Content Truncated in Snapshot] ...");
|
||||||
|
Some(s)
|
||||||
|
} else {
|
||||||
|
Some(buffer.content_buffer.clone())
|
||||||
|
};
|
||||||
|
|
||||||
|
// Safety: Limit logs in snapshot
|
||||||
|
let logs_snapshot = if buffer.logs.len() > 100 {
|
||||||
|
buffer.logs.iter().rev().take(100).rev().cloned().collect()
|
||||||
|
} else {
|
||||||
|
buffer.logs.clone()
|
||||||
|
};
|
||||||
|
|
||||||
|
task_states.insert(task_id.clone(), TaskStateSnapshot {
|
||||||
|
task_id: task_id.clone(),
|
||||||
|
status: status_dto,
|
||||||
|
logs: logs_snapshot,
|
||||||
|
content: content_snapshot,
|
||||||
|
input_commit,
|
||||||
|
output_commit,
|
||||||
|
metadata,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Add remaining tasks (no buffer)
|
||||||
|
for task_id in dag.nodes.keys() {
|
||||||
|
if !task_states.contains_key(task_id) {
|
||||||
|
let status = dag.get_status(task_id);
|
||||||
|
let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone());
|
||||||
|
let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned();
|
||||||
|
let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned();
|
||||||
|
|
||||||
|
let status_dto = match status {
|
||||||
|
TaskStatus::Pending => MsgTaskStatus::Pending,
|
||||||
|
TaskStatus::Scheduled => MsgTaskStatus::Scheduled,
|
||||||
|
TaskStatus::Running => MsgTaskStatus::Running,
|
||||||
|
TaskStatus::Completed => MsgTaskStatus::Completed,
|
||||||
|
TaskStatus::Failed => MsgTaskStatus::Failed,
|
||||||
|
TaskStatus::Skipped => MsgTaskStatus::Skipped,
|
||||||
|
TaskStatus::Cancelled => MsgTaskStatus::Skipped,
|
||||||
|
};
|
||||||
|
|
||||||
|
task_states.insert(task_id.clone(), TaskStateSnapshot {
|
||||||
|
task_id: task_id.clone(),
|
||||||
|
status: status_dto,
|
||||||
|
logs: vec![],
|
||||||
|
content: None,
|
||||||
|
input_commit,
|
||||||
|
output_commit,
|
||||||
|
metadata,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read buffered logs for replay
|
||||||
|
let logs = self.state.log_manager.read_current_logs(&req_id.to_string()).unwrap_or_default();
|
||||||
|
|
||||||
// Create Snapshot Event
|
// Create Snapshot Event
|
||||||
let event = WorkflowEvent::WorkflowStateSnapshot {
|
let event = WorkflowEvent::WorkflowStateSnapshot {
|
||||||
timestamp: chrono::Utc::now().timestamp_millis(),
|
timestamp: chrono::Utc::now().timestamp_millis(),
|
||||||
@ -134,6 +223,8 @@ impl WorkflowEngine {
|
|||||||
tasks_status,
|
tasks_status,
|
||||||
tasks_output: dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect(),
|
tasks_output: dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect(),
|
||||||
tasks_metadata: dag.commit_tracker.task_metadata.clone(),
|
tasks_metadata: dag.commit_tracker.task_metadata.clone(),
|
||||||
|
task_states, // NEW
|
||||||
|
logs,
|
||||||
};
|
};
|
||||||
|
|
||||||
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
|
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
|
||||||
@ -145,8 +236,34 @@ impl WorkflowEngine {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- New Handler Methods for Stream Capture ---
|
||||||
|
|
||||||
|
pub async fn handle_task_stream_update(&self, task_id: String, content: String, req_id: Uuid) -> Result<()> {
|
||||||
|
if let Some(dag_arc) = self.state.workflows.get(&req_id) {
|
||||||
|
let mut dag = dag_arc.lock().await;
|
||||||
|
dag.append_content(&task_id, &content);
|
||||||
|
|
||||||
|
// We do NOT re-publish here. The Orchestrator listens to the public event stream
|
||||||
|
// merely to accumulate state for history/resume. The frontend receives the
|
||||||
|
// original event directly from the provider.
|
||||||
|
// Re-publishing would cause an infinite loop if the consumer listens to the same topic.
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_task_log(&self, task_id: String, log: String, req_id: Uuid) -> Result<()> {
|
||||||
|
if let Some(dag_arc) = self.state.workflows.get(&req_id) {
|
||||||
|
let mut dag = dag_arc.lock().await;
|
||||||
|
dag.append_log(&task_id, log.clone());
|
||||||
|
|
||||||
|
// We do NOT re-publish here. See handle_task_stream_update.
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn handle_task_completed(&self, evt: WorkflowTaskEvent) -> Result<()> {
|
pub async fn handle_task_completed(&self, evt: WorkflowTaskEvent) -> Result<()> {
|
||||||
let req_id = evt.request_id;
|
let req_id = evt.request_id;
|
||||||
|
// No held span guard
|
||||||
|
|
||||||
let dag_arc = match self.state.workflows.get(&req_id) {
|
let dag_arc = match self.state.workflows.get(&req_id) {
|
||||||
Some(d) => d.clone(),
|
Some(d) => d.clone(),
|
||||||
@ -161,7 +278,7 @@ impl WorkflowEngine {
|
|||||||
// 1. Update Status & Record Commit
|
// 1. Update Status & Record Commit
|
||||||
dag.update_status(&evt.task_id, evt.status);
|
dag.update_status(&evt.task_id, evt.status);
|
||||||
|
|
||||||
self.publish_log(req_id, &evt.task_id, "INFO", &format!("Task status changed to {:?}", evt.status)).await;
|
info!("Task {} status changed to {:?}", evt.task_id, evt.status);
|
||||||
|
|
||||||
// Lookup task_type
|
// Lookup task_type
|
||||||
let task_type = dag.nodes.get(&evt.task_id).map(|n| n.task_type).unwrap_or(TaskType::DataFetch);
|
let task_type = dag.nodes.get(&evt.task_id).map(|n| n.task_type).unwrap_or(TaskType::DataFetch);
|
||||||
@ -225,7 +342,7 @@ impl WorkflowEngine {
|
|||||||
for task_id in ready_tasks {
|
for task_id in ready_tasks {
|
||||||
if let Err(e) = self.dispatch_task(&mut dag, &task_id, &self.state.vgcs).await {
|
if let Err(e) = self.dispatch_task(&mut dag, &task_id, &self.state.vgcs).await {
|
||||||
error!("Failed to dispatch task {}: {}", task_id, e);
|
error!("Failed to dispatch task {}: {}", task_id, e);
|
||||||
self.publish_log(req_id, &task_id, "ERROR", &format!("Failed to dispatch task: {}", e)).await;
|
info!("Failed to dispatch task {}: {}", task_id, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -235,6 +352,37 @@ impl WorkflowEngine {
|
|||||||
let end_time = chrono::Utc::now();
|
let end_time = chrono::Utc::now();
|
||||||
let timestamp = end_time.timestamp_millis();
|
let timestamp = end_time.timestamp_millis();
|
||||||
|
|
||||||
|
// --- Log Persistence (New) ---
|
||||||
|
let req_id_clone_for_log = req_id;
|
||||||
|
let vgcs_for_log = self.state.vgcs.clone();
|
||||||
|
let log_manager = self.state.log_manager.clone();
|
||||||
|
|
||||||
|
// We run this blocking operation here or spawn it?
|
||||||
|
// Spawn is safer to not block the loop, but we want it part of the "completion".
|
||||||
|
// Let's spawn, but it's fine if it's slightly async.
|
||||||
|
tokio::spawn(async move {
|
||||||
|
match log_manager.finalize(&req_id_clone_for_log.to_string()) {
|
||||||
|
Ok(log_content) => {
|
||||||
|
if !log_content.is_empty() {
|
||||||
|
|
||||||
|
let result = tokio::task::spawn_blocking(move || -> Result<String> {
|
||||||
|
let mut tx = vgcs_for_log.begin_transaction(&req_id_clone_for_log.to_string(), "")?;
|
||||||
|
tx.write("workflow.log", log_content.as_bytes())?;
|
||||||
|
// We use "System" as author
|
||||||
|
tx.commit("Persist Workflow Logs", "System")
|
||||||
|
}).await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(Ok(commit)) => info!("Persisted workflow logs to VGCS commit: {}", commit),
|
||||||
|
Ok(Err(e)) => error!("Failed to commit workflow logs: {}", e),
|
||||||
|
Err(e) => error!("Failed to join log persistence task: {}", e),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => error!("Failed to finalize logs: {}", e),
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// --- Snapshot Persistence ---
|
// --- Snapshot Persistence ---
|
||||||
let tasks_status_map = dag.nodes.iter().map(|(k, n)| {
|
let tasks_status_map = dag.nodes.iter().map(|(k, n)| {
|
||||||
let status = match n.status {
|
let status = match n.status {
|
||||||
@ -252,15 +400,75 @@ impl WorkflowEngine {
|
|||||||
let tasks_output_map = dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect::<std::collections::HashMap<_,_>>();
|
let tasks_output_map = dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect::<std::collections::HashMap<_,_>>();
|
||||||
let tasks_metadata_map = dag.commit_tracker.task_metadata.clone();
|
let tasks_metadata_map = dag.commit_tracker.task_metadata.clone();
|
||||||
|
|
||||||
|
// Construct Comprehensive Task States for final snapshot
|
||||||
|
let mut task_states = HashMap::new();
|
||||||
|
for (task_id, buffer) in &dag.task_execution_states {
|
||||||
|
let status = dag.get_status(task_id);
|
||||||
|
let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone());
|
||||||
|
let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned();
|
||||||
|
let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned();
|
||||||
|
|
||||||
|
let status_dto = match status {
|
||||||
|
TaskStatus::Pending => MsgTaskStatus::Pending,
|
||||||
|
TaskStatus::Scheduled => MsgTaskStatus::Scheduled,
|
||||||
|
TaskStatus::Running => MsgTaskStatus::Running,
|
||||||
|
TaskStatus::Completed => MsgTaskStatus::Completed,
|
||||||
|
TaskStatus::Failed => MsgTaskStatus::Failed,
|
||||||
|
TaskStatus::Skipped => MsgTaskStatus::Skipped,
|
||||||
|
TaskStatus::Cancelled => MsgTaskStatus::Skipped,
|
||||||
|
};
|
||||||
|
|
||||||
|
task_states.insert(task_id.clone(), TaskStateSnapshot {
|
||||||
|
task_id: task_id.clone(),
|
||||||
|
status: status_dto,
|
||||||
|
logs: buffer.logs.clone(),
|
||||||
|
content: Some(buffer.content_buffer.clone()),
|
||||||
|
input_commit,
|
||||||
|
output_commit,
|
||||||
|
metadata,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add remaining tasks
|
||||||
|
for task_id in dag.nodes.keys() {
|
||||||
|
if !task_states.contains_key(task_id) {
|
||||||
|
let status = dag.get_status(task_id);
|
||||||
|
let input_commit = dag.nodes.get(task_id).and_then(|n| n.input_commit.clone());
|
||||||
|
let output_commit = dag.commit_tracker.task_commits.get(task_id).cloned();
|
||||||
|
let metadata = dag.commit_tracker.task_metadata.get(task_id).cloned();
|
||||||
|
|
||||||
|
let status_dto = match status {
|
||||||
|
TaskStatus::Pending => MsgTaskStatus::Pending,
|
||||||
|
TaskStatus::Scheduled => MsgTaskStatus::Scheduled,
|
||||||
|
TaskStatus::Running => MsgTaskStatus::Running,
|
||||||
|
TaskStatus::Completed => MsgTaskStatus::Completed,
|
||||||
|
TaskStatus::Failed => MsgTaskStatus::Failed,
|
||||||
|
TaskStatus::Skipped => MsgTaskStatus::Skipped,
|
||||||
|
TaskStatus::Cancelled => MsgTaskStatus::Skipped,
|
||||||
|
};
|
||||||
|
|
||||||
|
task_states.insert(task_id.clone(), TaskStateSnapshot {
|
||||||
|
task_id: task_id.clone(),
|
||||||
|
status: status_dto,
|
||||||
|
logs: vec![],
|
||||||
|
content: None,
|
||||||
|
input_commit,
|
||||||
|
output_commit,
|
||||||
|
metadata,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let snapshot_event = WorkflowEvent::WorkflowStateSnapshot {
|
let snapshot_event = WorkflowEvent::WorkflowStateSnapshot {
|
||||||
timestamp,
|
timestamp,
|
||||||
task_graph: dag.to_dto(),
|
task_graph: dag.to_dto(),
|
||||||
tasks_status: tasks_status_map,
|
tasks_status: tasks_status_map,
|
||||||
tasks_output: tasks_output_map,
|
tasks_output: tasks_output_map,
|
||||||
tasks_metadata: tasks_metadata_map,
|
tasks_metadata: tasks_metadata_map,
|
||||||
|
logs: self.state.log_manager.read_current_logs(&req_id.to_string()).unwrap_or_default(),
|
||||||
|
task_states: std::collections::HashMap::new(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Extract symbol & market from any node
|
|
||||||
let symbol = dag.nodes.values().next()
|
let symbol = dag.nodes.values().next()
|
||||||
.and_then(|n| n.config.get("symbol"))
|
.and_then(|n| n.config.get("symbol"))
|
||||||
.and_then(|v| v.as_str())
|
.and_then(|v| v.as_str())
|
||||||
@ -288,13 +496,7 @@ impl WorkflowEngine {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// 2. Save New Workflow History
|
// 2. Save New Workflow History
|
||||||
let _start_time = dag.start_time; // We need to track start time in DAG or pass it
|
let _start_time = dag.start_time;
|
||||||
// For now, let's approximate or fetch if available.
|
|
||||||
// Actually, DAG doesn't track start time yet. We should probably add it.
|
|
||||||
// As a workaround, use now - X, or just now if we don't care about precision.
|
|
||||||
// Better: Assume orchestrator start log is close enough.
|
|
||||||
// Let's use end_time for both start/end if we don't track it, but that's bad.
|
|
||||||
// We will ignore start_time precision for this MVP refactor step.
|
|
||||||
let start_time_val = end_time;
|
let start_time_val = end_time;
|
||||||
|
|
||||||
let has_failures = dag.has_failures();
|
let has_failures = dag.has_failures();
|
||||||
@ -330,7 +532,7 @@ impl WorkflowEngine {
|
|||||||
|
|
||||||
let event = if dag.has_failures() {
|
let event = if dag.has_failures() {
|
||||||
info!("Workflow {} failed (some tasks failed)", req_id);
|
info!("Workflow {} failed (some tasks failed)", req_id);
|
||||||
self.publish_log(req_id, "workflow", "ERROR", "Workflow finished with failures.").await;
|
info!("Workflow finished with failures.");
|
||||||
WorkflowEvent::WorkflowFailed {
|
WorkflowEvent::WorkflowFailed {
|
||||||
end_timestamp: timestamp,
|
end_timestamp: timestamp,
|
||||||
reason: "Some tasks failed".to_string(),
|
reason: "Some tasks failed".to_string(),
|
||||||
@ -338,7 +540,7 @@ impl WorkflowEngine {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info!("Workflow {} completed successfully", req_id);
|
info!("Workflow {} completed successfully", req_id);
|
||||||
self.publish_log(req_id, "workflow", "INFO", "Workflow completed successfully.").await;
|
info!("Workflow completed successfully.");
|
||||||
WorkflowEvent::WorkflowCompleted {
|
WorkflowEvent::WorkflowCompleted {
|
||||||
end_timestamp: timestamp,
|
end_timestamp: timestamp,
|
||||||
result_summary: Some(json!({})),
|
result_summary: Some(json!({})),
|
||||||
@ -367,7 +569,7 @@ impl WorkflowEngine {
|
|||||||
|
|
||||||
// 2. Update Status
|
// 2. Update Status
|
||||||
dag.update_status(task_id, TaskStatus::Scheduled);
|
dag.update_status(task_id, TaskStatus::Scheduled);
|
||||||
self.publish_log(dag.request_id, task_id, "INFO", "Task scheduled and dispatched.").await;
|
info!("Task {} scheduled and dispatched.", task_id);
|
||||||
|
|
||||||
// 3. Construct Command
|
// 3. Construct Command
|
||||||
let (routing_key, task_type, mut config, display_name) = {
|
let (routing_key, task_type, mut config, display_name) = {
|
||||||
@ -417,6 +619,8 @@ impl WorkflowEngine {
|
|||||||
obj.insert("input_bindings".to_string(), serde_json::to_value(&resolution.paths)?);
|
obj.insert("input_bindings".to_string(), serde_json::to_value(&resolution.paths)?);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
info!("Context resolution successful. Injecting bindings.");
|
||||||
|
|
||||||
// 2. Write Trace Sidecar to VGCS
|
// 2. Write Trace Sidecar to VGCS
|
||||||
let trace_path = io_binder.allocate_trace_path(task_type, &symbol, task_id, display_name.as_deref());
|
let trace_path = io_binder.allocate_trace_path(task_type, &symbol, task_id, display_name.as_deref());
|
||||||
|
|
||||||
@ -438,6 +642,7 @@ impl WorkflowEngine {
|
|||||||
match trace_commit_res {
|
match trace_commit_res {
|
||||||
Ok(Ok(new_commit)) => {
|
Ok(Ok(new_commit)) => {
|
||||||
info!("Written context resolution trace to {} (Commit: {})", trace_path, new_commit);
|
info!("Written context resolution trace to {} (Commit: {})", trace_path, new_commit);
|
||||||
|
|
||||||
// Update the base commit for the worker, so it sees the trace file (linear history)
|
// Update the base commit for the worker, so it sees the trace file (linear history)
|
||||||
current_base_commit = new_commit;
|
current_base_commit = new_commit;
|
||||||
// Update the context passed to the worker
|
// Update the context passed to the worker
|
||||||
@ -448,12 +653,16 @@ impl WorkflowEngine {
|
|||||||
// We are outside closure here.
|
// We are outside closure here.
|
||||||
dag.set_input_commit(task_id, current_base_commit);
|
dag.set_input_commit(task_id, current_base_commit);
|
||||||
},
|
},
|
||||||
Ok(Err(e)) => error!("Failed to write trace file: {}", e),
|
Ok(Err(e)) => {
|
||||||
|
error!("Failed to write trace file: {}", e);
|
||||||
|
warn!("Failed to write trace file: {}", e);
|
||||||
|
},
|
||||||
Err(e) => error!("Failed to join trace write task: {}", e),
|
Err(e) => error!("Failed to join trace write task: {}", e),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Context resolution failed for task {}: {}", task_id, e);
|
error!("Context resolution failed for task {}: {}", task_id, e);
|
||||||
|
warn!("Context resolution failed: {}", e);
|
||||||
// We proceed, but the worker might fail if it relies on inputs
|
// We proceed, but the worker might fail if it relies on inputs
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -461,6 +670,9 @@ impl WorkflowEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture for event
|
||||||
|
let input_commit_for_event = context.base_commit.clone();
|
||||||
|
|
||||||
let cmd = WorkflowTaskCommand {
|
let cmd = WorkflowTaskCommand {
|
||||||
request_id: dag.request_id,
|
request_id: dag.request_id,
|
||||||
task_id: task_id.to_string(),
|
task_id: task_id.to_string(),
|
||||||
@ -472,21 +684,72 @@ impl WorkflowEngine {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
// Special handling for Analysis Report Task to inject task_id into the specific command payload
|
// 4. Publish with Handshake (Request-Reply)
|
||||||
// (If the node config is used to build GenerateReportCommand downstream)
|
let subject = cmd.subject().to_string();
|
||||||
// Actually, WorkflowTaskCommand is generic. The specific worker (e.g. report-generator)
|
|
||||||
// usually consumes a specific command.
|
|
||||||
// BUT, the current architecture seems to have Orchestrator send `WorkflowTaskCommand`
|
|
||||||
// and the worker receives THAT?
|
|
||||||
|
|
||||||
// Let's check `report-generator-service` consumer.
|
|
||||||
|
|
||||||
// 4. Publish
|
|
||||||
let subject = cmd.subject().to_string(); // This uses the routing_key
|
|
||||||
let payload = serde_json::to_vec(&cmd)?;
|
let payload = serde_json::to_vec(&cmd)?;
|
||||||
|
|
||||||
info!("Dispatching task {} to subject {}", task_id, subject);
|
info!("Dispatching task {} to subject {} (waiting for ack)", task_id, subject);
|
||||||
self.nats.publish(subject, payload.into()).await?;
|
|
||||||
|
let request_timeout = std::time::Duration::from_secs(5);
|
||||||
|
let request_future = self.nats.request(subject.clone(), payload.into());
|
||||||
|
|
||||||
|
match tokio::time::timeout(request_timeout, request_future).await {
|
||||||
|
Ok(Ok(msg)) => {
|
||||||
|
// Parse Ack
|
||||||
|
match serde_json::from_slice::<TaskAcknowledgement>(&msg.payload) {
|
||||||
|
Ok(TaskAcknowledgement::Accepted) => {
|
||||||
|
info!("Task {} accepted by provider.", task_id);
|
||||||
|
// Task proceeds normally
|
||||||
|
},
|
||||||
|
Ok(TaskAcknowledgement::Rejected { reason }) => {
|
||||||
|
let err_msg = format!("Task rejected by provider: {}", reason);
|
||||||
|
warn!("Task {} rejected: {}", task_id, reason);
|
||||||
|
|
||||||
|
// Mark failed immediately
|
||||||
|
dag.update_status(task_id, TaskStatus::Failed);
|
||||||
|
error!("{}", err_msg);
|
||||||
|
|
||||||
|
// Emit failure event so frontend knows
|
||||||
|
let failure_event = WorkflowEvent::TaskStateChanged {
|
||||||
|
task_id: task_id.to_string(),
|
||||||
|
task_type,
|
||||||
|
status: MsgTaskStatus::Failed,
|
||||||
|
message: Some(err_msg.clone()),
|
||||||
|
timestamp: chrono::Utc::now().timestamp_millis(),
|
||||||
|
progress: None,
|
||||||
|
input_commit: input_commit_for_event,
|
||||||
|
output_commit: None,
|
||||||
|
};
|
||||||
|
// ... publish failure event ...
|
||||||
|
let subject_prog = common_contracts::subjects::NatsSubject::WorkflowProgress(dag.request_id).to_string();
|
||||||
|
if let Ok(p) = serde_json::to_vec(&failure_event) {
|
||||||
|
let _ = self.nats.publish(subject_prog, p.into()).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
return Err(anyhow::anyhow!(err_msg));
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
// Invalid Ack format, assume failure
|
||||||
|
let err_msg = format!("Invalid Ack from provider: {}", e);
|
||||||
|
error!("{}", err_msg);
|
||||||
|
dag.update_status(task_id, TaskStatus::Failed);
|
||||||
|
return Err(anyhow::anyhow!(err_msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
let err_msg = format!("NATS Request failed: {}", e);
|
||||||
|
error!("Task {} dispatch error: {}", task_id, e);
|
||||||
|
dag.update_status(task_id, TaskStatus::Failed);
|
||||||
|
return Err(anyhow::anyhow!(err_msg));
|
||||||
|
},
|
||||||
|
Err(_) => {
|
||||||
|
let err_msg = "Dispatch timeout (no ack from provider in 5s)";
|
||||||
|
error!("Task {} {}", task_id, err_msg);
|
||||||
|
dag.update_status(task_id, TaskStatus::Failed);
|
||||||
|
return Err(anyhow::anyhow!(err_msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -547,15 +810,31 @@ impl WorkflowEngine {
|
|||||||
if market == "MOCK" && fetch_tasks.is_empty() {
|
if market == "MOCK" && fetch_tasks.is_empty() {
|
||||||
let task_id = "fetch:mock".to_string();
|
let task_id = "fetch:mock".to_string();
|
||||||
fetch_tasks.push(task_id.clone());
|
fetch_tasks.push(task_id.clone());
|
||||||
|
|
||||||
|
let (actual_symbol, sim_mode) = if symbol.as_str().contains('|') {
|
||||||
|
let parts: Vec<&str> = symbol.as_str().split('|').collect();
|
||||||
|
(parts[0], Some(parts[1]))
|
||||||
|
} else {
|
||||||
|
(symbol.as_str(), None)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut config = json!({
|
||||||
|
"symbol": actual_symbol,
|
||||||
|
"market": market
|
||||||
|
});
|
||||||
|
|
||||||
|
if let Some(mode) = sim_mode {
|
||||||
|
if let Some(obj) = config.as_object_mut() {
|
||||||
|
obj.insert("simulation_mode".to_string(), serde_json::Value::String(mode.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
dag.add_node(
|
dag.add_node(
|
||||||
task_id,
|
task_id,
|
||||||
Some("Data Fetch (Mock)".to_string()),
|
Some("Data Fetch (Mock)".to_string()),
|
||||||
TaskType::DataFetch,
|
TaskType::DataFetch,
|
||||||
"provider.mock".to_string(),
|
"provider.mock".to_string(),
|
||||||
json!({
|
config
|
||||||
"symbol": symbol.as_str(),
|
|
||||||
"market": market
|
|
||||||
})
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -567,11 +846,6 @@ impl WorkflowEngine {
|
|||||||
for (module_id, module_config) in &template.modules {
|
for (module_id, module_config) in &template.modules {
|
||||||
let task_id = format!("analysis:{}", module_id);
|
let task_id = format!("analysis:{}", module_id);
|
||||||
|
|
||||||
// Pass module_id and template_id so the worker knows what to do
|
|
||||||
// We pass the FULL module config here if we want the worker to be stateless,
|
|
||||||
// BUT existing worker logic fetches template again.
|
|
||||||
// To support "Single Module Execution", we should probably pass the module_id.
|
|
||||||
|
|
||||||
let mut node_config = json!({
|
let mut node_config = json!({
|
||||||
"template_id": template_id,
|
"template_id": template_id,
|
||||||
"module_id": module_id,
|
"module_id": module_id,
|
||||||
@ -607,23 +881,4 @@ impl WorkflowEngine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn publish_log(&self, req_id: uuid::Uuid, task_id: &str, level: &str, message: &str) {
|
|
||||||
let event = WorkflowEvent::TaskLog {
|
|
||||||
task_id: task_id.to_string(),
|
|
||||||
level: level.to_string(),
|
|
||||||
message: message.to_string(),
|
|
||||||
timestamp: chrono::Utc::now().timestamp_millis(),
|
|
||||||
};
|
|
||||||
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
|
|
||||||
if let Ok(payload) = serde_json::to_vec(&event) {
|
|
||||||
// Fire and forget
|
|
||||||
let nats = self.nats.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = nats.publish(subject, payload.into()).await {
|
|
||||||
error!("Failed to publish TaskLog: {}", e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user