use std::sync::Arc; use common_contracts::{ dtos::{CompanyProfileDto, TimeSeriesFinancialDto}, messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent}, }; use tokio::sync::mpsc; use tracing::info; use chrono::Datelike; use crate::{error::ProviderError, persistence::PersistenceClient, state::AppState}; pub async fn run_tushare_workflow( state: Arc, command: FetchCompanyDataCommand, completion_tx: mpsc::Sender<()>, ) -> Result<(), ProviderError> { let task_id = command.request_id; let symbol = command.symbol.clone(); // 1. Update task progress: Fetching data { let mut entry = state .tasks .get_mut(&task_id) .ok_or_else(|| ProviderError::Internal(anyhow::anyhow!("Task not found")))?; entry.status = "FetchingData".to_string(); entry.progress_percent = 10; entry.details = "Starting data fetch from Tushare".to_string(); } // 2. Fetch data using the provider let (profile, financials) = state .tushare_provider .fetch_all_data(&symbol) .await?; // 3. Update task progress: Persisting data { let mut entry = state .tasks .get_mut(&task_id) .ok_or_else(|| ProviderError::Internal(anyhow::anyhow!("Task not found")))?; entry.status = "PersistingData".to_string(); entry.progress_percent = 60; entry.details = "Data fetched, persisting to database".to_string(); } // 4. Persist data let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); persist_data( &persistence_client, &profile, &financials, &state, task_id, ) .await?; // 5. Publish events let nats_client = async_nats::connect(&state.config.nats_addr) .await .map_err(|e| ProviderError::Internal(anyhow::anyhow!("NATS connection failed: {}", e)))?; publish_events(&nats_client, &command, &financials).await?; // 6. Finalize task { let mut entry = state .tasks .get_mut(&task_id) .ok_or_else(|| ProviderError::Internal(anyhow::anyhow!("Task not found")))?; entry.status = "Completed".to_string(); entry.progress_percent = 100; entry.details = "Workflow finished successfully".to_string(); } let _ = completion_tx.send(()).await; info!( "Tushare workflow for symbol {} completed successfully.", symbol ); Ok(()) } async fn persist_data( client: &PersistenceClient, profile: &CompanyProfileDto, financials: &[TimeSeriesFinancialDto], state: &Arc, task_id: uuid::Uuid, ) -> Result<(), ProviderError> { // In a real implementation, we'd use tokio::try_join! to run these in parallel. if let Err(e) = client.upsert_company_profile(profile.clone()).await { state .tasks .get_mut(&task_id) .unwrap() .details = format!("Failed to save profile: {}", e); return Err(e); } { let mut task = state.tasks.get_mut(&task_id).unwrap(); task.progress_percent = 75; task.details = "Company profile saved".to_string(); } if let Err(e) = client.batch_insert_financials(financials.to_vec()).await { state .tasks .get_mut(&task_id) .unwrap() .details = format!("Failed to save financials: {}", e); return Err(e); } { let mut task = state.tasks.get_mut(&task_id).unwrap(); task.progress_percent = 90; task.details = "Financial statements saved".to_string(); } Ok(()) } async fn publish_events( nats_client: &async_nats::Client, command: &FetchCompanyDataCommand, financials: &[TimeSeriesFinancialDto], ) -> Result<(), ProviderError> { let profile_event = CompanyProfilePersistedEvent { request_id: command.request_id, symbol: command.symbol.clone(), }; nats_client .publish( "events.data.company_profile_persisted", serde_json::to_vec(&profile_event).unwrap().into(), ) .await .map_err(|e| ProviderError::Internal(anyhow::anyhow!("Event publishing failed: {}", e)))?; let years: std::collections::BTreeSet = financials .iter() .map(|f| f.period_date.year() as u16) .collect(); let financials_event = FinancialsPersistedEvent { request_id: command.request_id, symbol: command.symbol.clone(), years_updated: years.into_iter().collect(), }; nats_client .publish( "events.data.financials_persisted", serde_json::to_vec(&financials_event).unwrap().into(), ) .await .map_err(|e| ProviderError::Internal(anyhow::anyhow!("Event publishing failed: {}", e)))?; Ok(()) }