use crate::error::Result; use crate::state::AppState; use axum::{ extract::{Path, State}, http::StatusCode, response::{IntoResponse, Json}, routing::{get, post}, Router, }; use common_contracts::messages::FetchCompanyDataCommand; use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress}; use futures_util::future::join_all; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tracing::{info, warn}; use uuid::Uuid; const DATA_FETCH_QUEUE: &str = "data_fetch_commands"; // --- Request/Response Structs --- #[derive(Deserialize)] pub struct DataRequest { pub symbol: String, pub market: String, } #[derive(Serialize)] pub struct RequestAcceptedResponse { pub request_id: Uuid, } // --- Router Definition --- pub fn create_router(app_state: AppState) -> Router { Router::new() .route("/health", get(health_check)) .route("/tasks", get(get_current_tasks)) // This is the old, stateless one .route("/v1/data-requests", post(trigger_data_fetch)) .route("/v1/companies/:symbol/profile", get(get_company_profile)) .route("/v1/tasks/:request_id", get(get_task_progress)) .with_state(app_state) } // --- Health & Stateless Tasks --- async fn health_check(State(state): State) -> Json { let mut details = HashMap::new(); // 提供确定性且无副作用的健康详情,避免访问不存在的状态字段 details.insert("message_bus".to_string(), "nats".to_string()); details.insert("nats_addr".to_string(), state.config.nats_addr.clone()); let status = HealthStatus { module_id: "api-gateway".to_string(), status: ServiceStatus::Ok, version: env!("CARGO_PKG_VERSION").to_string(), details, }; Json(status) } async fn get_current_tasks() -> Json> { Json(vec![]) } // --- API Handlers --- /// [POST /v1/data-requests] /// Triggers the data fetching process by publishing a command to the message bus. async fn trigger_data_fetch( State(state): State, Json(payload): Json, ) -> Result { let request_id = Uuid::new_v4(); let command = FetchCompanyDataCommand { request_id, symbol: payload.symbol, market: payload.market, }; info!(request_id = %request_id, "Publishing data fetch command"); state .nats_client .publish( DATA_FETCH_QUEUE.to_string(), serde_json::to_vec(&command).unwrap().into(), ) .await?; Ok(( StatusCode::ACCEPTED, Json(RequestAcceptedResponse { request_id }), )) } /// [GET /v1/companies/:symbol/profile] /// Queries the persisted company profile from the data-persistence-service. async fn get_company_profile( State(state): State, Path(symbol): Path, ) -> Result { let profile = state.persistence_client.get_company_profile(&symbol).await?; Ok(Json(profile)) } /// [GET /v1/tasks/:request_id] /// Aggregates task progress from all downstream provider services. async fn get_task_progress( State(state): State, Path(request_id): Path, ) -> Result { let client = reqwest::Client::new(); let fetches = state .config .provider_services .iter() .map(|service_url| { let client = client.clone(); let url = format!("{}/tasks", service_url); async move { match client.get(&url).send().await { Ok(resp) => match resp.json::>().await { Ok(tasks) => Some(tasks), Err(e) => { warn!("Failed to decode tasks from {}: {}", url, e); None } }, Err(e) => { warn!("Failed to fetch tasks from {}: {}", url, e); None } } } }); let results = join_all(fetches).await; let mut merged: Vec = Vec::new(); for maybe_tasks in results { if let Some(tasks) = maybe_tasks { merged.extend(tasks); } } if let Some(task) = merged.into_iter().find(|t| t.request_id == request_id) { Ok((StatusCode::OK, Json(task)).into_response()) } else { Ok((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Task not found"}))).into_response()) } }