Compare commits

...

2 Commits

Author SHA1 Message Date
Lv, Qi
68ae2656a7 feat: Implement connection testing for Tushare and AlphaVantage providers
- Tushare: Added /test endpoint to verify API token validity by fetching a small dataset.
- AlphaVantage: Implemented custom HTTP transport to handle MCP server's 400 Bad Request response on SSE endpoint gracefully (degrading to POST-only mode).
- AlphaVantage: Added /test endpoint using `list_tools` to verify MCP connection.
- AlphaVantage: Updated configuration polling to support dynamic API URLs.
2025-11-19 03:15:21 +08:00
Lv, Qi
733bf89af5 frontend(config): 改善配置中心测试错误展示布局并美化错误详情
- useApi.testConfig:摘要优先 error 字段;缺省时将原始响应体放入 details,确保 Pretty Printer 可用
- Config 页面:失败时仅显示“测试失败”+ 折叠详情;新增 brace-aware pretty printer,支持 MCP/Rust/reqwest 风格错误的缩进分行显示
2025-11-18 20:12:17 +08:00
15 changed files with 799 additions and 43 deletions

View File

@ -0,0 +1,62 @@
# [待处理] 实现 AlphaVantage 服务连接测试功能
**日期**: 2025-11-18
**状态**: 待处理 (Pending)
**负责人**: AI Assistant
## 1. 需求背景
目前系统配置中心的数据源配置页面中Tushare 和 Finnhub 模块均提供了“测试”按钮,用于验证用户填写的 API Key 和 URL 是否有效。然而AlphaVantage 模块缺少此功能。由于 AlphaVantage 的数据是通过 MCP (Meta-protocol Computation Platform) 协议间接调用的,其连接健康状态的检查尤为重要。
本任务旨在为 AlphaVantage 模块添加一个功能完善的“测试”按钮,以提升系统的健壮性和用户体验。
## 2. 技术方案与执行细节
该功能的实现需要贯穿前端、API网关和后端的 AlphaVantage 服务,形成一个完整的调用链路。
### 2.1. 前端 (Frontend)
* **文件**: `/frontend/src/app/config/page.tsx`
* **任务**:
1. **新增UI元素**: 在 AlphaVantage 配置卡片中,仿照其他服务,添加一个“测试 AlphaVantage”按钮。
2. **创建事件处理器**:
* 实现 `handleTestAlphaVantage` 函数。
* 该函数将从组件的本地状态 (`localDataSources`) 中读取 `alphavantage``api_key``api_url` (此 URL 为 MCP Endpoint)。
* 调用通用的 `handleTest('alphavantage', { ...config })` 函数,将请求发送至 Next.js 后端 API 路由。
### 2.2. Next.js API 路由
* **文件**: `/frontend/src/app/api/configs/test/route.ts` (推测)
* **任务**:
1. **新增处理分支**: 在 `POST` 请求处理逻辑中,为 `type: 'alphavantage'` 增加一个新的 `case`
2. **请求转发**: 该分支将把收到的测试请求(包含配置信息)原样转发到后端的 API 网关。
### 2.3. API 网关 (API Gateway)
* **文件**: `/services/api-gateway/src/api.rs` (或相关路由模块)
* **任务**:
1. **更新路由规则**: 修改处理配置测试的路由逻辑。
2. **分发请求**: 当识别到请求类型为 `alphavantage` 时,将该请求精准地转发到 `alphavantage-provider-service` 的新测试接口。
### 2.4. AlphaVantage 服务 (alphavantage-provider-service)
这是实现测试逻辑的核心。
* **文件**: `/services/alphavantage-provider-service/src/api.rs` (或新建的模块)
* **任务**:
1. **创建新接口**: 在服务中创建一个新的 HTTP `POST /test` 接口,用于接收来自网关的测试请求。
2. **实现核心测试逻辑**:
* 接口从请求体中解析出 `api_url``api_key`
* **动态 MCP 客户端**: 使用传入的 `api_url` 动态地、临时地创建一个 MCP 客户端实例。这确保了测试的是用户当前输入的配置,而不是服务启动时加载的旧配置。
* **调用 `list_capability`**: 利用此临时客户端,调用 MCP 服务标准工具集中的 `list_capability` 工具。`api_key` 将作为认证凭证传递给此调用。
* **响应处理**:
* **成功**: 如果 `list_capability` 调用成功返回,意味着 MCP Endpoint 可达、服务正常、API Key 有效。此时,接口返回 `{"success": true, "message": "MCP connection successful."}`
* **失败**: 如果调用过程中出现任何错误(网络问题、认证失败、超时等),接口将捕获异常并返回 `{"success": false, "message": "MCP connection failed: [具体错误信息]"}`
## 3. 预期成果
* 用户可以在配置中心页面点击按钮来测试 AlphaVantage 的连接配置。
* 系统能够通过调用 MCP 的 `list_capability` 接口,实时验证配置的有效性。
* 前端能够清晰地展示测试成功或失败的结果,为用户提供明确的反馈。

View File

@ -6,7 +6,40 @@ export async function POST(req: NextRequest) {
if (!BACKEND_BASE) { if (!BACKEND_BASE) {
return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 }); return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 });
} }
// 新后端暂无统一 /config/test先返回未实现
const body = await req.text().catch(() => ''); try {
return Response.json({ success: false, message: 'config/test 未实现', echo: body }, { status: 501 }); const body = await req.json();
const { type, data } = body;
if (!type || !data) {
return new Response('请求体必须包含 type 和 data', { status: 400 });
}
// 将请求转发到 API Gateway
const targetUrl = `${BACKEND_BASE.replace(/\/$/, '')}/configs/test`;
const backendRes = await fetch(targetUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ type, ...data }), // 转发时将 data 字段展开
});
const backendResBody = await backendRes.text();
return new Response(backendResBody, {
status: backendRes.status,
headers: {
'Content-Type': 'application/json',
},
});
} catch (error: any) {
console.error('配置测试代理失败:', error);
return new Response(JSON.stringify({ success: false, message: error.message || '代理请求时发生未知错误' }), {
status: 500,
headers: { 'Content-Type': 'application/json' },
});
}
} }

View File

@ -26,6 +26,143 @@ import type {
} from '@/types'; } from '@/types';
import { useDataSourcesConfig, updateDataSourcesConfig, useAnalysisTemplateSets, updateAnalysisTemplateSets } from '@/hooks/useApi'; import { useDataSourcesConfig, updateDataSourcesConfig, useAnalysisTemplateSets, updateAnalysisTemplateSets } from '@/hooks/useApi';
// ---- Helpers: pretty print nested JSON as YAML-like (braceless, unquoted) ----
const MAX_REPARSE_DEPTH = 2;
function tryParseJson(input: string): unknown {
try {
return JSON.parse(input);
} catch {
return input;
}
}
function parsePossiblyNestedJson(raw: string): unknown {
let current: unknown = raw;
for (let i = 0; i < MAX_REPARSE_DEPTH; i++) {
if (typeof current === 'string') {
const trimmed = current.trim();
if ((trimmed.startsWith('{') && trimmed.endsWith('}')) || (trimmed.startsWith('[') && trimmed.endsWith(']'))) {
current = tryParseJson(trimmed);
continue;
}
}
break;
}
return current;
}
function indentLines(text: string, indent: string): string {
return text.split('\n').map((line) => indent + line).join('\n');
}
function looksLikeStructuredText(s: string): boolean {
const t = s.trim();
// 具有成对的大括号,或常见的 Rust/reqwest 错误标识,判定为可结构化的调试输出
return (t.includes('{') && t.includes('}')) || t.includes('DynamicTransportError') || t.includes('reqwest::Error');
}
function prettyFormatByBraces(input: string): string {
let out = '';
let indentLevel = 0;
const indentUnit = ' ';
let inString = false;
let stringQuote = '';
for (let i = 0; i < input.length; i++) {
const ch = input[i]!;
const prev = i > 0 ? input[i - 1]! : '';
if ((ch === '"' || ch === "'") && prev !== '\\') {
if (!inString) {
inString = true;
stringQuote = ch;
} else if (stringQuote === ch) {
inString = false;
}
out += ch;
continue;
}
if (inString) {
out += ch;
continue;
}
if (ch === '{' || ch === '[' || ch === '(') {
indentLevel += 1;
out += ch + '\n' + indentUnit.repeat(indentLevel);
continue;
}
if (ch === '}' || ch === ']' || ch === ')') {
indentLevel = Math.max(0, indentLevel - 1);
out += '\n' + indentUnit.repeat(indentLevel) + ch;
continue;
}
if (ch === ',') {
out += ch + '\n' + indentUnit.repeat(indentLevel);
continue;
}
if (ch === ':') {
out += ch + ' ';
continue;
}
out += ch;
}
return out;
}
function toPseudoYaml(value: unknown, indent: string = ''): string {
const nextIndent = indent + ' ';
if (value === null || value === undefined) {
return `${indent}null`;
}
const t = typeof value;
if (t === 'string' || t === 'number' || t === 'boolean') {
const s = String(value);
const shouldPretty = looksLikeStructuredText(s) || s.includes('\n');
if (shouldPretty) {
const pretty = looksLikeStructuredText(s) ? prettyFormatByBraces(s) : s;
return `${indent}|-\n${indentLines(pretty, nextIndent)}`;
}
return `${indent}${s}`;
}
if (Array.isArray(value)) {
if (value.length === 0) return `${indent}[]`;
return value.map((item) => {
const rendered = toPseudoYaml(item, nextIndent);
const lines = rendered.split('\n');
if (lines.length === 1) {
return `${indent}- ${lines[0].trimStart()}`;
}
return `${indent}- ${lines[0].trimStart()}\n${lines.slice(1).map((l) => indent + ' ' + l.trimStart()).join('\n')}`;
}).join('\n');
}
if (typeof value === 'object') {
const obj = value as Record<string, unknown>;
const keys = Object.keys(obj);
if (keys.length === 0) return `${indent}{}`;
return keys.map((k) => {
const rendered = toPseudoYaml(obj[k], nextIndent);
const lines = rendered.split('\n');
if (lines.length === 1) {
return `${indent}${k}: ${lines[0].trimStart()}`;
}
return `${indent}${k}:\n${lines.map((l) => l).join('\n')}`;
}).join('\n');
}
// fallback
return `${indent}${String(value)}`;
}
function formatDetailsToYaml(details: string): string {
const parsed = parsePossiblyNestedJson(details);
if (typeof parsed === 'string') {
// 尝试再次解析(有些场景内层 message 也是 JSON 串)
const again = parsePossiblyNestedJson(parsed);
if (typeof again === 'string') {
return toPseudoYaml(again);
}
return toPseudoYaml(again);
}
return toPseudoYaml(parsed);
}
const defaultUrls: Partial<Record<DataSourceProvider, string>> = {
tushare: 'http://api.tushare.pro',
finnhub: 'https://finnhub.io/api/v1',
alphavantage: 'https://mcp.alphavantage.co/mcp',
};
export default function ConfigPage() { export default function ConfigPage() {
// 从 Zustand store 获取全局状态 // 从 Zustand store 获取全局状态
const { config, loading, error, setConfig } = useConfigStore(); const { config, loading, error, setConfig } = useConfigStore();
@ -47,7 +184,7 @@ export default function ConfigPage() {
// 分析配置保存状态(状态定义在下方统一维护) // 分析配置保存状态(状态定义在下方统一维护)
// 测试结果状态 // 测试结果状态
const [testResults, setTestResults] = useState<Record<string, { success: boolean; message: string } | null>>({}); const [testResults, setTestResults] = useState<Record<string, { success: boolean; summary: string; details?: string } | null>>({});
// 保存状态 // 保存状态
const [saving, setSaving] = useState(false); const [saving, setSaving] = useState(false);
@ -368,8 +505,22 @@ export default function ConfigPage() {
try { try {
if (initialDataSources) { if (initialDataSources) {
await updateDataSourcesConfig(localDataSources); // Create a deep copy to avoid mutating the local state directly
await mutateDataSources(localDataSources, false); const finalDataSources = JSON.parse(JSON.stringify(localDataSources));
for (const key in finalDataSources) {
const providerKey = key as DataSourceProvider;
const source = finalDataSources[providerKey];
// If the URL is empty or null, and there is a default URL, use it
if (source && (source.api_url === null || source.api_url.trim() === '') && defaultUrls[providerKey]) {
source.api_url = defaultUrls[providerKey];
}
}
await updateDataSourcesConfig(finalDataSources);
// After saving, mutate the local SWR cache with the final data
// and also update the component's local state to reflect the change.
await mutateDataSources(finalDataSources, false);
setLocalDataSources(finalDataSources);
} }
setSaveMessage('保存成功!'); setSaveMessage('保存成功!');
} catch (e: any) { } catch (e: any) {
@ -383,11 +534,18 @@ export default function ConfigPage() {
const handleTest = async (type: string, data: any) => { const handleTest = async (type: string, data: any) => {
try { try {
const result = await testConfig(type, data); const result = await testConfig(type, data);
setTestResults(prev => ({ ...prev, [type]: result })); const success = !!result?.success;
const summary = typeof result?.message === 'string' && result.message.trim().length > 0
? result.message
: (success ? '测试成功' : '测试失败');
setTestResults(prev => ({ ...prev, [type]: { success, summary } }));
} catch (e: any) { } catch (e: any) {
setTestResults(prev => ({ // 结构化错误对象:{ summary, details? }
...prev, const summary: string = (e && typeof e === 'object' && 'summary' in e) ? String(e.summary) : (e?.message || '未知错误');
[type]: { success: false, message: e.message } const details: string | undefined = (e && typeof e === 'object' && 'details' in e) ? (e.details ? String(e.details) : undefined) : undefined;
setTestResults(prev => ({
...prev,
[type]: { success: false, summary, details }
})); }));
} }
}; };
@ -402,6 +560,11 @@ export default function ConfigPage() {
handleTest('finnhub', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled }); handleTest('finnhub', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled });
}; };
const handleTestAlphaVantage = () => {
const cfg = localDataSources['alphavantage'];
handleTest('alphavantage', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled });
};
const handleReset = () => { const handleReset = () => {
if (initialDataSources) setLocalDataSources(initialDataSources); if (initialDataSources) setLocalDataSources(initialDataSources);
setTestResults({}); setTestResults({});
@ -1058,7 +1221,8 @@ export default function ConfigPage() {
[providerKey]: { ...item, api_url: v, provider: providerKey }, [providerKey]: { ...item, api_url: v, provider: providerKey },
})); }));
}} }}
placeholder={providerKey === 'tushare' ? 'https://api.tushare.pro' : 'https://...'} placeholder={defaultUrls[providerKey] ?? 'https://...'}
disabled={providerKey === 'yfinance'}
/> />
</div> </div>
</div> </div>
@ -1069,7 +1233,33 @@ export default function ConfigPage() {
{providerKey === 'finnhub' && ( {providerKey === 'finnhub' && (
<Button variant="outline" onClick={handleTestFinnhub}> Finnhub</Button> <Button variant="outline" onClick={handleTestFinnhub}> Finnhub</Button>
)} )}
{providerKey === 'alphavantage' && (
<Button variant="outline" onClick={handleTestAlphaVantage}> AlphaVantage</Button>
)}
</div> </div>
{testResults[providerKey] ? (() => {
const r = testResults[providerKey]!;
if (r.success) {
return (
<div className="text-sm text-green-600">
{r.summary}
</div>
);
}
return (
<div className="text-sm text-red-600">
<div className="font-medium"></div>
{r.details ? (
<details className="mt-2">
<summary className="cursor-pointer text-red-700 underline">YAML</summary>
<pre className="mt-2 p-2 rounded bg-red-50 text-red-700 whitespace-pre-wrap break-words">
{formatDetailsToYaml(r.details)}
</pre>
</details>
) : null}
</div>
);
})() : null}
</div> </div>
); );
})} })}

View File

@ -294,9 +294,41 @@ export async function testConfig(type: string, data: unknown) {
if (!res.ok) { if (!res.ok) {
try { try {
const err = JSON.parse(text); const err = JSON.parse(text);
throw new Error(err?.message || text); // 优先从标准字段中提取错误信息;同时分离 details
let message: string = err?.error || err?.message || '';
let detailsStr = '';
if (err?.details !== undefined) {
if (typeof err.details === 'string') {
detailsStr = err.details;
// details 可能是被 JSON 序列化的字符串,尝试解析一次以便还原内部结构
try {
const parsed = JSON.parse(detailsStr);
if (typeof parsed === 'string') {
detailsStr = parsed;
} else if (parsed && typeof parsed === 'object' && 'message' in parsed) {
detailsStr = String((parsed as any).message);
} else {
detailsStr = JSON.stringify(parsed);
}
} catch {
// 忽略解析失败,保留原始 details 字符串
}
} else {
try {
detailsStr = JSON.stringify(err.details);
} catch {
detailsStr = String(err.details);
}
}
}
const summary = message || `HTTP ${res.status}`;
const detailsOut = (detailsStr && detailsStr.length > 0) ? detailsStr : (text || undefined);
// 抛出结构化错误对象供调用方精确展示details 始终尽量携带原始响应体,便于前端美化)
throw { summary, details: detailsOut };
} catch { } catch {
throw new Error(text || `HTTP ${res.status}`); // 无法解析为 JSON 时,仍然把原始文本作为 details 返回,保证前端可美化
const fallback = text || `HTTP ${res.status}`;
throw { summary: fallback, details: text || undefined };
} }
} }
try { try {

View File

@ -40,12 +40,14 @@ dependencies = [
"common-contracts", "common-contracts",
"config", "config",
"dashmap", "dashmap",
"futures",
"futures-util", "futures-util",
"reqwest", "reqwest",
"rmcp", "rmcp",
"secrecy", "secrecy",
"serde", "serde",
"serde_json", "serde_json",
"sse-stream",
"thiserror 2.0.17", "thiserror 2.0.17",
"tokio", "tokio",
"tower-http", "tower-http",

View File

@ -43,3 +43,5 @@ secrecy = { version = "0.10.3", features = ["serde"] }
thiserror = "2.0.17" thiserror = "2.0.17"
anyhow = "1.0" anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
sse-stream = "0.2"
futures = "0.3"

View File

@ -2,16 +2,18 @@ use std::collections::HashMap;
use axum::{ use axum::{
extract::State, extract::State,
response::Json, response::Json,
routing::get, routing::{get, post},
Router, Router,
}; };
use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress}; use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress};
use crate::state::{AppState, ServiceOperationalStatus}; use crate::state::{AppState, ServiceOperationalStatus};
use crate::api_test;
pub fn create_router(app_state: AppState) -> Router { pub fn create_router(app_state: AppState) -> Router {
Router::new() Router::new()
.route("/health", get(health_check)) .route("/health", get(health_check))
.route("/tasks", get(get_current_tasks)) .route("/tasks", get(get_current_tasks))
.route("/test", post(api_test::test_connection))
.with_state(app_state) .with_state(app_state)
} }

View File

@ -0,0 +1,97 @@
use crate::av_client::AvClient;
use axum::{http::StatusCode, response::{IntoResponse, Json}};
use secrecy::{SecretString, ExposeSecret};
use serde::{Deserialize, Serialize};
use tracing::{info, warn};
#[derive(Deserialize)]
pub struct TestConnectionRequest {
// This is the MCP endpoint URL
pub api_url: String,
// The API key is passed for validation but might not be used directly
// in the MCP connection itself, depending on auth mechanism.
pub api_key: Option<SecretString>,
}
#[derive(Serialize)]
pub struct TestConnectionResponse {
pub success: bool,
pub message: String,
}
/// [POST /test]
/// Dynamically tests a connection to an MCP endpoint.
pub async fn test_connection(
Json(payload): Json<TestConnectionRequest>,
) -> impl IntoResponse {
info!("Testing connection to MCP endpoint: {}", payload.api_url);
if payload.api_url.is_empty() {
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: "API URL (MCP Endpoint) cannot be empty.".to_string(),
}),
).into_response();
}
// 要求传入 base MCP URL不包含查询参数与 api_key然后按官方文档拼接 ?apikey=
if payload.api_url.contains('?') {
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: "API URL 必须为基础地址(不可包含查询参数)".to_string(),
}),
).into_response();
}
let Some(key) = &payload.api_key else {
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: "测试连接需要提供 api_key".to_string(),
}),
).into_response();
};
let final_url = format!("{}?apikey={}", payload.api_url, key.expose_secret());
info!("Testing MCP with final endpoint: {}", final_url);
let mcp_client = match AvClient::connect(&final_url).await {
Ok(client) => client,
Err(e) => {
warn!("Failed to establish MCP transport: {}", e);
return (
StatusCode::BAD_REQUEST,
Json(TestConnectionResponse {
success: false,
message: format!("Failed to establish MCP transport: {}", e),
}),
).into_response();
}
};
// Call list_tools to verify the connection.
match mcp_client.list_tools().await {
Ok(tools) => {
info!("MCP list_tools successful. Found {} tools.", tools.len());
(
StatusCode::OK,
Json(TestConnectionResponse {
success: true,
message: format!("Successfully connected to MCP endpoint and found {} tools.", tools.len()),
}),
).into_response()
}
Err(e) => {
warn!("MCP list_tools failed: {}", e);
(
StatusCode::INTERNAL_SERVER_ERROR,
Json(TestConnectionResponse {
success: false,
message: format!("MCP command 'list_tools' failed: {}", e),
}),
).into_response()
}
}
}

View File

@ -1,7 +1,9 @@
use crate::error::{AppError, Result}; use crate::error::{AppError, Result};
use crate::transport::CustomHttpClient;
use rmcp::{ClientHandler, ServiceExt}; use rmcp::{ClientHandler, ServiceExt};
use rmcp::model::CallToolRequestParam; use rmcp::model::CallToolRequestParam;
use rmcp::transport::StreamableHttpClientTransport; use rmcp::transport::StreamableHttpClientTransport;
use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
use serde_json::{Map, Value}; use serde_json::{Map, Value};
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
@ -20,7 +22,20 @@ pub struct AvClient {
impl AvClient { impl AvClient {
pub async fn connect(mcp_endpoint_url: &str) -> Result<Self> { pub async fn connect(mcp_endpoint_url: &str) -> Result<Self> {
let transport = StreamableHttpClientTransport::from_uri(mcp_endpoint_url.to_string()); let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string());
let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config);
let running = DummyClientHandler
::default()
.serve(transport)
.await
.map_err(|e| AppError::Configuration(format!("Fail to init MCP service: {e:?}")))?;
Ok(Self { service: running })
}
pub async fn connect_with_bearer(mcp_endpoint_url: &str, bearer_token: &str) -> Result<Self> {
let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string())
.auth_header(bearer_token.to_string());
let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config);
let running = DummyClientHandler let running = DummyClientHandler
::default() ::default()
.serve(transport) .serve(transport)
@ -55,6 +70,12 @@ impl AvClient {
} }
Ok(Value::Null) Ok(Value::Null)
} }
pub async fn list_tools(&self) -> Result<Vec<rmcp::model::Tool>> {
let result = self.service.list_tools(None).await
.map_err(|e| AppError::Configuration(format!("MCP list_tools error: {e:?}")))?;
Ok(result.tools)
}
} }

View File

@ -40,15 +40,12 @@ async fn poll_and_update_config(state: &AppState) -> Result<()> {
}); });
if let Some(config) = alphavantage_config { if let Some(config) = alphavantage_config {
if let Some(api_key) = &config.api_key { let api_key = config.api_key.clone().map(SecretString::from);
state.update_provider(Some(SecretString::from(api_key.clone()))).await; let api_url = config.api_url.clone();
info!("Successfully updated Alphavantage provider with new configuration."); state.update_provider(api_key, api_url).await;
} else { info!("Successfully updated Alphavantage provider with new configuration.");
state.update_provider(None).await;
info!("Alphavantage provider is enabled but API key is missing. Service is degraded.");
}
} else { } else {
state.update_provider(None).await; state.update_provider(None, None).await;
info!("No enabled Alphavantage configuration found. Service is degraded."); info!("No enabled Alphavantage configuration found. Service is degraded.");
} }

View File

@ -1,4 +1,5 @@
mod api; mod api;
mod api_test;
mod config; mod config;
mod error; mod error;
mod mapping; mod mapping;
@ -8,6 +9,7 @@ mod state;
mod worker; mod worker;
mod av_client; mod av_client;
mod config_poller; mod config_poller;
mod transport;
use crate::config::AppConfig; use crate::config::AppConfig;
use crate::error::Result; use crate::error::Result;

View File

@ -6,6 +6,7 @@ use secrecy::{ExposeSecret, SecretString};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use uuid::Uuid; use uuid::Uuid;
use tracing::info;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum ServiceOperationalStatus { pub enum ServiceOperationalStatus {
@ -41,32 +42,46 @@ impl AppState {
self.av_provider.read().await.clone() self.av_provider.read().await.clone()
} }
pub async fn update_provider(&self, api_key: Option<SecretString>) { pub async fn update_provider(&self, api_key: Option<SecretString>, api_url: Option<String>) {
let mut provider_guard = self.av_provider.write().await; let mut provider_guard = self.av_provider.write().await;
let mut status_guard = self.status.write().await; let mut status_guard = self.status.write().await;
if let Some(key) = api_key { match (api_key, api_url) {
let mcp_endpoint = format!( (Some(key), Some(base_url)) => {
"https://mcp.alphavantage.co/mcp?apikey={}", if base_url.contains('?') {
key.expose_secret()
);
match AvClient::connect(&mcp_endpoint).await {
Ok(new_provider) => {
*provider_guard = Some(Arc::new(new_provider));
*status_guard = ServiceOperationalStatus::Active;
}
Err(e) => {
*provider_guard = None; *provider_guard = None;
*status_guard = ServiceOperationalStatus::Degraded { *status_guard = ServiceOperationalStatus::Degraded {
reason: format!("Failed to connect to Alphavantage: {}", e), reason: "Configured MCP endpoint must not contain query parameters.".to_string(),
}; };
return;
}
let mcp_endpoint = format!("{}?apikey={}", base_url, key.expose_secret());
info!("Initializing Alphavantage MCP provider with endpoint: {}", mcp_endpoint);
match AvClient::connect(&mcp_endpoint).await {
Ok(new_provider) => {
*provider_guard = Some(Arc::new(new_provider));
*status_guard = ServiceOperationalStatus::Active;
}
Err(e) => {
*provider_guard = None;
*status_guard = ServiceOperationalStatus::Degraded {
reason: format!("Failed to connect to Alphavantage: {}", e),
};
}
} }
} }
} else { (None, _) => {
*provider_guard = None; *provider_guard = None;
*status_guard = ServiceOperationalStatus::Degraded { *status_guard = ServiceOperationalStatus::Degraded {
reason: "Alphavantage API Key is not configured.".to_string(), reason: "Alphavantage API Key is not configured.".to_string(),
}; };
}
(_, None) => {
*provider_guard = None;
*status_guard = ServiceOperationalStatus::Degraded {
reason: "Alphavantage MCP endpoint is not configured.".to_string(),
};
}
} }
} }
} }

View File

@ -0,0 +1,179 @@
use std::{borrow::Cow, sync::Arc};
use futures::{stream::BoxStream, StreamExt};
use reqwest::header::{ACCEPT, CONTENT_TYPE};
use rmcp::{
model::ClientJsonRpcMessage,
model::ServerJsonRpcMessage,
transport::streamable_http_client::{
AuthRequiredError, SseError, StreamableHttpClient, StreamableHttpError,
StreamableHttpPostResponse,
},
};
use sse_stream::{Sse, SseStream};
// Redefine constants as they are not public in rmcp
const HEADER_SESSION_ID: &str = "Mcp-Session-Id";
const HEADER_LAST_EVENT_ID: &str = "Last-Event-Id";
const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream";
const JSON_MIME_TYPE: &str = "application/json";
const WWW_AUTHENTICATE: &str = "www-authenticate";
#[derive(Clone)]
pub struct CustomHttpClient {
client: reqwest::Client,
}
impl Default for CustomHttpClient {
fn default() -> Self {
Self {
client: reqwest::Client::new(),
}
}
}
impl CustomHttpClient {
pub fn new() -> Self {
Self::default()
}
}
impl StreamableHttpClient for CustomHttpClient {
type Error = reqwest::Error;
async fn get_stream(
&self,
uri: Arc<str>,
session_id: Arc<str>,
last_event_id: Option<String>,
auth_token: Option<String>,
) -> Result<BoxStream<'static, Result<Sse, SseError>>, StreamableHttpError<Self::Error>> {
let mut request_builder = self
.client
.get(uri.as_ref())
.header(ACCEPT, EVENT_STREAM_MIME_TYPE)
.header(HEADER_SESSION_ID, session_id.as_ref());
if let Some(last_event_id) = last_event_id {
request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id);
}
if let Some(auth_header) = auth_token {
request_builder = request_builder.bearer_auth(auth_header);
}
let response = request_builder.send().await?;
// --- CUSTOM LOGIC START ---
// If we get 400 Bad Request or 405 Method Not Allowed, we assume the server
// does not support SSE on this endpoint and degrade gracefully.
if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED
|| response.status() == reqwest::StatusCode::BAD_REQUEST {
tracing::debug!("Server returned {}, assuming SSE not supported", response.status());
return Err(StreamableHttpError::ServerDoesNotSupportSse);
}
// --- CUSTOM LOGIC END ---
let response = response.error_for_status()?;
match response.headers().get(CONTENT_TYPE) {
Some(ct) => {
if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) {
return Err(StreamableHttpError::UnexpectedContentType(Some(
String::from_utf8_lossy(ct.as_bytes()).to_string(),
)));
}
}
None => {
return Err(StreamableHttpError::UnexpectedContentType(None));
}
}
let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed();
Ok(event_stream)
}
async fn delete_session(
&self,
uri: Arc<str>,
session: Arc<str>,
auth_token: Option<String>,
) -> Result<(), StreamableHttpError<Self::Error>> {
let mut request_builder = self.client.delete(uri.as_ref());
if let Some(auth_header) = auth_token {
request_builder = request_builder.bearer_auth(auth_header);
}
let response = request_builder
.header(HEADER_SESSION_ID, session.as_ref())
.send()
.await?;
if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED {
tracing::debug!("this server doesn't support deleting session");
return Ok(());
}
let _response = response.error_for_status()?;
Ok(())
}
async fn post_message(
&self,
uri: Arc<str>,
message: ClientJsonRpcMessage,
session_id: Option<Arc<str>>,
auth_token: Option<String>,
) -> Result<StreamableHttpPostResponse, StreamableHttpError<Self::Error>> {
let mut request = self
.client
.post(uri.as_ref())
.header(ACCEPT, [EVENT_STREAM_MIME_TYPE, JSON_MIME_TYPE].join(", "));
if let Some(auth_header) = auth_token {
request = request.bearer_auth(auth_header);
}
if let Some(session_id) = session_id {
request = request.header(HEADER_SESSION_ID, session_id.as_ref());
}
let response = request.json(&message).send().await?;
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
if let Some(header) = response.headers().get(WWW_AUTHENTICATE) {
let header = header
.to_str()
.map_err(|_| {
StreamableHttpError::UnexpectedServerResponse(Cow::from(
"invalid www-authenticate header value",
))
})?
.to_string();
return Err(StreamableHttpError::AuthRequired(AuthRequiredError {
www_authenticate_header: header,
}));
}
}
let status = response.status();
let response = response.error_for_status()?;
if matches!(
status,
reqwest::StatusCode::ACCEPTED | reqwest::StatusCode::NO_CONTENT
) {
return Ok(StreamableHttpPostResponse::Accepted);
}
let content_type = response.headers().get(CONTENT_TYPE);
let session_id = response.headers().get(HEADER_SESSION_ID);
let session_id = session_id
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
match content_type {
Some(ct) if ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) => {
let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed();
Ok(StreamableHttpPostResponse::Sse(event_stream, session_id))
}
Some(ct) if ct.as_bytes().starts_with(JSON_MIME_TYPE.as_bytes()) => {
let message: ServerJsonRpcMessage = response.json().await?;
Ok(StreamableHttpPostResponse::Json(message, session_id))
}
_ => {
tracing::error!("unexpected content type: {:?}", content_type);
Err(StreamableHttpError::UnexpectedContentType(
content_type.map(|ct| String::from_utf8_lossy(ct.as_bytes()).to_string()),
))
}
}
}
}

View File

@ -66,6 +66,7 @@ fn create_v1_router() -> Router<AppState> {
"/configs/data_sources", "/configs/data_sources",
get(get_data_sources_config).put(update_data_sources_config), get(get_data_sources_config).put(update_data_sources_config),
) )
.route("/configs/test", post(test_data_source_config))
// --- New Discover Routes --- // --- New Discover Routes ---
.route("/discover-models/{provider_id}", get(discover_models)) .route("/discover-models/{provider_id}", get(discover_models))
.route("/discover-models", post(discover_models_preview)) .route("/discover-models", post(discover_models_preview))
@ -208,6 +209,71 @@ async fn get_task_progress(
} }
// --- New Config Test Handler ---
#[derive(Deserialize, Debug)]
struct TestConfigRequest {
r#type: String,
#[serde(flatten)]
data: serde_json::Value,
}
/// [POST /v1/configs/test]
/// Forwards a configuration test request to the appropriate downstream service.
async fn test_data_source_config(
State(state): State<AppState>,
Json(payload): Json<TestConfigRequest>,
) -> Result<impl IntoResponse> {
info!("test_data_source_config: type={}", payload.r#type);
let target_service_url = match payload.r#type.as_str() {
"tushare" => state.config.provider_services.iter().find(|s| s.contains("tushare")),
"finnhub" => state.config.provider_services.iter().find(|s| s.contains("finnhub")),
"alphavantage" => state.config.provider_services.iter().find(|s| s.contains("alphavantage")),
_ => {
return Ok((
StatusCode::BAD_REQUEST,
Json(serde_json::json!({ "error": "Unsupported config type" })),
).into_response());
}
};
if let Some(base_url) = target_service_url {
let client = reqwest::Client::new();
let target_url = format!("{}/test", base_url.trim_end_matches('/'));
info!("Forwarding test request for '{}' to {}", payload.r#type, target_url);
let response = client
.post(&target_url)
.json(&payload.data)
.send()
.await?;
if !response.status().is_success() {
let status = response.status();
let error_text = response.text().await?;
warn!("Downstream test for '{}' failed: status={} body={}", payload.r#type, status, error_text);
return Ok((
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY),
Json(serde_json::json!({
"error": "Downstream service returned an error",
"details": error_text,
})),
).into_response());
}
let response_json: serde_json::Value = response.json().await?;
Ok((StatusCode::OK, Json(response_json)).into_response())
} else {
warn!("No downstream service found for config type: {}", payload.r#type);
Ok((
StatusCode::NOT_IMPLEMENTED,
Json(serde_json::json!({ "error": "No downstream service configured for this type" })),
).into_response())
}
}
// --- Config API Handlers (Proxy to data-persistence-service) --- // --- Config API Handlers (Proxy to data-persistence-service) ---
use common_contracts::config_models::{ use common_contracts::config_models::{

View File

@ -1,6 +1,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use axum::{routing::get, Router, extract::State, response::Json}; use axum::{routing::{get, post}, Router, extract::State, response::{Json, IntoResponse}, http::StatusCode};
use serde::Deserialize;
use secrecy::ExposeSecret;
use crate::ts_client::TushareClient;
use crate::state::{AppState, ServiceOperationalStatus}; use crate::state::{AppState, ServiceOperationalStatus};
use common_contracts::observability::{HealthStatus, ServiceStatus}; use common_contracts::observability::{HealthStatus, ServiceStatus};
@ -8,9 +10,63 @@ pub fn create_router(app_state: AppState) -> Router {
Router::new() Router::new()
.route("/health", get(health_check)) .route("/health", get(health_check))
.route("/tasks", get(get_tasks)) .route("/tasks", get(get_tasks))
.route("/test", post(test_connection))
.with_state(app_state) .with_state(app_state)
} }
#[derive(Deserialize)]
struct TestRequest {
api_key: Option<String>,
api_url: Option<String>,
}
async fn test_connection(
State(state): State<AppState>,
Json(payload): Json<TestRequest>,
) -> impl IntoResponse {
let api_url = payload.api_url
.filter(|s| !s.is_empty())
.unwrap_or_else(|| state.config.tushare_api_url.clone());
let api_key = if let Some(k) = payload.api_key.filter(|s| !s.is_empty()) {
k
} else if let Some(k) = &state.config.tushare_api_token {
k.expose_secret().clone()
} else {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"success": false,
"message": "No API Key provided or configured"
}))
).into_response();
};
let client = TushareClient::new(api_url, api_key);
// Try to fetch a small amount of data to verify the token
match client.send_request::<serde_json::Value>(
"stock_basic",
serde_json::json!({"limit": 1}),
""
).await {
Ok(_) => (
StatusCode::OK,
Json(serde_json::json!({
"success": true,
"message": "Connection successful"
}))
).into_response(),
Err(e) => (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"success": false,
"message": format!("Connection failed: {}", e)
}))
).into_response(),
}
}
async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> { async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> {
let mut details = HashMap::new(); let mut details = HashMap::new();
let operational_status = state.status.read().await; let operational_status = state.status.read().await;