- secrecy: migrate Secret<String> -> SecretString across services; adjust usages - warnings: clean unused imports/structs (no behavior change) - docker-compose: remove unnecessary host port mappings; rely on app-network; keep frontend only - build: set build.context to repo root; reference per-service Dockerfiles - add: root .dockerignore to exclude target/node_modules/ref/archive/docs and logs - data-persistence-service: refine cargo-chef with path deps; slim context; copy correct targets - Dockerfiles: normalize multi-stage builds; fix WORKDIR and release binary paths - nats: resolve 4222 conflict by not binding to host - verified: all rust services build successfully with new flow
96 lines
3.3 KiB
Rust
96 lines
3.3 KiB
Rust
use crate::error::Result;
|
|
use crate::state::AppState;
|
|
use common_contracts::messages::FetchCompanyDataCommand;
|
|
use futures_util::StreamExt;
|
|
use std::sync::Arc;
|
|
use tracing::{error, info};
|
|
|
|
const SUBJECT_NAME: &str = "data_fetch_commands";
|
|
|
|
pub async fn run(state: AppState) -> Result<()> {
|
|
info!("Starting NATS message consumer...");
|
|
|
|
let client = async_nats::connect(&state.config.nats_addr).await?;
|
|
info!("Connected to NATS.");
|
|
|
|
// This is a simple subscriber. For production, consider JetStream for durability.
|
|
let mut subscriber = client.subscribe(SUBJECT_NAME.to_string()).await?;
|
|
|
|
info!(
|
|
"Consumer started, waiting for messages on subject '{}'",
|
|
SUBJECT_NAME
|
|
);
|
|
|
|
while let Some(message) = subscriber.next().await {
|
|
info!("Received NATS message.");
|
|
let state_clone = state.clone();
|
|
let publisher_clone = client.clone();
|
|
|
|
tokio::spawn(async move {
|
|
match serde_json::from_slice::<FetchCompanyDataCommand>(&message.payload) {
|
|
Ok(command) => {
|
|
info!("Deserialized command for symbol: {}", command.symbol);
|
|
if let Err(e) =
|
|
crate::worker::handle_fetch_command(state_clone, command, publisher_clone)
|
|
.await
|
|
{
|
|
error!("Error handling fetch command: {:?}", e);
|
|
}
|
|
}
|
|
Err(e) => {
|
|
error!("Failed to deserialize message: {}", e);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn subscribe_to_data_commands(app_state: Arc<AppState>, nats_client: async_nats::Client) -> Result<()> {
|
|
let mut subscriber = nats_client.subscribe("data_fetch_commands".to_string()).await?;
|
|
|
|
while let Some(message) = subscriber.next().await {
|
|
let command: FetchCompanyDataCommand = match serde_json::from_slice(&message.payload) {
|
|
Ok(c) => c,
|
|
Err(e) => {
|
|
error!("Failed to deserialize message: {}", e);
|
|
continue;
|
|
}
|
|
};
|
|
let task_id = command.request_id;
|
|
|
|
if command.market.to_uppercase() == "CN" {
|
|
info!(
|
|
"Skipping command for symbol '{}' as its market ('{}') is 'CN'.",
|
|
command.symbol, command.market
|
|
);
|
|
continue;
|
|
}
|
|
|
|
app_state.tasks.insert(task_id, common_contracts::observability::TaskProgress {
|
|
request_id: task_id,
|
|
task_name: format!("finnhub:{}", command.symbol),
|
|
status: "Received".to_string(),
|
|
progress_percent: 0,
|
|
details: "Command received".to_string(),
|
|
started_at: chrono::Utc::now(),
|
|
});
|
|
|
|
// Spawn the workflow in a separate task
|
|
let workflow_state = app_state.clone();
|
|
let publisher_clone = nats_client.clone();
|
|
tokio::spawn(async move {
|
|
let state_owned = (*workflow_state).clone();
|
|
let result = crate::worker::handle_fetch_command(state_owned, command, publisher_clone).await;
|
|
if let Err(e) = result {
|
|
error!(
|
|
"Error executing Finnhub workflow for task {}: {:?}",
|
|
task_id, e
|
|
);
|
|
}
|
|
});
|
|
}
|
|
Ok(())
|
|
}
|