feat: Implement connection testing for Tushare and AlphaVantage providers

- Tushare: Added /test endpoint to verify API token validity by fetching a small dataset.
- AlphaVantage: Implemented custom HTTP transport to handle MCP server's 400 Bad Request response on SSE endpoint gracefully (degrading to POST-only mode).
- AlphaVantage: Added /test endpoint using `list_tools` to verify MCP connection.
- AlphaVantage: Updated configuration polling to support dynamic API URLs.
This commit is contained in:
Lv, Qi 2025-11-19 03:15:21 +08:00
parent 733bf89af5
commit 68ae2656a7
10 changed files with 461 additions and 29 deletions

View File

@ -0,0 +1,62 @@
# [待处理] 实现 AlphaVantage 服务连接测试功能
**日期**: 2025-11-18
**状态**: 待处理 (Pending)
**负责人**: AI Assistant
## 1. 需求背景
目前系统配置中心的数据源配置页面中Tushare 和 Finnhub 模块均提供了“测试”按钮,用于验证用户填写的 API Key 和 URL 是否有效。然而AlphaVantage 模块缺少此功能。由于 AlphaVantage 的数据是通过 MCP (Meta-protocol Computation Platform) 协议间接调用的,其连接健康状态的检查尤为重要。
本任务旨在为 AlphaVantage 模块添加一个功能完善的“测试”按钮,以提升系统的健壮性和用户体验。
## 2. 技术方案与执行细节
该功能的实现需要贯穿前端、API网关和后端的 AlphaVantage 服务,形成一个完整的调用链路。
### 2.1. 前端 (Frontend)
* **文件**: `/frontend/src/app/config/page.tsx`
* **任务**:
1. **新增UI元素**: 在 AlphaVantage 配置卡片中,仿照其他服务,添加一个“测试 AlphaVantage”按钮。
2. **创建事件处理器**:
* 实现 `handleTestAlphaVantage` 函数。
* 该函数将从组件的本地状态 (`localDataSources`) 中读取 `alphavantage``api_key``api_url` (此 URL 为 MCP Endpoint)。
* 调用通用的 `handleTest('alphavantage', { ...config })` 函数,将请求发送至 Next.js 后端 API 路由。
### 2.2. Next.js API 路由
* **文件**: `/frontend/src/app/api/configs/test/route.ts` (推测)
* **任务**:
1. **新增处理分支**: 在 `POST` 请求处理逻辑中,为 `type: 'alphavantage'` 增加一个新的 `case`
2. **请求转发**: 该分支将把收到的测试请求(包含配置信息)原样转发到后端的 API 网关。
### 2.3. API 网关 (API Gateway)
* **文件**: `/services/api-gateway/src/api.rs` (或相关路由模块)
* **任务**:
1. **更新路由规则**: 修改处理配置测试的路由逻辑。
2. **分发请求**: 当识别到请求类型为 `alphavantage` 时,将该请求精准地转发到 `alphavantage-provider-service` 的新测试接口。
### 2.4. AlphaVantage 服务 (alphavantage-provider-service)
这是实现测试逻辑的核心。
* **文件**: `/services/alphavantage-provider-service/src/api.rs` (或新建的模块)
* **任务**:
1. **创建新接口**: 在服务中创建一个新的 HTTP `POST /test` 接口,用于接收来自网关的测试请求。
2. **实现核心测试逻辑**:
* 接口从请求体中解析出 `api_url``api_key`
* **动态 MCP 客户端**: 使用传入的 `api_url` 动态地、临时地创建一个 MCP 客户端实例。这确保了测试的是用户当前输入的配置,而不是服务启动时加载的旧配置。
* **调用 `list_capability`**: 利用此临时客户端,调用 MCP 服务标准工具集中的 `list_capability` 工具。`api_key` 将作为认证凭证传递给此调用。
* **响应处理**:
* **成功**: 如果 `list_capability` 调用成功返回,意味着 MCP Endpoint 可达、服务正常、API Key 有效。此时,接口返回 `{"success": true, "message": "MCP connection successful."}`
* **失败**: 如果调用过程中出现任何错误(网络问题、认证失败、超时等),接口将捕获异常并返回 `{"success": false, "message": "MCP connection failed: [具体错误信息]"}`
## 3. 预期成果
* 用户可以在配置中心页面点击按钮来测试 AlphaVantage 的连接配置。
* 系统能够通过调用 MCP 的 `list_capability` 接口,实时验证配置的有效性。
* 前端能够清晰地展示测试成功或失败的结果,为用户提供明确的反馈。

View File

@ -40,12 +40,14 @@ dependencies = [
"common-contracts",
"config",
"dashmap",
"futures",
"futures-util",
"reqwest",
"rmcp",
"secrecy",
"serde",
"serde_json",
"sse-stream",
"thiserror 2.0.17",
"tokio",
"tower-http",

View File

@ -43,3 +43,5 @@ secrecy = { version = "0.10.3", features = ["serde"] }
thiserror = "2.0.17"
anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] }
sse-stream = "0.2"
futures = "0.3"

View File

@ -0,0 +1,97 @@
use crate::av_client::AvClient;
use axum::{http::StatusCode, response::{IntoResponse, Json}};
use secrecy::{SecretString, ExposeSecret};
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
#[derive(Deserialize)]
pub struct TestConnectionRequest {
// This is the MCP endpoint URL
pub api_url: String,
// The API key is passed for validation but might not be used directly
// in the MCP connection itself, depending on auth mechanism.
pub api_key: Option<SecretString>,
}
#[derive(Serialize)]
pub struct TestConnectionResponse {
pub success: bool,
pub message: String,
}
/// [POST /test]
/// Dynamically tests a connection to an MCP endpoint.
pub async fn test_connection(
Json(payload): Json<TestConnectionRequest>,
) -> impl IntoResponse {
info!("Testing connection to MCP endpoint: {}", payload.api_url);
if payload.api_url.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: "API URL (MCP Endpoint) cannot be empty.".to_string(),
}),
).into_response();
}
// 要求传入 base MCP URL不包含查询参数与 api_key然后按官方文档拼接 ?apikey=
if payload.api_url.contains('?') {
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: "API URL 必须为基础地址(不可包含查询参数)".to_string(),
}),
).into_response();
}
let Some(key) = &payload.api_key else {
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: "测试连接需要提供 api_key".to_string(),
}),
).into_response();
};
let final_url = format!("{}?apikey={}", payload.api_url, key.expose_secret());
info!("Testing MCP with final endpoint: {}", final_url);
let mcp_client = match AvClient::connect(&final_url).await {
Ok(client) => client,
Err(e) => {
warn!("Failed to establish MCP transport: {}", e);
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: format!("Failed to establish MCP transport: {}", e),
}),
).into_response();
}
};
// Call list_tools to verify the connection.
match mcp_client.list_tools().await {
Ok(tools) => {
info!("MCP list_tools successful. Found {} tools.", tools.len());
(
StatusCode::OK,
Json(TestConnectionResponse {
success: true,
message: format!("Successfully connected to MCP endpoint and found {} tools.", tools.len()),
}),
).into_response()
}
Err(e) => {
warn!("MCP list_tools failed: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(TestConnectionResponse {
success: false,
message: format!("MCP command 'list_tools' failed: {}", e),
}),
).into_response()
}
}
}

View File

@ -1,7 +1,9 @@
use crate::error::{AppError, Result};
use crate::transport::CustomHttpClient;
use rmcp::{ClientHandler, ServiceExt};
use rmcp::model::CallToolRequestParam;
use rmcp::transport::StreamableHttpClientTransport;
use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
use serde_json::{Map, Value};
#[derive(Debug, Clone, Default)]
@ -20,7 +22,20 @@ pub struct AvClient {
impl AvClient {
pub async fn connect(mcp_endpoint_url: &str) -> Result<Self> {
let transport = StreamableHttpClientTransport::from_uri(mcp_endpoint_url.to_string());
let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string());
let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config);
let running = DummyClientHandler
::default()
.serve(transport)
.await
.map_err(|e| AppError::Configuration(format!("Fail to init MCP service: {e:?}")))?;
Ok(Self { service: running })
}
pub async fn connect_with_bearer(mcp_endpoint_url: &str, bearer_token: &str) -> Result<Self> {
let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string())
.auth_header(bearer_token.to_string());
let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config);
let running = DummyClientHandler
::default()
.serve(transport)
@ -55,6 +70,12 @@ impl AvClient {
}
Ok(Value::Null)
}
pub async fn list_tools(&self) -> Result<Vec<rmcp::model::Tool>> {
let result = self.service.list_tools(None).await
.map_err(|e| AppError::Configuration(format!("MCP list_tools error: {e:?}")))?;
Ok(result.tools)
}
}

View File

@ -40,15 +40,12 @@ async fn poll_and_update_config(state: &AppState) -> Result<()> {
});
if let Some(config) = alphavantage_config {
if let Some(api_key) = &config.api_key {
state.update_provider(Some(SecretString::from(api_key.clone()))).await;
let api_key = config.api_key.clone().map(SecretString::from);
let api_url = config.api_url.clone();
state.update_provider(api_key, api_url).await;
info!("Successfully updated Alphavantage provider with new configuration.");
} else {
state.update_provider(None).await;
info!("Alphavantage provider is enabled but API key is missing. Service is degraded.");
}
} else {
state.update_provider(None).await;
state.update_provider(None, None).await;
info!("No enabled Alphavantage configuration found. Service is degraded.");
}

View File

@ -9,6 +9,7 @@ mod state;
mod worker;
mod av_client;
mod config_poller;
mod transport;
use crate::config::AppConfig;
use crate::error::Result;

View File

@ -6,6 +6,7 @@ use secrecy::{ExposeSecret, SecretString};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;
use tracing::info;
#[derive(Clone, Debug)]
pub enum ServiceOperationalStatus {
@ -41,15 +42,21 @@ impl AppState {
self.av_provider.read().await.clone()
}
pub async fn update_provider(&self, api_key: Option<SecretString>) {
pub async fn update_provider(&self, api_key: Option<SecretString>, api_url: Option<String>) {
let mut provider_guard = self.av_provider.write().await;
let mut status_guard = self.status.write().await;
if let Some(key) = api_key {
let mcp_endpoint = format!(
"https://mcp.alphavantage.co/mcp?apikey={}",
key.expose_secret()
);
match (api_key, api_url) {
(Some(key), Some(base_url)) => {
if base_url.contains('?') {
*provider_guard = None;
*status_guard = ServiceOperationalStatus::Degraded {
reason: "Configured MCP endpoint must not contain query parameters.".to_string(),
};
return;
}
let mcp_endpoint = format!("{}?apikey={}", base_url, key.expose_secret());
info!("Initializing Alphavantage MCP provider with endpoint: {}", mcp_endpoint);
match AvClient::connect(&mcp_endpoint).await {
Ok(new_provider) => {
*provider_guard = Some(Arc::new(new_provider));
@ -62,12 +69,20 @@ impl AppState {
};
}
}
} else {
}
(None, _) => {
*provider_guard = None;
*status_guard = ServiceOperationalStatus::Degraded {
reason: "Alphavantage API Key is not configured.".to_string(),
};
}
(_, None) => {
*provider_guard = None;
*status_guard = ServiceOperationalStatus::Degraded {
reason: "Alphavantage MCP endpoint is not configured.".to_string(),
};
}
}
}
}

View File

@ -0,0 +1,179 @@
use std::{borrow::Cow, sync::Arc};
use futures::{stream::BoxStream, StreamExt};
use reqwest::header::{ACCEPT, CONTENT_TYPE};
use rmcp::{
model::ClientJsonRpcMessage,
model::ServerJsonRpcMessage,
transport::streamable_http_client::{
AuthRequiredError, SseError, StreamableHttpClient, StreamableHttpError,
StreamableHttpPostResponse,
},
};
use sse_stream::{Sse, SseStream};
// Redefine constants as they are not public in rmcp
const HEADER_SESSION_ID: &str = "Mcp-Session-Id";
const HEADER_LAST_EVENT_ID: &str = "Last-Event-Id";
const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream";
const JSON_MIME_TYPE: &str = "application/json";
const WWW_AUTHENTICATE: &str = "www-authenticate";
#[derive(Clone)]
pub struct CustomHttpClient {
client: reqwest::Client,
}
impl Default for CustomHttpClient {
fn default() -> Self {
Self {
client: reqwest::Client::new(),
}
}
}
impl CustomHttpClient {
pub fn new() -> Self {
Self::default()
}
}
impl StreamableHttpClient for CustomHttpClient {
type Error = reqwest::Error;
async fn get_stream(
&self,
uri: Arc<str>,
session_id: Arc<str>,
last_event_id: Option<String>,
auth_token: Option<String>,
) -> Result<BoxStream<'static, Result<Sse, SseError>>, StreamableHttpError<Self::Error>> {
let mut request_builder = self
.client
.get(uri.as_ref())
.header(ACCEPT, EVENT_STREAM_MIME_TYPE)
.header(HEADER_SESSION_ID, session_id.as_ref());
if let Some(last_event_id) = last_event_id {
request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id);
}
if let Some(auth_header) = auth_token {
request_builder = request_builder.bearer_auth(auth_header);
}
let response = request_builder.send().await?;
// --- CUSTOM LOGIC START ---
// If we get 400 Bad Request or 405 Method Not Allowed, we assume the server
// does not support SSE on this endpoint and degrade gracefully.
if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED
|| response.status() == reqwest::StatusCode::BAD_REQUEST {
tracing::debug!("Server returned {}, assuming SSE not supported", response.status());
return Err(StreamableHttpError::ServerDoesNotSupportSse);
}
// --- CUSTOM LOGIC END ---
let response = response.error_for_status()?;
match response.headers().get(CONTENT_TYPE) {
Some(ct) => {
if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) {
return Err(StreamableHttpError::UnexpectedContentType(Some(
String::from_utf8_lossy(ct.as_bytes()).to_string(),
)));
}
}
None => {
return Err(StreamableHttpError::UnexpectedContentType(None));
}
}
let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed();
Ok(event_stream)
}
async fn delete_session(
&self,
uri: Arc<str>,
session: Arc<str>,
auth_token: Option<String>,
) -> Result<(), StreamableHttpError<Self::Error>> {
let mut request_builder = self.client.delete(uri.as_ref());
if let Some(auth_header) = auth_token {
request_builder = request_builder.bearer_auth(auth_header);
}
let response = request_builder
.header(HEADER_SESSION_ID, session.as_ref())
.send()
.await?;
if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED {
tracing::debug!("this server doesn't support deleting session");
return Ok(());
}
let _response = response.error_for_status()?;
Ok(())
}
async fn post_message(
&self,
uri: Arc<str>,
message: ClientJsonRpcMessage,
session_id: Option<Arc<str>>,
auth_token: Option<String>,
) -> Result<StreamableHttpPostResponse, StreamableHttpError<Self::Error>> {
let mut request = self
.client
.post(uri.as_ref())
.header(ACCEPT, [EVENT_STREAM_MIME_TYPE, JSON_MIME_TYPE].join(", "));
if let Some(auth_header) = auth_token {
request = request.bearer_auth(auth_header);
}
if let Some(session_id) = session_id {
request = request.header(HEADER_SESSION_ID, session_id.as_ref());
}
let response = request.json(&message).send().await?;
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
if let Some(header) = response.headers().get(WWW_AUTHENTICATE) {
let header = header
.to_str()
.map_err(|_| {
StreamableHttpError::UnexpectedServerResponse(Cow::from(
"invalid www-authenticate header value",
))
})?
.to_string();
return Err(StreamableHttpError::AuthRequired(AuthRequiredError {
www_authenticate_header: header,
}));
}
}
let status = response.status();
let response = response.error_for_status()?;
if matches!(
status,
reqwest::StatusCode::ACCEPTED | reqwest::StatusCode::NO_CONTENT
) {
return Ok(StreamableHttpPostResponse::Accepted);
}
let content_type = response.headers().get(CONTENT_TYPE);
let session_id = response.headers().get(HEADER_SESSION_ID);
let session_id = session_id
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
match content_type {
Some(ct) if ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) => {
let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed();
Ok(StreamableHttpPostResponse::Sse(event_stream, session_id))
}
Some(ct) if ct.as_bytes().starts_with(JSON_MIME_TYPE.as_bytes()) => {
let message: ServerJsonRpcMessage = response.json().await?;
Ok(StreamableHttpPostResponse::Json(message, session_id))
}
_ => {
tracing::error!("unexpected content type: {:?}", content_type);
Err(StreamableHttpError::UnexpectedContentType(
content_type.map(|ct| String::from_utf8_lossy(ct.as_bytes()).to_string()),
))
}
}
}
}

View File

@ -1,6 +1,8 @@
use std::collections::HashMap;
use axum::{routing::get, Router, extract::State, response::Json};
use axum::{routing::{get, post}, Router, extract::State, response::{Json, IntoResponse}, http::StatusCode};
use serde::Deserialize;
use secrecy::ExposeSecret;
use crate::ts_client::TushareClient;
use crate::state::{AppState, ServiceOperationalStatus};
use common_contracts::observability::{HealthStatus, ServiceStatus};
@ -8,9 +10,63 @@ pub fn create_router(app_state: AppState) -> Router {
Router::new()
.route("/health", get(health_check))
.route("/tasks", get(get_tasks))
.route("/test", post(test_connection))
.with_state(app_state)
}
#[derive(Deserialize)]
struct TestRequest {
api_key: Option<String>,
api_url: Option<String>,
}
async fn test_connection(
State(state): State<AppState>,
Json(payload): Json<TestRequest>,
) -> impl IntoResponse {
let api_url = payload.api_url
.filter(|s| !s.is_empty())
.unwrap_or_else(|| state.config.tushare_api_url.clone());
let api_key = if let Some(k) = payload.api_key.filter(|s| !s.is_empty()) {
k
} else if let Some(k) = &state.config.tushare_api_token {
k.expose_secret().clone()
} else {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"success": false,
"message": "No API Key provided or configured"
}))
).into_response();
};
let client = TushareClient::new(api_url, api_key);
// Try to fetch a small amount of data to verify the token
match client.send_request::<serde_json::Value>(
"stock_basic",
serde_json::json!({"limit": 1}),
""
).await {
Ok(_) => (
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"message": "Connection successful"
}))
).into_response(),
Err(e) => (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"success": false,
"message": format!("Connection failed: {}", e)
}))
).into_response(),
}
}
async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> {
let mut details = HashMap::new();
let operational_status = state.status.read().await;