Fundamental_Analysis/services/finnhub-provider-service/src/message_consumer.rs
Lv, Qi e9e4d0c1b3 chore: massive update covering recent refactoring and bug fixes
- fix: infinite message loop in workflow orchestrator
- feat: restore realtime LLM streaming from report generator to frontend
- refactor: major update to provider services (generic workers, workflow adapters)
- refactor: common contracts and message definitions updated
- feat: enhanced logging and observability in orchestrator
- docs: update project management tasks and status
- chore: dependency updates and config adjustments
2025-11-30 19:17:02 +08:00

91 lines
3.4 KiB
Rust

use crate::error::Result;
use crate::state::{AppState, ServiceOperationalStatus};
use common_contracts::workflow_types::WorkflowTaskCommand;
use futures_util::StreamExt;
use std::time::Duration;
use tracing::{error, info, warn};
pub async fn run(state: AppState) -> Result<()> {
info!("Starting NATS message consumer...");
loop {
let status = state.status.read().await.clone();
if let ServiceOperationalStatus::Degraded { reason } = status {
warn!(
"Service is in degraded state (reason: {}). Pausing message consumption for 5s.",
reason
);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
info!("Service is Active. Connecting to NATS...");
match async_nats::connect(&state.config.nats_addr).await {
Ok(client) => {
info!("Successfully connected to NATS.");
if let Err(e) = subscribe_workflow(state.clone(), client).await {
error!("NATS subscription error: {}. Reconnecting in 10s...", e);
}
}
Err(e) => {
error!("Failed to connect to NATS: {}. Retrying in 10s...", e);
}
}
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
use common_contracts::ack::TaskAcknowledgement;
async fn subscribe_workflow(state: AppState, client: async_nats::Client) -> Result<()> {
// Finnhub routing key: provider.finnhub
let subject = "workflow.cmd.provider.finnhub".to_string();
let mut subscriber = client.subscribe(subject.clone()).await?;
info!("Workflow Consumer started on '{}'", subject);
while let Some(message) = subscriber.next().await {
// Check status
let current_status = state.status.read().await.clone();
if matches!(current_status, ServiceOperationalStatus::Degraded {..}) {
warn!("Service became degraded. Disconnecting from NATS.");
// Reject if degraded
if let Some(reply_to) = message.reply {
let ack = TaskAcknowledgement::Rejected { reason: "Service degraded".to_string() };
if let Ok(payload) = serde_json::to_vec(&ack) {
let _ = client.publish(reply_to, payload.into()).await;
}
}
subscriber.unsubscribe().await?;
return Ok(());
}
// Accept
if let Some(reply_to) = message.reply.clone() {
let ack = TaskAcknowledgement::Accepted;
if let Ok(payload) = serde_json::to_vec(&ack) {
if let Err(e) = client.publish(reply_to, payload.into()).await {
error!("Failed to send Acceptance Ack: {}", e);
}
}
}
let state = state.clone();
let client = client.clone();
tokio::spawn(async move {
match serde_json::from_slice::<WorkflowTaskCommand>(&message.payload) {
Ok(cmd) => {
info!("Received workflow command for task: {}", cmd.task_id);
if let Err(e) = crate::generic_worker::handle_workflow_command(state, client, cmd).await {
error!("Generic worker handler failed: {}", e);
}
},
Err(e) => error!("Failed to parse WorkflowTaskCommand: {}", e),
}
});
}
Ok(())
}