use anyhow::{Result, Context}; use tracing::{info, error, warn}; use common_contracts::workflow_types::{WorkflowTaskCommand, WorkflowTaskEvent, TaskStatus, TaskResult}; use common_contracts::subjects::SubjectMessage; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto}; use workflow_context::WorkerContext; use crate::state::AppState; use serde_json::json; pub async fn handle_workflow_command(state: AppState, nats: async_nats::Client, cmd: WorkflowTaskCommand) -> Result<()> { info!("Processing generic workflow command: task_id={}", cmd.task_id); // 1. Parse Config let symbol_code = cmd.config.get("symbol").and_then(|s| s.as_str()).unwrap_or("").to_string(); let market = cmd.config.get("market").and_then(|s| s.as_str()).unwrap_or("US").to_string(); if symbol_code.is_empty() { return send_failure(&nats, &cmd, "Missing symbol in config").await; } // 2. Initialize Worker Context // Note: We use the provided base_commit. If it's empty, it means start from scratch (or empty repo). // We need to mount the volume. let root_path = cmd.storage.root_path.clone(); // 3. Fetch Data (with Cache) let fetch_result = fetch_and_cache(&state, &symbol_code, &market).await; let (profile, financials) = match fetch_result { Ok(data) => data, Err(e) => return send_failure(&nats, &cmd, &format!("Fetch failed: {}", e)).await, }; // 4. Write to VGCS (Spawn blocking task for Git operations) let req_id = cmd.request_id.to_string(); let base_commit = cmd.context.base_commit.clone().unwrap_or_default(); let _task_id = cmd.task_id.clone(); // Clone data needed for closure let profile_clone = profile.clone(); let financials_clone = financials.clone(); let symbol_code_clone = symbol_code.clone(); let commit_result = tokio::task::spawn_blocking(move || -> Result { let mut ctx = WorkerContext::new(&root_path, &req_id, &base_commit); let base_dir = format!("raw/yfinance/{}", symbol_code_clone); let profile_json = serde_json::to_string_pretty(&profile_clone) .context("Failed to serialize profile")?; ctx.write_file(&format!("{}/profile.json", base_dir), &profile_json)?; let financials_json = serde_json::to_string_pretty(&financials_clone) .context("Failed to serialize financials")?; ctx.write_file(&format!("{}/financials.json", base_dir), &financials_json)?; ctx.commit(&format!("Fetched YFinance data for {}", symbol_code_clone)) }).await; let new_commit = match commit_result { Ok(res) => match res { Ok(c) => c, Err(e) => return send_failure(&nats, &cmd, &format!("VGCS failed: {}", e)).await, }, Err(e) => return send_failure(&nats, &cmd, &format!("Task join error: {}", e)).await, }; info!("Task {} completed. New commit: {}", cmd.task_id, new_commit); // 6. Send Success Event let event = WorkflowTaskEvent { request_id: cmd.request_id, task_id: cmd.task_id, status: TaskStatus::Completed, result: Some(TaskResult { new_commit: Some(new_commit), error: None, summary: Some(json!({ "symbol": symbol_code, "records": financials.len() })), }), }; publish_event(&nats, event).await } async fn fetch_and_cache(state: &AppState, symbol: &str, _market: &str) -> Result<(CompanyProfileDto, Vec)> { // 1. Get Provider // yfinance_provider is likely Arc, and Provider is Clone/ThreadSafe. let provider = state.yfinance_provider.clone(); // 2. Call fetch let (profile, financials) = provider.fetch_all_data(symbol).await .context("Failed to fetch data from YFinance")?; // 3. Write to DB Cache use common_contracts::persistence_client::PersistenceClient; use common_contracts::workflow_harness::TaskState; // For get_persistence_url let persistence_url = state.get_persistence_url(); let p_client = PersistenceClient::new(persistence_url); if let Err(e) = p_client.upsert_company_profile(profile.clone()).await { warn!("Failed to cache company profile: {}", e); } Ok((profile, financials)) } async fn send_failure(nats: &async_nats::Client, cmd: &WorkflowTaskCommand, error_msg: &str) -> Result<()> { error!("Task {} failed: {}", cmd.task_id, error_msg); let event = WorkflowTaskEvent { request_id: cmd.request_id, task_id: cmd.task_id.clone(), status: TaskStatus::Failed, result: Some(TaskResult { new_commit: None, error: Some(error_msg.to_string()), summary: None, }), }; publish_event(nats, event).await } async fn publish_event(nats: &async_nats::Client, event: WorkflowTaskEvent) -> Result<()> { let subject = event.subject().to_string(); let payload = serde_json::to_vec(&event)?; nats.publish(subject, payload.into()).await?; Ok(()) }