fix(alphavantage): resolve mcp tool invocation errors and enhance data parsing fault tolerance

- Fix incorrect parameter injection in MCP client query method
- Correct tool name from OVERVIEW to COMPANY_OVERVIEW
- Add symbol conversion support (e.g., .SH -> .SS)
- Implement fault tolerance for empty data responses to prevent panics
- Add workaround for non-standard JSON responses (single quotes) in GLOBAL_QUOTE
- Add debug_mcp utility for tool inspection
This commit is contained in:
Lv, Qi 2025-11-19 06:51:42 +08:00
parent 75378e7aae
commit e699cda81e
4 changed files with 159 additions and 38 deletions

View File

@ -46,7 +46,6 @@ impl AvClient {
pub async fn query(&self, function: &str, params: &[(&str, &str)]) -> Result<Value> { pub async fn query(&self, function: &str, params: &[(&str, &str)]) -> Result<Value> {
let mut args = Map::new(); let mut args = Map::new();
args.insert("function".to_string(), Value::String(function.to_string()));
for (k, v) in params { for (k, v) in params {
args.insert((*k).to_string(), Value::String((*v).to_string())); args.insert((*k).to_string(), Value::String((*v).to_string()));
} }

View File

@ -0,0 +1,62 @@
#[path = "../config.rs"]
mod config;
#[path = "../error.rs"]
mod error;
#[path = "../transport.rs"]
mod transport;
#[path = "../av_client.rs"]
mod av_client;
use av_client::AvClient;
use tokio;
use tracing::{info, error};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let api_key = "PUOO7UPTNXN325NN";
let base_url = "https://mcp.alphavantage.co/mcp";
let url = format!("{}?apikey={}", base_url, api_key);
info!("Connecting to: {}", url);
let client = AvClient::connect(&url).await
.map_err(|e| anyhow::anyhow!("Connect failed: {:?}", e))?;
info!("Connected! Listing tools...");
let tools = client.list_tools().await
.map_err(|e| anyhow::anyhow!("List tools failed: {:?}", e))?;
info!("Found {} tools:", tools.len());
for tool in &tools {
if tool.name == "GLOBAL_QUOTE" || tool.name == "COMPANY_OVERVIEW" {
info!("Tool: {}", tool.name);
info!("Schema: {:?}", tool.input_schema);
}
}
// Test calling a tool
let symbol = "600519.SS";
let params = vec![("symbol", symbol), ("datatype", "json")];
info!("Testing GLOBAL_QUOTE for {} (json)...", symbol);
match client.query("GLOBAL_QUOTE", &params).await {
Ok(v) => info!("GLOBAL_QUOTE result: {:?}", v),
Err(e) => error!("GLOBAL_QUOTE failed: {:?}", e),
}
let params_simple = vec![("symbol", symbol)];
info!("Testing INCOME_STATEMENT for {}...", symbol);
match client.query("INCOME_STATEMENT", &params_simple).await {
Ok(v) => info!("INCOME_STATEMENT result: {:?}", v),
Err(e) => error!("INCOME_STATEMENT failed: {:?}", e),
}
Ok(())
}

View File

@ -1,4 +1,3 @@
use anyhow::anyhow;
use reqwest::Error as ReqwestError; use reqwest::Error as ReqwestError;
use thiserror::Error; use thiserror::Error;

View File

@ -6,11 +6,9 @@ use anyhow::Context;
use chrono::{Utc, Datelike}; use chrono::{Utc, Datelike};
use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent}; use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent};
use common_contracts::observability::TaskProgress; use common_contracts::observability::TaskProgress;
use secrecy::ExposeSecret; use tracing::{error, info, instrument, warn};
use std::sync::Arc;
use tracing::{error, info, instrument};
use uuid::Uuid; use uuid::Uuid;
use crate::av_client::AvClient; use serde_json::Value;
#[instrument(skip(state, command, publisher), fields(request_id = %command.request_id, symbol = %command.symbol))] #[instrument(skip(state, command, publisher), fields(request_id = %command.request_id, symbol = %command.symbol))]
pub async fn handle_fetch_command( pub async fn handle_fetch_command(
@ -49,6 +47,14 @@ pub async fn handle_fetch_command(
PersistenceClient::new(state.config.data_persistence_service_url.clone()); PersistenceClient::new(state.config.data_persistence_service_url.clone());
let symbol = command.symbol.clone(); let symbol = command.symbol.clone();
// Symbol conversion for Chinese stocks
let av_symbol = if symbol.ends_with(".SH") {
symbol.replace(".SH", ".SS")
} else {
symbol.clone()
};
info!("Using symbol for AlphaVantage: {}", av_symbol);
update_task_progress( update_task_progress(
&state.tasks, &state.tasks,
command.request_id, command.request_id,
@ -59,13 +65,14 @@ pub async fn handle_fetch_command(
// --- 1. Fetch all data in parallel --- // --- 1. Fetch all data in parallel ---
let (overview_json, income_json, balance_json, cashflow_json, quote_json) = { let (overview_json, income_json, balance_json, cashflow_json, quote_json) = {
let params_overview = vec![("symbol", symbol.as_str())]; let params_overview = vec![("symbol", av_symbol.as_str())];
let params_income = vec![("symbol", symbol.as_str())]; let params_income = vec![("symbol", av_symbol.as_str())];
let params_balance = vec![("symbol", symbol.as_str())]; let params_balance = vec![("symbol", av_symbol.as_str())];
let params_cashflow = vec![("symbol", symbol.as_str())]; let params_cashflow = vec![("symbol", av_symbol.as_str())];
let params_quote = vec![("symbol", symbol.as_str())]; // Add datatype=json to force JSON response if supported (or at least Python-dict like)
let params_quote = vec![("symbol", av_symbol.as_str()), ("datatype", "json")];
let overview_task = client.query("OVERVIEW", &params_overview); let overview_task = client.query("COMPANY_OVERVIEW", &params_overview);
let income_task = client.query("INCOME_STATEMENT", &params_income); let income_task = client.query("INCOME_STATEMENT", &params_income);
let balance_task = client.query("BALANCE_SHEET", &params_balance); let balance_task = client.query("BALANCE_SHEET", &params_balance);
let cashflow_task = client.query("CASH_FLOW", &params_cashflow); let cashflow_task = client.query("CASH_FLOW", &params_cashflow);
@ -98,34 +105,84 @@ pub async fn handle_fetch_command(
// --- 2. Transform and persist data --- // --- 2. Transform and persist data ---
// Profile // Profile
let profile_to_persist = // Check if overview_json is empty (Symbol field check)
parse_company_profile(overview_json).context("Failed to parse CompanyProfile")?; if let Some(_symbol_val) = overview_json.get("Symbol") {
match parse_company_profile(overview_json) {
Ok(profile_to_persist) => {
persistence_client persistence_client
.upsert_company_profile(profile_to_persist) .upsert_company_profile(profile_to_persist)
.await?; .await?;
},
Err(e) => {
warn!("Failed to parse CompanyProfile: {}", e);
}
}
} else {
warn!("CompanyProfile data is empty or missing 'Symbol' for {}, skipping persistence.", av_symbol);
}
// Financials // Financials
let mut years_updated: Vec<u16> = Vec::new();
// Only attempt to parse financials if we have data (simple check if income statement has annualReports)
if income_json.get("annualReports").is_some() {
let combined_financials = CombinedFinancials { let combined_financials = CombinedFinancials {
income: income_json, income: income_json,
balance_sheet: balance_json, balance_sheet: balance_json,
cash_flow: cashflow_json, cash_flow: cashflow_json,
}; };
let financials_to_persist = match parse_financials(combined_financials) {
parse_financials(combined_financials).context("Failed to parse FinancialStatements")?; Ok(financials_to_persist) => {
let years_updated: Vec<u16> = financials_to_persist if !financials_to_persist.is_empty() {
years_updated = financials_to_persist
.iter() .iter()
.map(|f| f.period_date.year() as u16) .map(|f| f.period_date.year() as u16)
.collect(); .collect();
persistence_client persistence_client
.batch_insert_financials(financials_to_persist) .batch_insert_financials(financials_to_persist)
.await?; .await?;
}
},
Err(e) => {
warn!("Failed to parse Financials: {}", e);
}
}
} else {
warn!("Financial data missing for {}, skipping.", av_symbol);
}
// Quote // Quote
let quote_to_persist = // Fix Python-dict string if necessary
parse_realtime_quote(quote_json, &command.market).context("Failed to parse RealtimeQuote")?; let fixed_quote_json = if let Some(s) = quote_json.as_str() {
if s.trim().starts_with("{'Global Quote'") {
// Attempt to replace single quotes with double quotes
// Note: This is a naive fix but works for the expected format
let fixed = s.replace("'", "\"");
match serde_json::from_str::<Value>(&fixed) {
Ok(v) => v,
Err(e) => {
warn!("Failed to fix/parse quoted JSON string: {}. Error: {}", s, e);
quote_json // fallback to original
}
}
} else {
quote_json
}
} else {
quote_json
};
match parse_realtime_quote(fixed_quote_json, &command.market) {
Ok(mut quote_to_persist) => {
// Restore original symbol if we converted it
quote_to_persist.symbol = command.symbol.clone();
persistence_client persistence_client
.upsert_realtime_quote(quote_to_persist) .upsert_realtime_quote(quote_to_persist)
.await?; .await?;
},
Err(e) => {
warn!("Failed to parse RealtimeQuote: {}", e);
}
}
update_task_progress( update_task_progress(
&state.tasks, &state.tasks,
@ -136,6 +193,10 @@ pub async fn handle_fetch_command(
.await; .await;
// --- 3. Publish events --- // --- 3. Publish events ---
// Only publish if we actually updated something
// Actually, we should publish event even if partial, to signal completion?
// The command is "FetchCompanyData", implies success if we fetched *available* data.
let event = FinancialsPersistedEvent { let event = FinancialsPersistedEvent {
request_id: command.request_id, request_id: command.request_id,
symbol: command.symbol, symbol: command.symbol,
@ -147,7 +208,7 @@ pub async fn handle_fetch_command(
.await?; .await?;
state.tasks.remove(&command.request_id); state.tasks.remove(&command.request_id);
info!("Task completed successfully."); info!("Task completed successfully (Partial data may be missing if provider lacks coverage).");
Ok(()) Ok(())
} }