Compare commits
No commits in common. "68ae2656a72b2e5a5711ac73bf24e579930c17be" and "427776b8639f2f5a60c585a8f1330e4bd09445aa" have entirely different histories.
68ae2656a7
...
427776b863
@ -1,62 +0,0 @@
|
|||||||
# [待处理] 实现 AlphaVantage 服务连接测试功能
|
|
||||||
|
|
||||||
**日期**: 2025-11-18
|
|
||||||
|
|
||||||
**状态**: 待处理 (Pending)
|
|
||||||
|
|
||||||
**负责人**: AI Assistant
|
|
||||||
|
|
||||||
## 1. 需求背景
|
|
||||||
|
|
||||||
目前,系统配置中心的数据源配置页面中,Tushare 和 Finnhub 模块均提供了“测试”按钮,用于验证用户填写的 API Key 和 URL 是否有效。然而,AlphaVantage 模块缺少此功能。由于 AlphaVantage 的数据是通过 MCP (Meta-protocol Computation Platform) 协议间接调用的,其连接健康状态的检查尤为重要。
|
|
||||||
|
|
||||||
本任务旨在为 AlphaVantage 模块添加一个功能完善的“测试”按钮,以提升系统的健壮性和用户体验。
|
|
||||||
|
|
||||||
## 2. 技术方案与执行细节
|
|
||||||
|
|
||||||
该功能的实现需要贯穿前端、API网关和后端的 AlphaVantage 服务,形成一个完整的调用链路。
|
|
||||||
|
|
||||||
### 2.1. 前端 (Frontend)
|
|
||||||
|
|
||||||
* **文件**: `/frontend/src/app/config/page.tsx`
|
|
||||||
* **任务**:
|
|
||||||
1. **新增UI元素**: 在 AlphaVantage 配置卡片中,仿照其他服务,添加一个“测试 AlphaVantage”按钮。
|
|
||||||
2. **创建事件处理器**:
|
|
||||||
* 实现 `handleTestAlphaVantage` 函数。
|
|
||||||
* 该函数将从组件的本地状态 (`localDataSources`) 中读取 `alphavantage` 的 `api_key` 和 `api_url` (此 URL 为 MCP Endpoint)。
|
|
||||||
* 调用通用的 `handleTest('alphavantage', { ...config })` 函数,将请求发送至 Next.js 后端 API 路由。
|
|
||||||
|
|
||||||
### 2.2. Next.js API 路由
|
|
||||||
|
|
||||||
* **文件**: `/frontend/src/app/api/configs/test/route.ts` (推测)
|
|
||||||
* **任务**:
|
|
||||||
1. **新增处理分支**: 在 `POST` 请求处理逻辑中,为 `type: 'alphavantage'` 增加一个新的 `case`。
|
|
||||||
2. **请求转发**: 该分支将把收到的测试请求(包含配置信息)原样转发到后端的 API 网关。
|
|
||||||
|
|
||||||
### 2.3. API 网关 (API Gateway)
|
|
||||||
|
|
||||||
* **文件**: `/services/api-gateway/src/api.rs` (或相关路由模块)
|
|
||||||
* **任务**:
|
|
||||||
1. **更新路由规则**: 修改处理配置测试的路由逻辑。
|
|
||||||
2. **分发请求**: 当识别到请求类型为 `alphavantage` 时,将该请求精准地转发到 `alphavantage-provider-service` 的新测试接口。
|
|
||||||
|
|
||||||
### 2.4. AlphaVantage 服务 (alphavantage-provider-service)
|
|
||||||
|
|
||||||
这是实现测试逻辑的核心。
|
|
||||||
|
|
||||||
* **文件**: `/services/alphavantage-provider-service/src/api.rs` (或新建的模块)
|
|
||||||
* **任务**:
|
|
||||||
1. **创建新接口**: 在服务中创建一个新的 HTTP `POST /test` 接口,用于接收来自网关的测试请求。
|
|
||||||
2. **实现核心测试逻辑**:
|
|
||||||
* 接口从请求体中解析出 `api_url` 和 `api_key`。
|
|
||||||
* **动态 MCP 客户端**: 使用传入的 `api_url` 动态地、临时地创建一个 MCP 客户端实例。这确保了测试的是用户当前输入的配置,而不是服务启动时加载的旧配置。
|
|
||||||
* **调用 `list_capability`**: 利用此临时客户端,调用 MCP 服务标准工具集中的 `list_capability` 工具。`api_key` 将作为认证凭证传递给此调用。
|
|
||||||
* **响应处理**:
|
|
||||||
* **成功**: 如果 `list_capability` 调用成功返回,意味着 MCP Endpoint 可达、服务正常、API Key 有效。此时,接口返回 `{"success": true, "message": "MCP connection successful."}`。
|
|
||||||
* **失败**: 如果调用过程中出现任何错误(网络问题、认证失败、超时等),接口将捕获异常并返回 `{"success": false, "message": "MCP connection failed: [具体错误信息]"}`。
|
|
||||||
|
|
||||||
## 3. 预期成果
|
|
||||||
|
|
||||||
* 用户可以在配置中心页面点击按钮来测试 AlphaVantage 的连接配置。
|
|
||||||
* 系统能够通过调用 MCP 的 `list_capability` 接口,实时验证配置的有效性。
|
|
||||||
* 前端能够清晰地展示测试成功或失败的结果,为用户提供明确的反馈。
|
|
||||||
@ -6,40 +6,7 @@ export async function POST(req: NextRequest) {
|
|||||||
if (!BACKEND_BASE) {
|
if (!BACKEND_BASE) {
|
||||||
return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 });
|
return new Response('BACKEND_INTERNAL_URL/NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 });
|
||||||
}
|
}
|
||||||
|
// 新后端暂无统一 /config/test;先返回未实现
|
||||||
try {
|
const body = await req.text().catch(() => '');
|
||||||
const body = await req.json();
|
return Response.json({ success: false, message: 'config/test 未实现', echo: body }, { status: 501 });
|
||||||
const { type, data } = body;
|
|
||||||
|
|
||||||
if (!type || !data) {
|
|
||||||
return new Response('请求体必须包含 type 和 data', { status: 400 });
|
|
||||||
}
|
|
||||||
|
|
||||||
// 将请求转发到 API Gateway
|
|
||||||
const targetUrl = `${BACKEND_BASE.replace(/\/$/, '')}/configs/test`;
|
|
||||||
|
|
||||||
const backendRes = await fetch(targetUrl, {
|
|
||||||
method: 'POST',
|
|
||||||
headers: {
|
|
||||||
'Content-Type': 'application/json',
|
|
||||||
},
|
|
||||||
body: JSON.stringify({ type, ...data }), // 转发时将 data 字段展开
|
|
||||||
});
|
|
||||||
|
|
||||||
const backendResBody = await backendRes.text();
|
|
||||||
|
|
||||||
return new Response(backendResBody, {
|
|
||||||
status: backendRes.status,
|
|
||||||
headers: {
|
|
||||||
'Content-Type': 'application/json',
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
} catch (error: any) {
|
|
||||||
console.error('配置测试代理失败:', error);
|
|
||||||
return new Response(JSON.stringify({ success: false, message: error.message || '代理请求时发生未知错误' }), {
|
|
||||||
status: 500,
|
|
||||||
headers: { 'Content-Type': 'application/json' },
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,143 +26,6 @@ import type {
|
|||||||
} from '@/types';
|
} from '@/types';
|
||||||
import { useDataSourcesConfig, updateDataSourcesConfig, useAnalysisTemplateSets, updateAnalysisTemplateSets } from '@/hooks/useApi';
|
import { useDataSourcesConfig, updateDataSourcesConfig, useAnalysisTemplateSets, updateAnalysisTemplateSets } from '@/hooks/useApi';
|
||||||
|
|
||||||
// ---- Helpers: pretty print nested JSON as YAML-like (braceless, unquoted) ----
|
|
||||||
const MAX_REPARSE_DEPTH = 2;
|
|
||||||
function tryParseJson(input: string): unknown {
|
|
||||||
try {
|
|
||||||
return JSON.parse(input);
|
|
||||||
} catch {
|
|
||||||
return input;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
function parsePossiblyNestedJson(raw: string): unknown {
|
|
||||||
let current: unknown = raw;
|
|
||||||
for (let i = 0; i < MAX_REPARSE_DEPTH; i++) {
|
|
||||||
if (typeof current === 'string') {
|
|
||||||
const trimmed = current.trim();
|
|
||||||
if ((trimmed.startsWith('{') && trimmed.endsWith('}')) || (trimmed.startsWith('[') && trimmed.endsWith(']'))) {
|
|
||||||
current = tryParseJson(trimmed);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return current;
|
|
||||||
}
|
|
||||||
function indentLines(text: string, indent: string): string {
|
|
||||||
return text.split('\n').map((line) => indent + line).join('\n');
|
|
||||||
}
|
|
||||||
function looksLikeStructuredText(s: string): boolean {
|
|
||||||
const t = s.trim();
|
|
||||||
// 具有成对的大括号,或常见的 Rust/reqwest 错误标识,判定为可结构化的调试输出
|
|
||||||
return (t.includes('{') && t.includes('}')) || t.includes('DynamicTransportError') || t.includes('reqwest::Error');
|
|
||||||
}
|
|
||||||
function prettyFormatByBraces(input: string): string {
|
|
||||||
let out = '';
|
|
||||||
let indentLevel = 0;
|
|
||||||
const indentUnit = ' ';
|
|
||||||
let inString = false;
|
|
||||||
let stringQuote = '';
|
|
||||||
for (let i = 0; i < input.length; i++) {
|
|
||||||
const ch = input[i]!;
|
|
||||||
const prev = i > 0 ? input[i - 1]! : '';
|
|
||||||
if ((ch === '"' || ch === "'") && prev !== '\\') {
|
|
||||||
if (!inString) {
|
|
||||||
inString = true;
|
|
||||||
stringQuote = ch;
|
|
||||||
} else if (stringQuote === ch) {
|
|
||||||
inString = false;
|
|
||||||
}
|
|
||||||
out += ch;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (inString) {
|
|
||||||
out += ch;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ch === '{' || ch === '[' || ch === '(') {
|
|
||||||
indentLevel += 1;
|
|
||||||
out += ch + '\n' + indentUnit.repeat(indentLevel);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ch === '}' || ch === ']' || ch === ')') {
|
|
||||||
indentLevel = Math.max(0, indentLevel - 1);
|
|
||||||
out += '\n' + indentUnit.repeat(indentLevel) + ch;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ch === ',') {
|
|
||||||
out += ch + '\n' + indentUnit.repeat(indentLevel);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (ch === ':') {
|
|
||||||
out += ch + ' ';
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
out += ch;
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
function toPseudoYaml(value: unknown, indent: string = ''): string {
|
|
||||||
const nextIndent = indent + ' ';
|
|
||||||
if (value === null || value === undefined) {
|
|
||||||
return `${indent}null`;
|
|
||||||
}
|
|
||||||
const t = typeof value;
|
|
||||||
if (t === 'string' || t === 'number' || t === 'boolean') {
|
|
||||||
const s = String(value);
|
|
||||||
const shouldPretty = looksLikeStructuredText(s) || s.includes('\n');
|
|
||||||
if (shouldPretty) {
|
|
||||||
const pretty = looksLikeStructuredText(s) ? prettyFormatByBraces(s) : s;
|
|
||||||
return `${indent}|-\n${indentLines(pretty, nextIndent)}`;
|
|
||||||
}
|
|
||||||
return `${indent}${s}`;
|
|
||||||
}
|
|
||||||
if (Array.isArray(value)) {
|
|
||||||
if (value.length === 0) return `${indent}[]`;
|
|
||||||
return value.map((item) => {
|
|
||||||
const rendered = toPseudoYaml(item, nextIndent);
|
|
||||||
const lines = rendered.split('\n');
|
|
||||||
if (lines.length === 1) {
|
|
||||||
return `${indent}- ${lines[0].trimStart()}`;
|
|
||||||
}
|
|
||||||
return `${indent}- ${lines[0].trimStart()}\n${lines.slice(1).map((l) => indent + ' ' + l.trimStart()).join('\n')}`;
|
|
||||||
}).join('\n');
|
|
||||||
}
|
|
||||||
if (typeof value === 'object') {
|
|
||||||
const obj = value as Record<string, unknown>;
|
|
||||||
const keys = Object.keys(obj);
|
|
||||||
if (keys.length === 0) return `${indent}{}`;
|
|
||||||
return keys.map((k) => {
|
|
||||||
const rendered = toPseudoYaml(obj[k], nextIndent);
|
|
||||||
const lines = rendered.split('\n');
|
|
||||||
if (lines.length === 1) {
|
|
||||||
return `${indent}${k}: ${lines[0].trimStart()}`;
|
|
||||||
}
|
|
||||||
return `${indent}${k}:\n${lines.map((l) => l).join('\n')}`;
|
|
||||||
}).join('\n');
|
|
||||||
}
|
|
||||||
// fallback
|
|
||||||
return `${indent}${String(value)}`;
|
|
||||||
}
|
|
||||||
function formatDetailsToYaml(details: string): string {
|
|
||||||
const parsed = parsePossiblyNestedJson(details);
|
|
||||||
if (typeof parsed === 'string') {
|
|
||||||
// 尝试再次解析(有些场景内层 message 也是 JSON 串)
|
|
||||||
const again = parsePossiblyNestedJson(parsed);
|
|
||||||
if (typeof again === 'string') {
|
|
||||||
return toPseudoYaml(again);
|
|
||||||
}
|
|
||||||
return toPseudoYaml(again);
|
|
||||||
}
|
|
||||||
return toPseudoYaml(parsed);
|
|
||||||
}
|
|
||||||
|
|
||||||
const defaultUrls: Partial<Record<DataSourceProvider, string>> = {
|
|
||||||
tushare: 'http://api.tushare.pro',
|
|
||||||
finnhub: 'https://finnhub.io/api/v1',
|
|
||||||
alphavantage: 'https://mcp.alphavantage.co/mcp',
|
|
||||||
};
|
|
||||||
|
|
||||||
export default function ConfigPage() {
|
export default function ConfigPage() {
|
||||||
// 从 Zustand store 获取全局状态
|
// 从 Zustand store 获取全局状态
|
||||||
const { config, loading, error, setConfig } = useConfigStore();
|
const { config, loading, error, setConfig } = useConfigStore();
|
||||||
@ -184,7 +47,7 @@ export default function ConfigPage() {
|
|||||||
// 分析配置保存状态(状态定义在下方统一维护)
|
// 分析配置保存状态(状态定义在下方统一维护)
|
||||||
|
|
||||||
// 测试结果状态
|
// 测试结果状态
|
||||||
const [testResults, setTestResults] = useState<Record<string, { success: boolean; summary: string; details?: string } | null>>({});
|
const [testResults, setTestResults] = useState<Record<string, { success: boolean; message: string } | null>>({});
|
||||||
|
|
||||||
// 保存状态
|
// 保存状态
|
||||||
const [saving, setSaving] = useState(false);
|
const [saving, setSaving] = useState(false);
|
||||||
@ -505,22 +368,8 @@ export default function ConfigPage() {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
if (initialDataSources) {
|
if (initialDataSources) {
|
||||||
// Create a deep copy to avoid mutating the local state directly
|
await updateDataSourcesConfig(localDataSources);
|
||||||
const finalDataSources = JSON.parse(JSON.stringify(localDataSources));
|
await mutateDataSources(localDataSources, false);
|
||||||
for (const key in finalDataSources) {
|
|
||||||
const providerKey = key as DataSourceProvider;
|
|
||||||
const source = finalDataSources[providerKey];
|
|
||||||
// If the URL is empty or null, and there is a default URL, use it
|
|
||||||
if (source && (source.api_url === null || source.api_url.trim() === '') && defaultUrls[providerKey]) {
|
|
||||||
source.api_url = defaultUrls[providerKey];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
await updateDataSourcesConfig(finalDataSources);
|
|
||||||
// After saving, mutate the local SWR cache with the final data
|
|
||||||
// and also update the component's local state to reflect the change.
|
|
||||||
await mutateDataSources(finalDataSources, false);
|
|
||||||
setLocalDataSources(finalDataSources);
|
|
||||||
}
|
}
|
||||||
setSaveMessage('保存成功!');
|
setSaveMessage('保存成功!');
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
@ -534,18 +383,11 @@ export default function ConfigPage() {
|
|||||||
const handleTest = async (type: string, data: any) => {
|
const handleTest = async (type: string, data: any) => {
|
||||||
try {
|
try {
|
||||||
const result = await testConfig(type, data);
|
const result = await testConfig(type, data);
|
||||||
const success = !!result?.success;
|
setTestResults(prev => ({ ...prev, [type]: result }));
|
||||||
const summary = typeof result?.message === 'string' && result.message.trim().length > 0
|
|
||||||
? result.message
|
|
||||||
: (success ? '测试成功' : '测试失败');
|
|
||||||
setTestResults(prev => ({ ...prev, [type]: { success, summary } }));
|
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
// 结构化错误对象:{ summary, details? }
|
setTestResults(prev => ({
|
||||||
const summary: string = (e && typeof e === 'object' && 'summary' in e) ? String(e.summary) : (e?.message || '未知错误');
|
...prev,
|
||||||
const details: string | undefined = (e && typeof e === 'object' && 'details' in e) ? (e.details ? String(e.details) : undefined) : undefined;
|
[type]: { success: false, message: e.message }
|
||||||
setTestResults(prev => ({
|
|
||||||
...prev,
|
|
||||||
[type]: { success: false, summary, details }
|
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -560,11 +402,6 @@ export default function ConfigPage() {
|
|||||||
handleTest('finnhub', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled });
|
handleTest('finnhub', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled });
|
||||||
};
|
};
|
||||||
|
|
||||||
const handleTestAlphaVantage = () => {
|
|
||||||
const cfg = localDataSources['alphavantage'];
|
|
||||||
handleTest('alphavantage', { api_key: cfg?.api_key, api_url: cfg?.api_url, enabled: cfg?.enabled });
|
|
||||||
};
|
|
||||||
|
|
||||||
const handleReset = () => {
|
const handleReset = () => {
|
||||||
if (initialDataSources) setLocalDataSources(initialDataSources);
|
if (initialDataSources) setLocalDataSources(initialDataSources);
|
||||||
setTestResults({});
|
setTestResults({});
|
||||||
@ -1221,8 +1058,7 @@ export default function ConfigPage() {
|
|||||||
[providerKey]: { ...item, api_url: v, provider: providerKey },
|
[providerKey]: { ...item, api_url: v, provider: providerKey },
|
||||||
}));
|
}));
|
||||||
}}
|
}}
|
||||||
placeholder={defaultUrls[providerKey] ?? 'https://...'}
|
placeholder={providerKey === 'tushare' ? 'https://api.tushare.pro' : 'https://...'}
|
||||||
disabled={providerKey === 'yfinance'}
|
|
||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
@ -1233,33 +1069,7 @@ export default function ConfigPage() {
|
|||||||
{providerKey === 'finnhub' && (
|
{providerKey === 'finnhub' && (
|
||||||
<Button variant="outline" onClick={handleTestFinnhub}>测试 Finnhub</Button>
|
<Button variant="outline" onClick={handleTestFinnhub}>测试 Finnhub</Button>
|
||||||
)}
|
)}
|
||||||
{providerKey === 'alphavantage' && (
|
|
||||||
<Button variant="outline" onClick={handleTestAlphaVantage}>测试 AlphaVantage</Button>
|
|
||||||
)}
|
|
||||||
</div>
|
</div>
|
||||||
{testResults[providerKey] ? (() => {
|
|
||||||
const r = testResults[providerKey]!;
|
|
||||||
if (r.success) {
|
|
||||||
return (
|
|
||||||
<div className="text-sm text-green-600">
|
|
||||||
{r.summary}
|
|
||||||
</div>
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return (
|
|
||||||
<div className="text-sm text-red-600">
|
|
||||||
<div className="font-medium">测试失败</div>
|
|
||||||
{r.details ? (
|
|
||||||
<details className="mt-2">
|
|
||||||
<summary className="cursor-pointer text-red-700 underline">查看详细错误(YAML)</summary>
|
|
||||||
<pre className="mt-2 p-2 rounded bg-red-50 text-red-700 whitespace-pre-wrap break-words">
|
|
||||||
{formatDetailsToYaml(r.details)}
|
|
||||||
</pre>
|
|
||||||
</details>
|
|
||||||
) : null}
|
|
||||||
</div>
|
|
||||||
);
|
|
||||||
})() : null}
|
|
||||||
</div>
|
</div>
|
||||||
);
|
);
|
||||||
})}
|
})}
|
||||||
|
|||||||
@ -294,41 +294,9 @@ export async function testConfig(type: string, data: unknown) {
|
|||||||
if (!res.ok) {
|
if (!res.ok) {
|
||||||
try {
|
try {
|
||||||
const err = JSON.parse(text);
|
const err = JSON.parse(text);
|
||||||
// 优先从标准字段中提取错误信息;同时分离 details
|
throw new Error(err?.message || text);
|
||||||
let message: string = err?.error || err?.message || '';
|
|
||||||
let detailsStr = '';
|
|
||||||
if (err?.details !== undefined) {
|
|
||||||
if (typeof err.details === 'string') {
|
|
||||||
detailsStr = err.details;
|
|
||||||
// details 可能是被 JSON 序列化的字符串,尝试解析一次以便还原内部结构
|
|
||||||
try {
|
|
||||||
const parsed = JSON.parse(detailsStr);
|
|
||||||
if (typeof parsed === 'string') {
|
|
||||||
detailsStr = parsed;
|
|
||||||
} else if (parsed && typeof parsed === 'object' && 'message' in parsed) {
|
|
||||||
detailsStr = String((parsed as any).message);
|
|
||||||
} else {
|
|
||||||
detailsStr = JSON.stringify(parsed);
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// 忽略解析失败,保留原始 details 字符串
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
try {
|
|
||||||
detailsStr = JSON.stringify(err.details);
|
|
||||||
} catch {
|
|
||||||
detailsStr = String(err.details);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const summary = message || `HTTP ${res.status}`;
|
|
||||||
const detailsOut = (detailsStr && detailsStr.length > 0) ? detailsStr : (text || undefined);
|
|
||||||
// 抛出结构化错误对象,供调用方精确展示(details 始终尽量携带原始响应体,便于前端美化)
|
|
||||||
throw { summary, details: detailsOut };
|
|
||||||
} catch {
|
} catch {
|
||||||
// 无法解析为 JSON 时,仍然把原始文本作为 details 返回,保证前端可美化
|
throw new Error(text || `HTTP ${res.status}`);
|
||||||
const fallback = text || `HTTP ${res.status}`;
|
|
||||||
throw { summary: fallback, details: text || undefined };
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
|||||||
@ -40,14 +40,12 @@ dependencies = [
|
|||||||
"common-contracts",
|
"common-contracts",
|
||||||
"config",
|
"config",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
"futures",
|
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rmcp",
|
"rmcp",
|
||||||
"secrecy",
|
"secrecy",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sse-stream",
|
|
||||||
"thiserror 2.0.17",
|
"thiserror 2.0.17",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tower-http",
|
"tower-http",
|
||||||
|
|||||||
@ -43,5 +43,3 @@ secrecy = { version = "0.10.3", features = ["serde"] }
|
|||||||
thiserror = "2.0.17"
|
thiserror = "2.0.17"
|
||||||
anyhow = "1.0"
|
anyhow = "1.0"
|
||||||
chrono = { version = "0.4", features = ["serde"] }
|
chrono = { version = "0.4", features = ["serde"] }
|
||||||
sse-stream = "0.2"
|
|
||||||
futures = "0.3"
|
|
||||||
|
|||||||
@ -2,18 +2,16 @@ use std::collections::HashMap;
|
|||||||
use axum::{
|
use axum::{
|
||||||
extract::State,
|
extract::State,
|
||||||
response::Json,
|
response::Json,
|
||||||
routing::{get, post},
|
routing::get,
|
||||||
Router,
|
Router,
|
||||||
};
|
};
|
||||||
use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress};
|
use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress};
|
||||||
use crate::state::{AppState, ServiceOperationalStatus};
|
use crate::state::{AppState, ServiceOperationalStatus};
|
||||||
use crate::api_test;
|
|
||||||
|
|
||||||
pub fn create_router(app_state: AppState) -> Router {
|
pub fn create_router(app_state: AppState) -> Router {
|
||||||
Router::new()
|
Router::new()
|
||||||
.route("/health", get(health_check))
|
.route("/health", get(health_check))
|
||||||
.route("/tasks", get(get_current_tasks))
|
.route("/tasks", get(get_current_tasks))
|
||||||
.route("/test", post(api_test::test_connection))
|
|
||||||
.with_state(app_state)
|
.with_state(app_state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,97 +0,0 @@
|
|||||||
use crate::av_client::AvClient;
|
|
||||||
use axum::{http::StatusCode, response::{IntoResponse, Json}};
|
|
||||||
use secrecy::{SecretString, ExposeSecret};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tracing::{info, warn};
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
pub struct TestConnectionRequest {
|
|
||||||
// This is the MCP endpoint URL
|
|
||||||
pub api_url: String,
|
|
||||||
// The API key is passed for validation but might not be used directly
|
|
||||||
// in the MCP connection itself, depending on auth mechanism.
|
|
||||||
pub api_key: Option<SecretString>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Serialize)]
|
|
||||||
pub struct TestConnectionResponse {
|
|
||||||
pub success: bool,
|
|
||||||
pub message: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// [POST /test]
|
|
||||||
/// Dynamically tests a connection to an MCP endpoint.
|
|
||||||
pub async fn test_connection(
|
|
||||||
Json(payload): Json<TestConnectionRequest>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
info!("Testing connection to MCP endpoint: {}", payload.api_url);
|
|
||||||
|
|
||||||
if payload.api_url.is_empty() {
|
|
||||||
return (
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
Json(TestConnectionResponse {
|
|
||||||
success: false,
|
|
||||||
message: "API URL (MCP Endpoint) cannot be empty.".to_string(),
|
|
||||||
}),
|
|
||||||
).into_response();
|
|
||||||
}
|
|
||||||
|
|
||||||
// 要求传入 base MCP URL(不包含查询参数)与 api_key,然后按官方文档拼接 ?apikey=
|
|
||||||
if payload.api_url.contains('?') {
|
|
||||||
return (
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
Json(TestConnectionResponse {
|
|
||||||
success: false,
|
|
||||||
message: "API URL 必须为基础地址(不可包含查询参数)".to_string(),
|
|
||||||
}),
|
|
||||||
).into_response();
|
|
||||||
}
|
|
||||||
let Some(key) = &payload.api_key else {
|
|
||||||
return (
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
Json(TestConnectionResponse {
|
|
||||||
success: false,
|
|
||||||
message: "测试连接需要提供 api_key".to_string(),
|
|
||||||
}),
|
|
||||||
).into_response();
|
|
||||||
};
|
|
||||||
let final_url = format!("{}?apikey={}", payload.api_url, key.expose_secret());
|
|
||||||
info!("Testing MCP with final endpoint: {}", final_url);
|
|
||||||
let mcp_client = match AvClient::connect(&final_url).await {
|
|
||||||
Ok(client) => client,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to establish MCP transport: {}", e);
|
|
||||||
return (
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
Json(TestConnectionResponse {
|
|
||||||
success: false,
|
|
||||||
message: format!("Failed to establish MCP transport: {}", e),
|
|
||||||
}),
|
|
||||||
).into_response();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Call list_tools to verify the connection.
|
|
||||||
match mcp_client.list_tools().await {
|
|
||||||
Ok(tools) => {
|
|
||||||
info!("MCP list_tools successful. Found {} tools.", tools.len());
|
|
||||||
(
|
|
||||||
StatusCode::OK,
|
|
||||||
Json(TestConnectionResponse {
|
|
||||||
success: true,
|
|
||||||
message: format!("Successfully connected to MCP endpoint and found {} tools.", tools.len()),
|
|
||||||
}),
|
|
||||||
).into_response()
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
warn!("MCP list_tools failed: {}", e);
|
|
||||||
(
|
|
||||||
StatusCode::INTERNAL_SERVER_ERROR,
|
|
||||||
Json(TestConnectionResponse {
|
|
||||||
success: false,
|
|
||||||
message: format!("MCP command 'list_tools' failed: {}", e),
|
|
||||||
}),
|
|
||||||
).into_response()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,9 +1,7 @@
|
|||||||
use crate::error::{AppError, Result};
|
use crate::error::{AppError, Result};
|
||||||
use crate::transport::CustomHttpClient;
|
|
||||||
use rmcp::{ClientHandler, ServiceExt};
|
use rmcp::{ClientHandler, ServiceExt};
|
||||||
use rmcp::model::CallToolRequestParam;
|
use rmcp::model::CallToolRequestParam;
|
||||||
use rmcp::transport::StreamableHttpClientTransport;
|
use rmcp::transport::StreamableHttpClientTransport;
|
||||||
use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig;
|
|
||||||
use serde_json::{Map, Value};
|
use serde_json::{Map, Value};
|
||||||
|
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
@ -22,20 +20,7 @@ pub struct AvClient {
|
|||||||
|
|
||||||
impl AvClient {
|
impl AvClient {
|
||||||
pub async fn connect(mcp_endpoint_url: &str) -> Result<Self> {
|
pub async fn connect(mcp_endpoint_url: &str) -> Result<Self> {
|
||||||
let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string());
|
let transport = StreamableHttpClientTransport::from_uri(mcp_endpoint_url.to_string());
|
||||||
let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config);
|
|
||||||
let running = DummyClientHandler
|
|
||||||
::default()
|
|
||||||
.serve(transport)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AppError::Configuration(format!("Fail to init MCP service: {e:?}")))?;
|
|
||||||
Ok(Self { service: running })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn connect_with_bearer(mcp_endpoint_url: &str, bearer_token: &str) -> Result<Self> {
|
|
||||||
let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string())
|
|
||||||
.auth_header(bearer_token.to_string());
|
|
||||||
let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config);
|
|
||||||
let running = DummyClientHandler
|
let running = DummyClientHandler
|
||||||
::default()
|
::default()
|
||||||
.serve(transport)
|
.serve(transport)
|
||||||
@ -70,12 +55,6 @@ impl AvClient {
|
|||||||
}
|
}
|
||||||
Ok(Value::Null)
|
Ok(Value::Null)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn list_tools(&self) -> Result<Vec<rmcp::model::Tool>> {
|
|
||||||
let result = self.service.list_tools(None).await
|
|
||||||
.map_err(|e| AppError::Configuration(format!("MCP list_tools error: {e:?}")))?;
|
|
||||||
Ok(result.tools)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -40,12 +40,15 @@ async fn poll_and_update_config(state: &AppState) -> Result<()> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if let Some(config) = alphavantage_config {
|
if let Some(config) = alphavantage_config {
|
||||||
let api_key = config.api_key.clone().map(SecretString::from);
|
if let Some(api_key) = &config.api_key {
|
||||||
let api_url = config.api_url.clone();
|
state.update_provider(Some(SecretString::from(api_key.clone()))).await;
|
||||||
state.update_provider(api_key, api_url).await;
|
info!("Successfully updated Alphavantage provider with new configuration.");
|
||||||
info!("Successfully updated Alphavantage provider with new configuration.");
|
} else {
|
||||||
|
state.update_provider(None).await;
|
||||||
|
info!("Alphavantage provider is enabled but API key is missing. Service is degraded.");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
state.update_provider(None, None).await;
|
state.update_provider(None).await;
|
||||||
info!("No enabled Alphavantage configuration found. Service is degraded.");
|
info!("No enabled Alphavantage configuration found. Service is degraded.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,4 @@
|
|||||||
mod api;
|
mod api;
|
||||||
mod api_test;
|
|
||||||
mod config;
|
mod config;
|
||||||
mod error;
|
mod error;
|
||||||
mod mapping;
|
mod mapping;
|
||||||
@ -9,7 +8,6 @@ mod state;
|
|||||||
mod worker;
|
mod worker;
|
||||||
mod av_client;
|
mod av_client;
|
||||||
mod config_poller;
|
mod config_poller;
|
||||||
mod transport;
|
|
||||||
|
|
||||||
use crate::config::AppConfig;
|
use crate::config::AppConfig;
|
||||||
use crate::error::Result;
|
use crate::error::Result;
|
||||||
|
|||||||
@ -6,7 +6,6 @@ use secrecy::{ExposeSecret, SecretString};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub enum ServiceOperationalStatus {
|
pub enum ServiceOperationalStatus {
|
||||||
@ -42,46 +41,32 @@ impl AppState {
|
|||||||
self.av_provider.read().await.clone()
|
self.av_provider.read().await.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update_provider(&self, api_key: Option<SecretString>, api_url: Option<String>) {
|
pub async fn update_provider(&self, api_key: Option<SecretString>) {
|
||||||
let mut provider_guard = self.av_provider.write().await;
|
let mut provider_guard = self.av_provider.write().await;
|
||||||
let mut status_guard = self.status.write().await;
|
let mut status_guard = self.status.write().await;
|
||||||
|
|
||||||
match (api_key, api_url) {
|
if let Some(key) = api_key {
|
||||||
(Some(key), Some(base_url)) => {
|
let mcp_endpoint = format!(
|
||||||
if base_url.contains('?') {
|
"https://mcp.alphavantage.co/mcp?apikey={}",
|
||||||
|
key.expose_secret()
|
||||||
|
);
|
||||||
|
match AvClient::connect(&mcp_endpoint).await {
|
||||||
|
Ok(new_provider) => {
|
||||||
|
*provider_guard = Some(Arc::new(new_provider));
|
||||||
|
*status_guard = ServiceOperationalStatus::Active;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
*provider_guard = None;
|
*provider_guard = None;
|
||||||
*status_guard = ServiceOperationalStatus::Degraded {
|
*status_guard = ServiceOperationalStatus::Degraded {
|
||||||
reason: "Configured MCP endpoint must not contain query parameters.".to_string(),
|
reason: format!("Failed to connect to Alphavantage: {}", e),
|
||||||
};
|
};
|
||||||
return;
|
|
||||||
}
|
|
||||||
let mcp_endpoint = format!("{}?apikey={}", base_url, key.expose_secret());
|
|
||||||
info!("Initializing Alphavantage MCP provider with endpoint: {}", mcp_endpoint);
|
|
||||||
match AvClient::connect(&mcp_endpoint).await {
|
|
||||||
Ok(new_provider) => {
|
|
||||||
*provider_guard = Some(Arc::new(new_provider));
|
|
||||||
*status_guard = ServiceOperationalStatus::Active;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
*provider_guard = None;
|
|
||||||
*status_guard = ServiceOperationalStatus::Degraded {
|
|
||||||
reason: format!("Failed to connect to Alphavantage: {}", e),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(None, _) => {
|
} else {
|
||||||
*provider_guard = None;
|
*provider_guard = None;
|
||||||
*status_guard = ServiceOperationalStatus::Degraded {
|
*status_guard = ServiceOperationalStatus::Degraded {
|
||||||
reason: "Alphavantage API Key is not configured.".to_string(),
|
reason: "Alphavantage API Key is not configured.".to_string(),
|
||||||
};
|
};
|
||||||
}
|
|
||||||
(_, None) => {
|
|
||||||
*provider_guard = None;
|
|
||||||
*status_guard = ServiceOperationalStatus::Degraded {
|
|
||||||
reason: "Alphavantage MCP endpoint is not configured.".to_string(),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,179 +0,0 @@
|
|||||||
use std::{borrow::Cow, sync::Arc};
|
|
||||||
|
|
||||||
use futures::{stream::BoxStream, StreamExt};
|
|
||||||
use reqwest::header::{ACCEPT, CONTENT_TYPE};
|
|
||||||
use rmcp::{
|
|
||||||
model::ClientJsonRpcMessage,
|
|
||||||
model::ServerJsonRpcMessage,
|
|
||||||
transport::streamable_http_client::{
|
|
||||||
AuthRequiredError, SseError, StreamableHttpClient, StreamableHttpError,
|
|
||||||
StreamableHttpPostResponse,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use sse_stream::{Sse, SseStream};
|
|
||||||
|
|
||||||
// Redefine constants as they are not public in rmcp
|
|
||||||
const HEADER_SESSION_ID: &str = "Mcp-Session-Id";
|
|
||||||
const HEADER_LAST_EVENT_ID: &str = "Last-Event-Id";
|
|
||||||
const EVENT_STREAM_MIME_TYPE: &str = "text/event-stream";
|
|
||||||
const JSON_MIME_TYPE: &str = "application/json";
|
|
||||||
const WWW_AUTHENTICATE: &str = "www-authenticate";
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct CustomHttpClient {
|
|
||||||
client: reqwest::Client,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for CustomHttpClient {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
client: reqwest::Client::new(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl CustomHttpClient {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StreamableHttpClient for CustomHttpClient {
|
|
||||||
type Error = reqwest::Error;
|
|
||||||
|
|
||||||
async fn get_stream(
|
|
||||||
&self,
|
|
||||||
uri: Arc<str>,
|
|
||||||
session_id: Arc<str>,
|
|
||||||
last_event_id: Option<String>,
|
|
||||||
auth_token: Option<String>,
|
|
||||||
) -> Result<BoxStream<'static, Result<Sse, SseError>>, StreamableHttpError<Self::Error>> {
|
|
||||||
let mut request_builder = self
|
|
||||||
.client
|
|
||||||
.get(uri.as_ref())
|
|
||||||
.header(ACCEPT, EVENT_STREAM_MIME_TYPE)
|
|
||||||
.header(HEADER_SESSION_ID, session_id.as_ref());
|
|
||||||
if let Some(last_event_id) = last_event_id {
|
|
||||||
request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id);
|
|
||||||
}
|
|
||||||
if let Some(auth_header) = auth_token {
|
|
||||||
request_builder = request_builder.bearer_auth(auth_header);
|
|
||||||
}
|
|
||||||
|
|
||||||
let response = request_builder.send().await?;
|
|
||||||
|
|
||||||
// --- CUSTOM LOGIC START ---
|
|
||||||
// If we get 400 Bad Request or 405 Method Not Allowed, we assume the server
|
|
||||||
// does not support SSE on this endpoint and degrade gracefully.
|
|
||||||
if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED
|
|
||||||
|| response.status() == reqwest::StatusCode::BAD_REQUEST {
|
|
||||||
tracing::debug!("Server returned {}, assuming SSE not supported", response.status());
|
|
||||||
return Err(StreamableHttpError::ServerDoesNotSupportSse);
|
|
||||||
}
|
|
||||||
// --- CUSTOM LOGIC END ---
|
|
||||||
|
|
||||||
let response = response.error_for_status()?;
|
|
||||||
match response.headers().get(CONTENT_TYPE) {
|
|
||||||
Some(ct) => {
|
|
||||||
if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) {
|
|
||||||
return Err(StreamableHttpError::UnexpectedContentType(Some(
|
|
||||||
String::from_utf8_lossy(ct.as_bytes()).to_string(),
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
return Err(StreamableHttpError::UnexpectedContentType(None));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed();
|
|
||||||
Ok(event_stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn delete_session(
|
|
||||||
&self,
|
|
||||||
uri: Arc<str>,
|
|
||||||
session: Arc<str>,
|
|
||||||
auth_token: Option<String>,
|
|
||||||
) -> Result<(), StreamableHttpError<Self::Error>> {
|
|
||||||
let mut request_builder = self.client.delete(uri.as_ref());
|
|
||||||
if let Some(auth_header) = auth_token {
|
|
||||||
request_builder = request_builder.bearer_auth(auth_header);
|
|
||||||
}
|
|
||||||
let response = request_builder
|
|
||||||
.header(HEADER_SESSION_ID, session.as_ref())
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if response.status() == reqwest::StatusCode::METHOD_NOT_ALLOWED {
|
|
||||||
tracing::debug!("this server doesn't support deleting session");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
let _response = response.error_for_status()?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn post_message(
|
|
||||||
&self,
|
|
||||||
uri: Arc<str>,
|
|
||||||
message: ClientJsonRpcMessage,
|
|
||||||
session_id: Option<Arc<str>>,
|
|
||||||
auth_token: Option<String>,
|
|
||||||
) -> Result<StreamableHttpPostResponse, StreamableHttpError<Self::Error>> {
|
|
||||||
let mut request = self
|
|
||||||
.client
|
|
||||||
.post(uri.as_ref())
|
|
||||||
.header(ACCEPT, [EVENT_STREAM_MIME_TYPE, JSON_MIME_TYPE].join(", "));
|
|
||||||
if let Some(auth_header) = auth_token {
|
|
||||||
request = request.bearer_auth(auth_header);
|
|
||||||
}
|
|
||||||
if let Some(session_id) = session_id {
|
|
||||||
request = request.header(HEADER_SESSION_ID, session_id.as_ref());
|
|
||||||
}
|
|
||||||
let response = request.json(&message).send().await?;
|
|
||||||
if response.status() == reqwest::StatusCode::UNAUTHORIZED {
|
|
||||||
if let Some(header) = response.headers().get(WWW_AUTHENTICATE) {
|
|
||||||
let header = header
|
|
||||||
.to_str()
|
|
||||||
.map_err(|_| {
|
|
||||||
StreamableHttpError::UnexpectedServerResponse(Cow::from(
|
|
||||||
"invalid www-authenticate header value",
|
|
||||||
))
|
|
||||||
})?
|
|
||||||
.to_string();
|
|
||||||
return Err(StreamableHttpError::AuthRequired(AuthRequiredError {
|
|
||||||
www_authenticate_header: header,
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let status = response.status();
|
|
||||||
let response = response.error_for_status()?;
|
|
||||||
if matches!(
|
|
||||||
status,
|
|
||||||
reqwest::StatusCode::ACCEPTED | reqwest::StatusCode::NO_CONTENT
|
|
||||||
) {
|
|
||||||
return Ok(StreamableHttpPostResponse::Accepted);
|
|
||||||
}
|
|
||||||
let content_type = response.headers().get(CONTENT_TYPE);
|
|
||||||
let session_id = response.headers().get(HEADER_SESSION_ID);
|
|
||||||
let session_id = session_id
|
|
||||||
.and_then(|v| v.to_str().ok())
|
|
||||||
.map(|s| s.to_string());
|
|
||||||
match content_type {
|
|
||||||
Some(ct) if ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) => {
|
|
||||||
let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed();
|
|
||||||
Ok(StreamableHttpPostResponse::Sse(event_stream, session_id))
|
|
||||||
}
|
|
||||||
Some(ct) if ct.as_bytes().starts_with(JSON_MIME_TYPE.as_bytes()) => {
|
|
||||||
let message: ServerJsonRpcMessage = response.json().await?;
|
|
||||||
Ok(StreamableHttpPostResponse::Json(message, session_id))
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
tracing::error!("unexpected content type: {:?}", content_type);
|
|
||||||
Err(StreamableHttpError::UnexpectedContentType(
|
|
||||||
content_type.map(|ct| String::from_utf8_lossy(ct.as_bytes()).to_string()),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@ -66,7 +66,6 @@ fn create_v1_router() -> Router<AppState> {
|
|||||||
"/configs/data_sources",
|
"/configs/data_sources",
|
||||||
get(get_data_sources_config).put(update_data_sources_config),
|
get(get_data_sources_config).put(update_data_sources_config),
|
||||||
)
|
)
|
||||||
.route("/configs/test", post(test_data_source_config))
|
|
||||||
// --- New Discover Routes ---
|
// --- New Discover Routes ---
|
||||||
.route("/discover-models/{provider_id}", get(discover_models))
|
.route("/discover-models/{provider_id}", get(discover_models))
|
||||||
.route("/discover-models", post(discover_models_preview))
|
.route("/discover-models", post(discover_models_preview))
|
||||||
@ -209,71 +208,6 @@ async fn get_task_progress(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// --- New Config Test Handler ---
|
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
|
||||||
struct TestConfigRequest {
|
|
||||||
r#type: String,
|
|
||||||
#[serde(flatten)]
|
|
||||||
data: serde_json::Value,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// [POST /v1/configs/test]
|
|
||||||
/// Forwards a configuration test request to the appropriate downstream service.
|
|
||||||
async fn test_data_source_config(
|
|
||||||
State(state): State<AppState>,
|
|
||||||
Json(payload): Json<TestConfigRequest>,
|
|
||||||
) -> Result<impl IntoResponse> {
|
|
||||||
info!("test_data_source_config: type={}", payload.r#type);
|
|
||||||
|
|
||||||
let target_service_url = match payload.r#type.as_str() {
|
|
||||||
"tushare" => state.config.provider_services.iter().find(|s| s.contains("tushare")),
|
|
||||||
"finnhub" => state.config.provider_services.iter().find(|s| s.contains("finnhub")),
|
|
||||||
"alphavantage" => state.config.provider_services.iter().find(|s| s.contains("alphavantage")),
|
|
||||||
_ => {
|
|
||||||
return Ok((
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
Json(serde_json::json!({ "error": "Unsupported config type" })),
|
|
||||||
).into_response());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(base_url) = target_service_url {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let target_url = format!("{}/test", base_url.trim_end_matches('/'));
|
|
||||||
info!("Forwarding test request for '{}' to {}", payload.r#type, target_url);
|
|
||||||
|
|
||||||
let response = client
|
|
||||||
.post(&target_url)
|
|
||||||
.json(&payload.data)
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
|
||||||
let status = response.status();
|
|
||||||
let error_text = response.text().await?;
|
|
||||||
warn!("Downstream test for '{}' failed: status={} body={}", payload.r#type, status, error_text);
|
|
||||||
return Ok((
|
|
||||||
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::BAD_GATEWAY),
|
|
||||||
Json(serde_json::json!({
|
|
||||||
"error": "Downstream service returned an error",
|
|
||||||
"details": error_text,
|
|
||||||
})),
|
|
||||||
).into_response());
|
|
||||||
}
|
|
||||||
|
|
||||||
let response_json: serde_json::Value = response.json().await?;
|
|
||||||
Ok((StatusCode::OK, Json(response_json)).into_response())
|
|
||||||
} else {
|
|
||||||
warn!("No downstream service found for config type: {}", payload.r#type);
|
|
||||||
Ok((
|
|
||||||
StatusCode::NOT_IMPLEMENTED,
|
|
||||||
Json(serde_json::json!({ "error": "No downstream service configured for this type" })),
|
|
||||||
).into_response())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// --- Config API Handlers (Proxy to data-persistence-service) ---
|
// --- Config API Handlers (Proxy to data-persistence-service) ---
|
||||||
|
|
||||||
use common_contracts::config_models::{
|
use common_contracts::config_models::{
|
||||||
|
|||||||
@ -1,8 +1,6 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use axum::{routing::{get, post}, Router, extract::State, response::{Json, IntoResponse}, http::StatusCode};
|
use axum::{routing::get, Router, extract::State, response::Json};
|
||||||
use serde::Deserialize;
|
|
||||||
use secrecy::ExposeSecret;
|
|
||||||
use crate::ts_client::TushareClient;
|
|
||||||
use crate::state::{AppState, ServiceOperationalStatus};
|
use crate::state::{AppState, ServiceOperationalStatus};
|
||||||
use common_contracts::observability::{HealthStatus, ServiceStatus};
|
use common_contracts::observability::{HealthStatus, ServiceStatus};
|
||||||
|
|
||||||
@ -10,63 +8,9 @@ pub fn create_router(app_state: AppState) -> Router {
|
|||||||
Router::new()
|
Router::new()
|
||||||
.route("/health", get(health_check))
|
.route("/health", get(health_check))
|
||||||
.route("/tasks", get(get_tasks))
|
.route("/tasks", get(get_tasks))
|
||||||
.route("/test", post(test_connection))
|
|
||||||
.with_state(app_state)
|
.with_state(app_state)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
struct TestRequest {
|
|
||||||
api_key: Option<String>,
|
|
||||||
api_url: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn test_connection(
|
|
||||||
State(state): State<AppState>,
|
|
||||||
Json(payload): Json<TestRequest>,
|
|
||||||
) -> impl IntoResponse {
|
|
||||||
let api_url = payload.api_url
|
|
||||||
.filter(|s| !s.is_empty())
|
|
||||||
.unwrap_or_else(|| state.config.tushare_api_url.clone());
|
|
||||||
|
|
||||||
let api_key = if let Some(k) = payload.api_key.filter(|s| !s.is_empty()) {
|
|
||||||
k
|
|
||||||
} else if let Some(k) = &state.config.tushare_api_token {
|
|
||||||
k.expose_secret().clone()
|
|
||||||
} else {
|
|
||||||
return (
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
Json(serde_json::json!({
|
|
||||||
"success": false,
|
|
||||||
"message": "No API Key provided or configured"
|
|
||||||
}))
|
|
||||||
).into_response();
|
|
||||||
};
|
|
||||||
|
|
||||||
let client = TushareClient::new(api_url, api_key);
|
|
||||||
|
|
||||||
// Try to fetch a small amount of data to verify the token
|
|
||||||
match client.send_request::<serde_json::Value>(
|
|
||||||
"stock_basic",
|
|
||||||
serde_json::json!({"limit": 1}),
|
|
||||||
""
|
|
||||||
).await {
|
|
||||||
Ok(_) => (
|
|
||||||
StatusCode::OK,
|
|
||||||
Json(serde_json::json!({
|
|
||||||
"success": true,
|
|
||||||
"message": "Connection successful"
|
|
||||||
}))
|
|
||||||
).into_response(),
|
|
||||||
Err(e) => (
|
|
||||||
StatusCode::BAD_REQUEST,
|
|
||||||
Json(serde_json::json!({
|
|
||||||
"success": false,
|
|
||||||
"message": format!("Connection failed: {}", e)
|
|
||||||
}))
|
|
||||||
).into_response(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> {
|
async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> {
|
||||||
let mut details = HashMap::new();
|
let mut details = HashMap::new();
|
||||||
let operational_status = state.status.read().await;
|
let operational_status = state.status.read().await;
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user