use crate::error::Result; use crate::state::AppState; use common_contracts::messages::FetchCompanyDataCommand; use futures_util::StreamExt; use std::sync::Arc; use tracing::{error, info}; const SUBJECT_NAME: &str = "data_fetch_commands"; pub async fn run(state: AppState) -> Result<()> { info!("Starting NATS message consumer..."); let client = async_nats::connect(&state.config.nats_addr).await?; info!("Connected to NATS."); // This is a simple subscriber. For production, consider JetStream for durability. let mut subscriber = client.subscribe(SUBJECT_NAME.to_string()).await?; info!( "Consumer started, waiting for messages on subject '{}'", SUBJECT_NAME ); while let Some(message) = subscriber.next().await { info!("Received NATS message."); let state_clone = state.clone(); let publisher_clone = client.clone(); tokio::spawn(async move { match serde_json::from_slice::(&message.payload) { Ok(command) => { info!("Deserialized command for symbol: {}", command.symbol); if let Err(e) = crate::worker::handle_fetch_command(state_clone, command, publisher_clone) .await { error!("Error handling fetch command: {:?}", e); } } Err(e) => { error!("Failed to deserialize message: {}", e); } } }); } Ok(()) } pub async fn subscribe_to_data_commands(app_state: Arc, nats_client: async_nats::Client) -> Result<()> { let mut subscriber = nats_client.subscribe("data_fetch_commands".to_string()).await?; while let Some(message) = subscriber.next().await { let command: FetchCompanyDataCommand = match serde_json::from_slice(&message.payload) { Ok(c) => c, Err(e) => { error!("Failed to deserialize message: {}", e); continue; } }; let task_id = command.request_id; if command.market.to_uppercase() == "CN" { info!( "Skipping command for symbol '{}' as its market ('{}') is 'CN'.", command.symbol, command.market ); continue; } app_state.tasks.insert(task_id, common_contracts::observability::TaskProgress { request_id: task_id, task_name: format!("finnhub:{}", command.symbol), status: "Received".to_string(), progress_percent: 0, details: "Command received".to_string(), started_at: chrono::Utc::now(), }); // Spawn the workflow in a separate task let workflow_state = app_state.clone(); let publisher_clone = nats_client.clone(); tokio::spawn(async move { let state_owned = (*workflow_state).clone(); let result = crate::worker::handle_fetch_command(state_owned, command, publisher_clone).await; if let Err(e) = result { error!( "Error executing Finnhub workflow for task {}: {:?}", task_id, e ); } }); } Ok(()) }