use crate::error::{AppError, Result}; use crate::persistence::PersistenceClient; use crate::state::AppState; use chrono::Datelike; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto}; use common_contracts::messages::{CompanyProfilePersistedEvent, FetchCompanyDataCommand, FinancialsPersistedEvent}; use common_contracts::observability::TaskProgress; use tracing::{error, info}; pub async fn handle_fetch_command( state: AppState, command: FetchCompanyDataCommand, publisher: async_nats::Client, ) -> Result<()> { info!("Handling Finnhub fetch data command."); state.tasks.insert( command.request_id, TaskProgress { request_id: command.request_id, task_name: format!("finnhub:{}", command.symbol), status: "FetchingData".to_string(), progress_percent: 10, details: "Fetching data from Finnhub".to_string(), started_at: chrono::Utc::now(), }, ); let provider = match state.get_provider().await { Some(p) => p, None => { let reason = "Execution failed: Finnhub provider is not available (misconfigured).".to_string(); error!("{}", reason); if let Some(mut task) = state.tasks.get_mut(&command.request_id) { task.status = "Failed".to_string(); task.details = reason.clone(); } return Err(AppError::ProviderNotAvailable(reason)); } }; // 1. Fetch data via provider let (profile, financials): (CompanyProfileDto, Vec) = provider.fetch_all_data(&command.symbol).await?; // 2. Persist { if let Some(mut task) = state.tasks.get_mut(&command.request_id) { task.status = "PersistingData".to_string(); task.progress_percent = 60; task.details = "Persisting data to database".to_string(); } } let persistence_client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); persistence_client.upsert_company_profile(profile).await?; let years_set: std::collections::BTreeSet = financials.iter().map(|f| f.period_date.year() as u16).collect(); persistence_client.batch_insert_financials(financials).await?; // 3. Publish events let profile_event = CompanyProfilePersistedEvent { request_id: command.request_id, symbol: command.symbol.clone(), }; publisher .publish( "events.data.company_profile_persisted".to_string(), serde_json::to_vec(&profile_event).unwrap().into(), ) .await?; let financials_event = FinancialsPersistedEvent { request_id: command.request_id, symbol: command.symbol.clone(), years_updated: years_set.into_iter().collect(), }; publisher .publish( "events.data.financials_persisted".to_string(), serde_json::to_vec(&financials_event).unwrap().into(), ) .await?; // 4. Finalize if let Some(mut task) = state.tasks.get_mut(&command.request_id) { task.status = "Completed".to_string(); task.progress_percent = 100; task.details = "Workflow finished successfully".to_string(); } info!("Task {} completed successfully.", command.request_id); Ok(()) }