use std::sync::Arc;

use chrono::Datelike;
use common_contracts::{
    dtos::{CompanyProfileDto, TimeSeriesFinancialDto},
    messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent},
};
use tokio::sync::mpsc;
use tracing::{error, info};

use crate::{error::AppError, persistence::PersistenceClient, state::AppState};

pub async fn run_tushare_workflow(
    state: Arc<AppState>,
    command: FetchCompanyDataCommand,
    completion_tx: mpsc::Sender<()>,
) -> Result<(), AppError> {
    let task_id = command.request_id;
    let symbol = command.symbol.clone();

    let provider = match state.get_provider().await {
        Some(p) => p,
        None => {
            let reason =
                "Execution failed: Tushare provider is not available (misconfigured).".to_string();
            error!("{}", reason);
            if let Some(mut task) = state.tasks.get_mut(&task_id) {
                task.status = "Failed".to_string();
                task.details = reason.clone();
            }
            return Err(AppError::ProviderNotAvailable(reason));
        }
    };

    // 1. Update task progress: checking cache freshness.
    {
        let mut entry = state
            .tasks
            .get_mut(&task_id)
            .ok_or_else(|| AppError::Internal("Task not found".to_string()))?;
        entry.status = "CheckingCache".to_string();
        entry.progress_percent = 5;
        entry.details = "Checking local data freshness".to_string();
    }

    // Skip the remote fetch when the persisted profile is less than 24 hours old.
    let persistence_client =
        PersistenceClient::new(state.config.data_persistence_service_url.clone());
    let mut is_fresh = false;
    match persistence_client.get_company_profile(&command.symbol).await {
        Ok(Some(p)) => {
            if let Some(updated_at) = p.updated_at {
                let age = chrono::Utc::now() - updated_at;
                if age < chrono::Duration::hours(24) {
                    info!(
                        "Data for {} is fresh (age: {}h). Skipping fetch.",
                        command.symbol,
                        age.num_hours()
                    );
                    is_fresh = true;
                }
            }
        }
        Ok(None) => {}
        Err(e) => tracing::warn!("Failed to check profile freshness: {}", e),
    }

    if is_fresh {
        {
            let mut entry = state
                .tasks
                .get_mut(&task_id)
                .ok_or_else(|| AppError::Internal("Task not found".to_string()))?;
            entry.status = "Completed".to_string();
            entry.progress_percent = 100;
            entry.details = "Data retrieved from cache".to_string();
        }

        // Even on a cache hit, notify downstream consumers so they can proceed.
        let nats_client = async_nats::connect(&state.config.nats_addr)
            .await
            .map_err(|e| AppError::Internal(format!("NATS connection failed: {}", e)))?;
        let financials_event = FinancialsPersistedEvent {
            request_id: command.request_id,
            symbol: command.symbol.clone(),
            // Nothing was written in this run, so no years are reported.
            years_updated: vec![],
            template_id: command.template_id.clone(),
        };
        nats_client
            .publish(
                "events.data.financials_persisted",
                serde_json::to_vec(&financials_event).unwrap().into(),
            )
            .await?;
        let _ = completion_tx.send(()).await;
        return Ok(());
    }

    {
        let mut entry = state
            .tasks
            .get_mut(&task_id)
            .ok_or_else(|| AppError::Internal("Task not found".to_string()))?;
        entry.status = "FetchingData".to_string();
        entry.progress_percent = 10;
        entry.details = "Starting data fetch from Tushare".to_string();
    }

    // 2. Fetch data using the provider.
    let (profile, financials) = provider.fetch_all_data(&symbol).await?;

    // 3. Update task progress: persisting data.
    {
        let mut entry = state
            .tasks
            .get_mut(&task_id)
            .ok_or_else(|| AppError::Internal("Task not found".to_string()))?;
        entry.status = "PersistingData".to_string();
        entry.progress_percent = 60;
        entry.details = "Data fetched, persisting to database".to_string();
    }

    // 4. Persist data (reuses the persistence client created for the freshness check).
    persist_data(&persistence_client, &profile, &financials, &state, task_id).await?;
    // 5. Publish events so downstream services can react to the new data.
    let nats_client = async_nats::connect(&state.config.nats_addr)
        .await
        .map_err(|e| AppError::Internal(format!("NATS connection failed: {}", e)))?;
    publish_events(&nats_client, &command, &financials).await?;

    // 6. Finalize task.
    {
        let mut entry = state
            .tasks
            .get_mut(&task_id)
            .ok_or_else(|| AppError::Internal("Task not found".to_string()))?;
        entry.status = "Completed".to_string();
        entry.progress_percent = 100;
        entry.details = "Workflow finished successfully".to_string();
    }

    let _ = completion_tx.send(()).await;
    info!(
        "Tushare workflow for symbol {} completed successfully.",
        symbol
    );
    Ok(())
}

async fn persist_data(
    client: &PersistenceClient,
    profile: &CompanyProfileDto,
    financials: &[TimeSeriesFinancialDto],
    state: &Arc<AppState>,
    task_id: uuid::Uuid,
) -> Result<(), AppError> {
    // The two persistence calls run sequentially so progress can be reported
    // per step; a parallel variant using tokio::try_join! is sketched at the
    // bottom of this file.
    if let Err(e) = client.upsert_company_profile(profile.clone()).await {
        state.tasks.get_mut(&task_id).unwrap().details =
            format!("Failed to save profile: {}", e);
        return Err(e);
    }
    {
        let mut task = state.tasks.get_mut(&task_id).unwrap();
        task.progress_percent = 75;
        task.details = "Company profile saved".to_string();
    }

    if let Err(e) = client.batch_insert_financials(financials.to_vec()).await {
        state.tasks.get_mut(&task_id).unwrap().details =
            format!("Failed to save financials: {}", e);
        return Err(e);
    }
    {
        let mut task = state.tasks.get_mut(&task_id).unwrap();
        task.progress_percent = 90;
        task.details = "Financial statements saved".to_string();
    }
    Ok(())
}

async fn publish_events(
    nats_client: &async_nats::Client,
    command: &FetchCompanyDataCommand,
    financials: &[TimeSeriesFinancialDto],
) -> Result<(), AppError> {
    let profile_event = CompanyProfilePersistedEvent {
        request_id: command.request_id,
        symbol: command.symbol.clone(),
    };
    nats_client
        .publish(
            "events.data.company_profile_persisted",
            serde_json::to_vec(&profile_event).unwrap().into(),
        )
        .await?;

    // Collect the distinct fiscal years covered by the fetched statements;
    // BTreeSet deduplicates and yields them in ascending order.
    let years: std::collections::BTreeSet<u16> = financials
        .iter()
        .map(|f| f.period_date.year() as u16)
        .collect();
    let financials_event = FinancialsPersistedEvent {
        request_id: command.request_id,
        symbol: command.symbol.clone(),
        years_updated: years.into_iter().collect(),
        template_id: command.template_id.clone(),
    };
    nats_client
        .publish(
            "events.data.financials_persisted",
            serde_json::to_vec(&financials_event).unwrap().into(),
        )
        .await?;
    Ok(())
}
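
// A minimal sketch of the tokio::try_join! variant mentioned in persist_data,
// assuming the profile upsert and the financials batch insert are independent
// writes. `persist_data_parallel` is an illustrative name, not part of the
// workflow above; per-step progress reporting is dropped here because the two
// futures complete in an arbitrary order.
#[allow(dead_code)]
async fn persist_data_parallel(
    client: &PersistenceClient,
    profile: &CompanyProfileDto,
    financials: &[TimeSeriesFinancialDto],
) -> Result<(), AppError> {
    // try_join! polls both futures concurrently and short-circuits on the
    // first error, which becomes this function's Err.
    tokio::try_join!(
        client.upsert_company_profile(profile.clone()),
        client.batch_insert_financials(financials.to_vec()),
    )?;
    Ok(())
}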
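
// Hypothetical caller sketch (not part of the workflow above): shows how a
// command consumer might spawn run_tushare_workflow and use completion_tx as
// a done-signal. `handle_fetch_command` is an illustrative name, and this
// assumes AppState is Send + Sync so the task can run on the Tokio runtime.
#[allow(dead_code)]
async fn handle_fetch_command(state: Arc<AppState>, command: FetchCompanyDataCommand) {
    let (completion_tx, mut completion_rx) = mpsc::channel::<()>(1);
    tokio::spawn(async move {
        if let Err(e) = run_tushare_workflow(state, command, completion_tx).await {
            error!("Tushare workflow failed: {}", e);
        }
    });
    // recv() returns None when the sender is dropped (e.g. the workflow
    // errored before signalling), so this wakes on success and failure alike.
    let _ = completion_rx.recv().await;
}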