use crate::error::Result; use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::messages::FetchCompanyDataCommand; use futures_util::StreamExt; use tracing::{error, info, warn}; use std::sync::Arc; use tokio::sync::mpsc; use chrono::Utc; use std::time::Duration; const SUBJECT_NAME: &str = "data_fetch_commands"; pub async fn run(state: AppState) -> Result<()> { info!("Starting NATS message consumer..."); loop { let status = state.status.read().await.clone(); if let ServiceOperationalStatus::Degraded { reason } = status { warn!( "Service is in degraded state (reason: {}). Pausing message consumption for 30s.", reason ); tokio::time::sleep(Duration::from_secs(30)).await; continue; } info!("Service is Active. Connecting to NATS..."); match async_nats::connect(&state.config.nats_addr).await { Ok(client) => { info!("Successfully connected to NATS."); if let Err(e) = subscribe_and_process(state.clone(), client).await { error!("NATS subscription error: {}. Reconnecting in 10s...", e); } } Err(e) => { error!("Failed to connect to NATS: {}. Retrying in 10s...", e); } } tokio::time::sleep(Duration::from_secs(10)).await; } } async fn subscribe_and_process( state: AppState, client: async_nats::Client, ) -> Result<()> { let mut subscriber = client.subscribe(SUBJECT_NAME.to_string()).await?; info!( "Consumer started, waiting for messages on subject '{}'", SUBJECT_NAME ); while let Some(message) = subscriber.next().await { let current_status = state.status.read().await.clone(); if matches!(current_status, ServiceOperationalStatus::Degraded {..}) { warn!("Service became degraded. Disconnecting from NATS and pausing consumption."); subscriber.unsubscribe().await?; return Ok(()); } info!("Received NATS message."); let state_for_closure = Arc::new(state.clone()); tokio::spawn(async move { if let Err(e) = serde_json::from_slice::(&message.payload) { error!("Failed to deserialize message: {}", e); warn!("Received non-json message: {:?}", message.payload); return; } let command = match serde_json::from_slice::(&message.payload) { Ok(c) => c, Err(e) => { error!("Failed to deserialize message: {}", e); return; } }; info!("Received data fetch command for symbol: {}", command.symbol); // Tushare is for the Chinese market ("CN") if command.market.to_uppercase() != "CN" { info!( "Skipping command for symbol '{}' as its market ('{}') is not 'CN'.", command.symbol, command.market ); return; } let (tx, rx) = mpsc::channel(1); let task_id = command.request_id; // Initialize deterministic progress entry state_for_closure.tasks.insert(task_id, common_contracts::observability::TaskProgress { request_id: task_id, task_name: format!("tushare:{}", command.symbol), status: "Received".to_string(), progress_percent: 0, details: "Command received".to_string(), started_at: Utc::now(), }); // Spawn the workflow in a separate task let workflow_state = state_for_closure.clone(); tokio::spawn(async move { let workflow_state_for_error = workflow_state.clone(); let result = crate::worker::run_tushare_workflow(workflow_state, command, tx).await; if let Err(e) = result { error!( "Error executing Tushare workflow for task {}: {:?}", task_id, e ); // Update task to failed status if let Some(mut task) = workflow_state_for_error.tasks.get_mut(&task_id) { task.status = "Failed".to_string(); task.details = format!("Workflow failed: {}", e); } } }); // Spawn a separate task to clean up the task entry after completion or timeout let cleanup_state = state_for_closure.clone(); tokio::spawn(async move { let mut rx = rx; match rx.recv().await { Some(_) => { info!("Task {} completed successfully, removing from map.", task_id); cleanup_state.tasks.remove(&task_id); } None => { warn!( "Task {} completion signal not received, removing after timeout.", task_id ); cleanup_state.tasks.remove(&task_id); } } }); }); } Ok(()) }