Fundamental_Analysis/services/common-contracts/src/workflow_runner.rs
Lv, Qi 70b30b39d8 Refactor: Complete transition from analysis_results to workflow_history
- Removed dependencies on analysis_results table
- Implemented workflow_history storage in Data Persistence Service
- Updated Workflow Orchestrator to save workflow snapshots to history
- Refactored Frontend to consume workflow_history and fetch reports via VGCS
- Fixed Data Providers (Tushare, YFinance) to report output paths in metadata
- Updated documentation and task status
2025-11-29 17:55:54 +08:00

165 lines
6.4 KiB
Rust

use std::sync::Arc;
use anyhow::Result;
use tracing::{info, error};
use async_nats::Client;
use crate::workflow_types::{WorkflowTaskCommand, WorkflowTaskEvent, TaskStatus, TaskResult};
use crate::messages::WorkflowEvent as CommonWorkflowEvent;
use crate::workflow_node::{WorkflowNode, NodeContext};
use crate::subjects::SubjectMessage;
use workflow_context::WorkerContext;
/// Runs a workflow node for a single task command and reports the outcome
/// (progress updates and the final task event) over NATS.
pub struct WorkflowNodeRunner {
/// NATS client used to publish task events and progress updates.
nats: Client,
}
impl WorkflowNodeRunner {
pub fn new(nats: Client) -> Self {
Self { nats }
}
pub async fn run<N>(&self, node: Arc<N>, cmd: WorkflowTaskCommand) -> Result<()>
where
N: WorkflowNode + 'static
{
let task_id = cmd.task_id.clone();
info!("Starting node execution: type={}, task_id={}", node.node_type(), task_id);
// 1. Prepare Context
let root_path = cmd.storage.root_path.clone();
let req_id = cmd.request_id.to_string();
let base_commit = cmd.context.base_commit.clone().unwrap_or_default();
let context = NodeContext::new(req_id.clone(), base_commit.clone(), root_path.clone());
// 2. Execute Node Logic (Async)
let exec_result = match node.execute(&context, &cmd.config).await {
Ok(res) => res,
Err(e) => {
return self.handle_failure(&cmd, &e.to_string()).await;
}
};
// 3. Render Report (Sync)
let report_md = match node.render_report(&exec_result) {
Ok(md) => md,
Err(e) => {
return self.handle_failure(&cmd, &format!("Report rendering failed: {}", e)).await;
}
};
// 4. VGCS Operations (Blocking)
let node_clone = node.clone();
let task_id_clone = task_id.clone();
let base_commit_clone = base_commit.clone();
let root_path_clone = root_path.clone();
let req_id_clone = req_id.clone();
let exec_result_artifacts = exec_result.artifacts;
let report_md_clone = report_md.clone();
let symbol = cmd.config.get("symbol").and_then(|s| s.as_str()).unwrap_or("unknown").to_string();
let symbol_for_blocking = symbol.clone();
// We also want to generate an execution log (basic one for now)
// In future, we might want to capture logs during execute()
let execution_log = format!("# Execution Log for {}\n\nTask ID: {}\nNode Type: {}\nStatus: Success\n", task_id, task_id, node.node_type());
let commit_res = tokio::task::spawn_blocking(move || -> Result<String> {
let mut ctx = WorkerContext::new(&root_path_clone, &req_id_clone, &base_commit_clone);
// Define output directory convention
let base_dir = format!("raw/{}/{}", node_clone.node_type(), symbol_for_blocking);
// Write Artifacts
for (filename, content) in exec_result_artifacts {
let full_path = format!("{}/{}", base_dir, filename);
let bytes = content.as_bytes()?;
// WorkerContext write_file takes &str for now
ctx.write_file(&full_path, std::str::from_utf8(&bytes).unwrap_or(""))?;
}
// Write Report
let report_path = format!("{}/report.md", base_dir);
ctx.write_file(&report_path, &report_md_clone)?;
// Write Execution Log
let log_path = format!("{}/_execution.md", base_dir);
ctx.write_file(&log_path, &execution_log)?;
// Commit
let commit_msg = format!("Task {} ({}) completed", task_id_clone, node_clone.node_type());
let new_commit = ctx.commit(&commit_msg)?;
Ok(new_commit)
}).await;
let new_commit = match commit_res {
Ok(Ok(commit)) => commit,
Ok(Err(e)) => return self.handle_failure(&cmd, &format!("VGCS error: {}", e)).await,
Err(e) => return self.handle_failure(&cmd, &format!("Task join error: {}", e)).await,
};
// 5. Publish Stream Update
let stream_event = CommonWorkflowEvent::TaskStreamUpdate {
task_id: task_id.clone(),
content_delta: report_md.clone(),
index: 0,
};
self.publish_common(&cmd.request_id, stream_event).await?;
// 5.1 Update Meta Summary with Paths
let mut summary = exec_result.meta_summary.clone().unwrap_or(serde_json::json!({}));
if let Some(obj) = summary.as_object_mut() {
// Reconstruct paths used in VGCS block (must match)
let base_dir = format!("raw/{}/{}", node.node_type(), symbol);
obj.insert("output_path".to_string(), serde_json::Value::String(format!("{}/report.md", base_dir)));
obj.insert("execution_log_path".to_string(), serde_json::Value::String(format!("{}/_execution.md", base_dir)));
}
// 6. Publish Completion Event
let event = WorkflowTaskEvent {
request_id: cmd.request_id,
task_id: task_id,
status: TaskStatus::Completed,
result: Some(TaskResult {
new_commit: Some(new_commit),
error: None,
summary: Some(summary),
}),
};
self.publish_event(event).await?;
info!("Task {} finished successfully", cmd.task_id);
Ok(())
}
/// Logs the failure and publishes a `Failed` task event carrying `error_msg`.
///
/// The returned result is that of publishing the event.
async fn handle_failure(&self, cmd: &WorkflowTaskCommand, error_msg: &str) -> Result<()> {
    error!("Task {} failed: {}", cmd.task_id, error_msg);

    // A failed task has no commit and no summary, only the error text.
    let failure = TaskResult {
        new_commit: None,
        error: Some(error_msg.to_owned()),
        summary: None,
    };
    let failed_event = WorkflowTaskEvent {
        request_id: cmd.request_id,
        task_id: cmd.task_id.clone(),
        status: TaskStatus::Failed,
        result: Some(failure),
    };

    self.publish_event(failed_event).await
}
async fn publish_event(&self, event: WorkflowTaskEvent) -> Result<()> {
let subject = event.subject().to_string();
let payload = serde_json::to_vec(&event)?;
self.nats.publish(subject, payload.into()).await?;
Ok(())
}
async fn publish_common(&self, req_id: &uuid::Uuid, event: CommonWorkflowEvent) -> Result<()> {
let subject = crate::subjects::NatsSubject::WorkflowProgress(*req_id).to_string();
let payload = serde_json::to_vec(&event)?;
self.nats.publish(subject, payload.into()).await?;
Ok(())
}
}