From 68ae2656a72b2e5a5711ac73bf24e579930c17be Mon Sep 17 00:00:00 2001 From: "Lv, Qi" Date: Wed, 19 Nov 2025 03:15:21 +0800 Subject: [PATCH] feat: Implement connection testing for Tushare and AlphaVantage providers - Tushare: Added /test endpoint to verify API token validity by fetching a small dataset. - AlphaVantage: Implemented custom HTTP transport to handle MCP server's 400 Bad Request response on SSE endpoint gracefully (degrading to POST-only mode). - AlphaVantage: Added /test endpoint using `list_tools` to verify MCP connection. - AlphaVantage: Updated configuration polling to support dynamic API URLs. --- ...ing]_implement_alphavantage_test_button.md | 62 ++++++ .../alphavantage-provider-service/Cargo.lock | 2 + .../alphavantage-provider-service/Cargo.toml | 2 + .../src/api_test.rs | 97 ++++++++++ .../src/av_client.rs | 23 ++- .../src/config_poller.rs | 13 +- .../alphavantage-provider-service/src/main.rs | 1 + .../src/state.rs | 51 +++-- .../src/transport.rs | 179 ++++++++++++++++++ services/tushare-provider-service/src/api.rs | 60 +++++- 10 files changed, 461 insertions(+), 29 deletions(-) create mode 100644 docs/3_project_management/tasks/pending/20251118_[Pending]_implement_alphavantage_test_button.md create mode 100644 services/alphavantage-provider-service/src/api_test.rs create mode 100644 services/alphavantage-provider-service/src/transport.rs diff --git a/docs/3_project_management/tasks/pending/20251118_[Pending]_implement_alphavantage_test_button.md b/docs/3_project_management/tasks/pending/20251118_[Pending]_implement_alphavantage_test_button.md new file mode 100644 index 0000000..4935a12 --- /dev/null +++ b/docs/3_project_management/tasks/pending/20251118_[Pending]_implement_alphavantage_test_button.md @@ -0,0 +1,62 @@ +# [待处理] 实现 AlphaVantage 服务连接测试功能 + +**日期**: 2025-11-18 + +**状态**: 待处理 (Pending) + +**负责人**: AI Assistant + +## 1. 需求背景 + +目前,系统配置中心的数据源配置页面中,Tushare 和 Finnhub 模块均提供了“测试”按钮,用于验证用户填写的 API Key 和 URL 是否有效。然而,AlphaVantage 模块缺少此功能。由于 AlphaVantage 的数据是通过 MCP (Meta-protocol Computation Platform) 协议间接调用的,其连接健康状态的检查尤为重要。 + +本任务旨在为 AlphaVantage 模块添加一个功能完善的“测试”按钮,以提升系统的健壮性和用户体验。 + +## 2. 技术方案与执行细节 + +该功能的实现需要贯穿前端、API网关和后端的 AlphaVantage 服务,形成一个完整的调用链路。 + +### 2.1. 前端 (Frontend) + +* **文件**: `/frontend/src/app/config/page.tsx` +* **任务**: + 1. **新增UI元素**: 在 AlphaVantage 配置卡片中,仿照其他服务,添加一个“测试 AlphaVantage”按钮。 + 2. **创建事件处理器**: + * 实现 `handleTestAlphaVantage` 函数。 + * 该函数将从组件的本地状态 (`localDataSources`) 中读取 `alphavantage` 的 `api_key` 和 `api_url` (此 URL 为 MCP Endpoint)。 + * 调用通用的 `handleTest('alphavantage', { ...config })` 函数,将请求发送至 Next.js 后端 API 路由。 + +### 2.2. Next.js API 路由 + +* **文件**: `/frontend/src/app/api/configs/test/route.ts` (推测) +* **任务**: + 1. **新增处理分支**: 在 `POST` 请求处理逻辑中,为 `type: 'alphavantage'` 增加一个新的 `case`。 + 2. **请求转发**: 该分支将把收到的测试请求(包含配置信息)原样转发到后端的 API 网关。 + +### 2.3. API 网关 (API Gateway) + +* **文件**: `/services/api-gateway/src/api.rs` (或相关路由模块) +* **任务**: + 1. **更新路由规则**: 修改处理配置测试的路由逻辑。 + 2. **分发请求**: 当识别到请求类型为 `alphavantage` 时,将该请求精准地转发到 `alphavantage-provider-service` 的新测试接口。 + +### 2.4. AlphaVantage 服务 (alphavantage-provider-service) + +这是实现测试逻辑的核心。 + +* **文件**: `/services/alphavantage-provider-service/src/api.rs` (或新建的模块) +* **任务**: + 1. **创建新接口**: 在服务中创建一个新的 HTTP `POST /test` 接口,用于接收来自网关的测试请求。 + 2. **实现核心测试逻辑**: + * 接口从请求体中解析出 `api_url` 和 `api_key`。 + * **动态 MCP 客户端**: 使用传入的 `api_url` 动态地、临时地创建一个 MCP 客户端实例。这确保了测试的是用户当前输入的配置,而不是服务启动时加载的旧配置。 + * **调用 `list_capability`**: 利用此临时客户端,调用 MCP 服务标准工具集中的 `list_capability` 工具。`api_key` 将作为认证凭证传递给此调用。 + * **响应处理**: + * **成功**: 如果 `list_capability` 调用成功返回,意味着 MCP Endpoint 可达、服务正常、API Key 有效。此时,接口返回 `{"success": true, "message": "MCP connection successful."}`。 + * **失败**: 如果调用过程中出现任何错误(网络问题、认证失败、超时等),接口将捕获异常并返回 `{"success": false, "message": "MCP connection failed: [具体错误信息]"}`。 + +## 3. 预期成果 + +* 用户可以在配置中心页面点击按钮来测试 AlphaVantage 的连接配置。 +* 系统能够通过调用 MCP 的 `list_capability` 接口,实时验证配置的有效性。 +* 前端能够清晰地展示测试成功或失败的结果,为用户提供明确的反馈。 diff --git a/services/alphavantage-provider-service/Cargo.lock b/services/alphavantage-provider-service/Cargo.lock index fd3f593..4640b07 100644 --- a/services/alphavantage-provider-service/Cargo.lock +++ b/services/alphavantage-provider-service/Cargo.lock @@ -40,12 +40,14 @@ dependencies = [ "common-contracts", "config", "dashmap", + "futures", "futures-util", "reqwest", "rmcp", "secrecy", "serde", "serde_json", + "sse-stream", "thiserror 2.0.17", "tokio", "tower-http", diff --git a/services/alphavantage-provider-service/Cargo.toml b/services/alphavantage-provider-service/Cargo.toml index b3c81a4..fd421d3 100644 --- a/services/alphavantage-provider-service/Cargo.toml +++ b/services/alphavantage-provider-service/Cargo.toml @@ -43,3 +43,5 @@ secrecy = { version = "0.10.3", features = ["serde"] } thiserror = "2.0.17" anyhow = "1.0" chrono = { version = "0.4", features = ["serde"] } +sse-stream = "0.2" +futures = "0.3" diff --git a/services/alphavantage-provider-service/src/api_test.rs b/services/alphavantage-provider-service/src/api_test.rs new file mode 100644 index 0000000..b5a110a --- /dev/null +++ b/services/alphavantage-provider-service/src/api_test.rs @@ -0,0 +1,97 @@ +use crate::av_client::AvClient; +use axum::{http::StatusCode, response::{IntoResponse, Json}}; +use secrecy::{SecretString, ExposeSecret}; +use serde::{Deserialize, Serialize}; +use tracing::{info, warn}; + +#[derive(Deserialize)] +pub struct TestConnectionRequest { + // This is the MCP endpoint URL + pub api_url: String, + // The API key is passed for validation but might not be used directly + // in the MCP connection itself, depending on auth mechanism. + pub api_key: Option, +} + +#[derive(Serialize)] +pub struct TestConnectionResponse { + pub success: bool, + pub message: String, +} + +/// [POST /test] +/// Dynamically tests a connection to an MCP endpoint. +pub async fn test_connection( + Json(payload): Json, +) -> impl IntoResponse { + info!("Testing connection to MCP endpoint: {}", payload.api_url); + + if payload.api_url.is_empty() { + return ( + StatusCode::BAD_REQUEST, + Json(TestConnectionResponse { + success: false, + message: "API URL (MCP Endpoint) cannot be empty.".to_string(), + }), + ).into_response(); + } + + // 要求传入 base MCP URL(不包含查询参数)与 api_key,然后按官方文档拼接 ?apikey= + if payload.api_url.contains('?') { + return ( + StatusCode::BAD_REQUEST, + Json(TestConnectionResponse { + success: false, + message: "API URL 必须为基础地址(不可包含查询参数)".to_string(), + }), + ).into_response(); + } + let Some(key) = &payload.api_key else { + return ( + StatusCode::BAD_REQUEST, + Json(TestConnectionResponse { + success: false, + message: "测试连接需要提供 api_key".to_string(), + }), + ).into_response(); + }; + let final_url = format!("{}?apikey={}", payload.api_url, key.expose_secret()); + info!("Testing MCP with final endpoint: {}", final_url); + let mcp_client = match AvClient::connect(&final_url).await { + Ok(client) => client, + Err(e) => { + warn!("Failed to establish MCP transport: {}", e); + return ( + StatusCode::BAD_REQUEST, + Json(TestConnectionResponse { + success: false, + message: format!("Failed to establish MCP transport: {}", e), + }), + ).into_response(); + } + }; + + // Call list_tools to verify the connection. + match mcp_client.list_tools().await { + Ok(tools) => { + info!("MCP list_tools successful. Found {} tools.", tools.len()); + ( + StatusCode::OK, + Json(TestConnectionResponse { + success: true, + message: format!("Successfully connected to MCP endpoint and found {} tools.", tools.len()), + }), + ).into_response() + } + Err(e) => { + warn!("MCP list_tools failed: {}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(TestConnectionResponse { + success: false, + message: format!("MCP command 'list_tools' failed: {}", e), + }), + ).into_response() + } + } +} diff --git a/services/alphavantage-provider-service/src/av_client.rs b/services/alphavantage-provider-service/src/av_client.rs index 1374ea7..68f9811 100644 --- a/services/alphavantage-provider-service/src/av_client.rs +++ b/services/alphavantage-provider-service/src/av_client.rs @@ -1,7 +1,9 @@ use crate::error::{AppError, Result}; +use crate::transport::CustomHttpClient; use rmcp::{ClientHandler, ServiceExt}; use rmcp::model::CallToolRequestParam; use rmcp::transport::StreamableHttpClientTransport; +use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig; use serde_json::{Map, Value}; #[derive(Debug, Clone, Default)] @@ -20,7 +22,20 @@ pub struct AvClient { impl AvClient { pub async fn connect(mcp_endpoint_url: &str) -> Result { - let transport = StreamableHttpClientTransport::from_uri(mcp_endpoint_url.to_string()); + let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string()); + let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config); + let running = DummyClientHandler + ::default() + .serve(transport) + .await + .map_err(|e| AppError::Configuration(format!("Fail to init MCP service: {e:?}")))?; + Ok(Self { service: running }) + } + + pub async fn connect_with_bearer(mcp_endpoint_url: &str, bearer_token: &str) -> Result { + let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string()) + .auth_header(bearer_token.to_string()); + let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config); let running = DummyClientHandler ::default() .serve(transport) @@ -55,6 +70,12 @@ impl AvClient { } Ok(Value::Null) } + + pub async fn list_tools(&self) -> Result> { + let result = self.service.list_tools(None).await + .map_err(|e| AppError::Configuration(format!("MCP list_tools error: {e:?}")))?; + Ok(result.tools) + } } diff --git a/services/alphavantage-provider-service/src/config_poller.rs b/services/alphavantage-provider-service/src/config_poller.rs index 64abc33..b178e62 100644 --- a/services/alphavantage-provider-service/src/config_poller.rs +++ b/services/alphavantage-provider-service/src/config_poller.rs @@ -40,15 +40,12 @@ async fn poll_and_update_config(state: &AppState) -> Result<()> { }); if let Some(config) = alphavantage_config { - if let Some(api_key) = &config.api_key { - state.update_provider(Some(SecretString::from(api_key.clone()))).await; - info!("Successfully updated Alphavantage provider with new configuration."); - } else { - state.update_provider(None).await; - info!("Alphavantage provider is enabled but API key is missing. Service is degraded."); - } + let api_key = config.api_key.clone().map(SecretString::from); + let api_url = config.api_url.clone(); + state.update_provider(api_key, api_url).await; + info!("Successfully updated Alphavantage provider with new configuration."); } else { - state.update_provider(None).await; + state.update_provider(None, None).await; info!("No enabled Alphavantage configuration found. Service is degraded."); } diff --git a/services/alphavantage-provider-service/src/main.rs b/services/alphavantage-provider-service/src/main.rs index f5841ef..daa3df6 100644 --- a/services/alphavantage-provider-service/src/main.rs +++ b/services/alphavantage-provider-service/src/main.rs @@ -9,6 +9,7 @@ mod state; mod worker; mod av_client; mod config_poller; +mod transport; use crate::config::AppConfig; use crate::error::Result; diff --git a/services/alphavantage-provider-service/src/state.rs b/services/alphavantage-provider-service/src/state.rs index 6b5ac74..849a276 100644 --- a/services/alphavantage-provider-service/src/state.rs +++ b/services/alphavantage-provider-service/src/state.rs @@ -6,6 +6,7 @@ use secrecy::{ExposeSecret, SecretString}; use std::sync::Arc; use tokio::sync::RwLock; use uuid::Uuid; +use tracing::info; #[derive(Clone, Debug)] pub enum ServiceOperationalStatus { @@ -41,32 +42,46 @@ impl AppState { self.av_provider.read().await.clone() } - pub async fn update_provider(&self, api_key: Option) { + pub async fn update_provider(&self, api_key: Option, api_url: Option) { let mut provider_guard = self.av_provider.write().await; let mut status_guard = self.status.write().await; - if let Some(key) = api_key { - let mcp_endpoint = format!( - "https://mcp.alphavantage.co/mcp?apikey={}", - key.expose_secret() - ); - match AvClient::connect(&mcp_endpoint).await { - Ok(new_provider) => { - *provider_guard = Some(Arc::new(new_provider)); - *status_guard = ServiceOperationalStatus::Active; - } - Err(e) => { + match (api_key, api_url) { + (Some(key), Some(base_url)) => { + if base_url.contains('?') { *provider_guard = None; *status_guard = ServiceOperationalStatus::Degraded { - reason: format!("Failed to connect to Alphavantage: {}", e), + reason: "Configured MCP endpoint must not contain query parameters.".to_string(), }; + return; + } + let mcp_endpoint = format!("{}?apikey={}", base_url, key.expose_secret()); + info!("Initializing Alphavantage MCP provider with endpoint: {}", mcp_endpoint); + match AvClient::connect(&mcp_endpoint).await { + Ok(new_provider) => { + *provider_guard = Some(Arc::new(new_provider)); + *status_guard = ServiceOperationalStatus::Active; + } + Err(e) => { + *provider_guard = None; + *status_guard = ServiceOperationalStatus::Degraded { + reason: format!("Failed to connect to Alphavantage: {}", e), + }; + } } } - } else { - *provider_guard = None; - *status_guard = ServiceOperationalStatus::Degraded { - reason: "Alphavantage API Key is not configured.".to_string(), - }; + (None, _) => { + *provider_guard = None; + *status_guard = ServiceOperationalStatus::Degraded { + reason: "Alphavantage API Key is not configured.".to_string(), + }; + } + (_, None) => { + *provider_guard = None; + *status_guard = ServiceOperationalStatus::Degraded { + reason: "Alphavantage MCP endpoint is not configured.".to_string(), + }; + } } } } diff --git a/services/alphavantage-provider-service/src/transport.rs b/services/alphavantage-provider-service/src/transport.rs new file mode 100644 index 0000000..91feb4b --- /dev/null +++ b/services/alphavantage-provider-service/src/transport.rs @@ -0,0 +1,179 @@ +use std::{borrow::Cow, sync::Arc}; + +use futures::{stream::BoxStream, StreamExt}; +use reqwest::header::{ACCEPT, CONTENT_TYPE}; +use rmcp::{ + model::ClientJsonRpcMessage, + model::ServerJsonRpcMessage, + transport::streamable_http_client::{ + AuthRequiredError, SseError, StreamableHttpClient, StreamableHttpError, + StreamableHttpPostResponse, + }, +}; +use sse_stream::{Sse, SseStream}; + +// Redefine constants as they are not public in rmcp +const HEADER_SESSION_ID: &str = "Mcp-Session-Id"; +const HEADER_LAST_EVENT_ID: &str = "Last-Event-Id"; +const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream"; +const JSON_MIME_TYPE: &str = "application/json"; +const WWW_AUTHENTICATE: &str = "www-authenticate"; + +#[derive(Clone)] +pub struct CustomHttpClient { + client: reqwest::Client, +} + +impl Default for CustomHttpClient { + fn default() -> Self { + Self { + client: reqwest::Client::new(), + } + } +} + +impl CustomHttpClient { + pub fn new() -> Self { + Self::default() + } +} + +impl StreamableHttpClient for CustomHttpClient { + type Error = reqwest::Error; + + async fn get_stream( + &self, + uri: Arc, + session_id: Arc, + last_event_id: Option, + auth_token: Option, + ) -> Result>, StreamableHttpError> { + let mut request_builder = self + .client + .get(uri.as_ref()) + .header(ACCEPT, EVENT_STREAM_MIME_TYPE) + .header(HEADER_SESSION_ID, session_id.as_ref()); + if let Some(last_event_id) = last_event_id { + request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id); + } + if let Some(auth_header) = auth_token { + request_builder = request_builder.bearer_auth(auth_header); + } + + let response = request_builder.send().await?; + + // --- CUSTOM LOGIC START --- + // If we get 400 Bad Request or 405 Method Not Allowed, we assume the server + // does not support SSE on this endpoint and degrade gracefully. + if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED + || response.status() == reqwest::StatusCode::BAD_REQUEST { + tracing::debug!("Server returned {}, assuming SSE not supported", response.status()); + return Err(StreamableHttpError::ServerDoesNotSupportSse); + } + // --- CUSTOM LOGIC END --- + + let response = response.error_for_status()?; + match response.headers().get(CONTENT_TYPE) { + Some(ct) => { + if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) { + return Err(StreamableHttpError::UnexpectedContentType(Some( + String::from_utf8_lossy(ct.as_bytes()).to_string(), + ))); + } + } + None => { + return Err(StreamableHttpError::UnexpectedContentType(None)); + } + } + let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); + Ok(event_stream) + } + + async fn delete_session( + &self, + uri: Arc, + session: Arc, + auth_token: Option, + ) -> Result<(), StreamableHttpError> { + let mut request_builder = self.client.delete(uri.as_ref()); + if let Some(auth_header) = auth_token { + request_builder = request_builder.bearer_auth(auth_header); + } + let response = request_builder + .header(HEADER_SESSION_ID, session.as_ref()) + .send() + .await?; + + if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED { + tracing::debug!("this server doesn't support deleting session"); + return Ok(()); + } + let _response = response.error_for_status()?; + Ok(()) + } + + async fn post_message( + &self, + uri: Arc, + message: ClientJsonRpcMessage, + session_id: Option>, + auth_token: Option, + ) -> Result> { + let mut request = self + .client + .post(uri.as_ref()) + .header(ACCEPT, [EVENT_STREAM_MIME_TYPE, JSON_MIME_TYPE].join(", ")); + if let Some(auth_header) = auth_token { + request = request.bearer_auth(auth_header); + } + if let Some(session_id) = session_id { + request = request.header(HEADER_SESSION_ID, session_id.as_ref()); + } + let response = request.json(&message).send().await?; + if response.status() == reqwest::StatusCode::UNAUTHORIZED { + if let Some(header) = response.headers().get(WWW_AUTHENTICATE) { + let header = header + .to_str() + .map_err(|_| { + StreamableHttpError::UnexpectedServerResponse(Cow::from( + "invalid www-authenticate header value", + )) + })? + .to_string(); + return Err(StreamableHttpError::AuthRequired(AuthRequiredError { + www_authenticate_header: header, + })); + } + } + let status = response.status(); + let response = response.error_for_status()?; + if matches!( + status, + reqwest::StatusCode::ACCEPTED | reqwest::StatusCode::NO_CONTENT + ) { + return Ok(StreamableHttpPostResponse::Accepted); + } + let content_type = response.headers().get(CONTENT_TYPE); + let session_id = response.headers().get(HEADER_SESSION_ID); + let session_id = session_id + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + match content_type { + Some(ct) if ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) => { + let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); + Ok(StreamableHttpPostResponse::Sse(event_stream, session_id)) + } + Some(ct) if ct.as_bytes().starts_with(JSON_MIME_TYPE.as_bytes()) => { + let message: ServerJsonRpcMessage = response.json().await?; + Ok(StreamableHttpPostResponse::Json(message, session_id)) + } + _ => { + tracing::error!("unexpected content type: {:?}", content_type); + Err(StreamableHttpError::UnexpectedContentType( + content_type.map(|ct| String::from_utf8_lossy(ct.as_bytes()).to_string()), + )) + } + } + } +} + diff --git a/services/tushare-provider-service/src/api.rs b/services/tushare-provider-service/src/api.rs index 744bfed..cbaaa3f 100644 --- a/services/tushare-provider-service/src/api.rs +++ b/services/tushare-provider-service/src/api.rs @@ -1,6 +1,8 @@ use std::collections::HashMap; -use axum::{routing::get, Router, extract::State, response::Json}; - +use axum::{routing::{get, post}, Router, extract::State, response::{Json, IntoResponse}, http::StatusCode}; +use serde::Deserialize; +use secrecy::ExposeSecret; +use crate::ts_client::TushareClient; use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::observability::{HealthStatus, ServiceStatus}; @@ -8,9 +10,63 @@ pub fn create_router(app_state: AppState) -> Router { Router::new() .route("/health", get(health_check)) .route("/tasks", get(get_tasks)) + .route("/test", post(test_connection)) .with_state(app_state) } +#[derive(Deserialize)] +struct TestRequest { + api_key: Option, + api_url: Option, +} + +async fn test_connection( + State(state): State, + Json(payload): Json, +) -> impl IntoResponse { + let api_url = payload.api_url + .filter(|s| !s.is_empty()) + .unwrap_or_else(|| state.config.tushare_api_url.clone()); + + let api_key = if let Some(k) = payload.api_key.filter(|s| !s.is_empty()) { + k + } else if let Some(k) = &state.config.tushare_api_token { + k.expose_secret().clone() + } else { + return ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "success": false, + "message": "No API Key provided or configured" + })) + ).into_response(); + }; + + let client = TushareClient::new(api_url, api_key); + + // Try to fetch a small amount of data to verify the token + match client.send_request::( + "stock_basic", + serde_json::json!({"limit": 1}), + "" + ).await { + Ok(_) => ( + StatusCode::OK, + Json(serde_json::json!({ + "success": true, + "message": "Connection successful" + })) + ).into_response(), + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "success": false, + "message": format!("Connection failed: {}", e) + })) + ).into_response(), + } +} + async fn health_check(State(state): State) -> Json { let mut details = HashMap::new(); let operational_status = state.status.read().await;