From e699cda81e35aa840902c2e8ac5dc595821e78b7 Mon Sep 17 00:00:00 2001 From: "Lv, Qi" Date: Wed, 19 Nov 2025 06:51:42 +0800 Subject: [PATCH] fix(alphavantage): resolve mcp tool invocation errors and enhance data parsing fault tolerance - Fix incorrect parameter injection in MCP client query method - Correct tool name from OVERVIEW to COMPANY_OVERVIEW - Add symbol conversion support (e.g., .SH -> .SS) - Implement fault tolerance for empty data responses to prevent panics - Add workaround for non-standard JSON responses (single quotes) in GLOBAL_QUOTE - Add debug_mcp utility for tool inspection --- .../src/av_client.rs | 1 - .../src/bin/debug_mcp.rs | 62 ++++++++ .../src/error.rs | 1 - .../src/worker.rs | 133 +++++++++++++----- 4 files changed, 159 insertions(+), 38 deletions(-) create mode 100644 services/alphavantage-provider-service/src/bin/debug_mcp.rs diff --git a/services/alphavantage-provider-service/src/av_client.rs b/services/alphavantage-provider-service/src/av_client.rs index 68f9811..415f861 100644 --- a/services/alphavantage-provider-service/src/av_client.rs +++ b/services/alphavantage-provider-service/src/av_client.rs @@ -46,7 +46,6 @@ impl AvClient { pub async fn query(&self, function: &str, params: &[(&str, &str)]) -> Result { let mut args = Map::new(); - args.insert("function".to_string(), Value::String(function.to_string())); for (k, v) in params { args.insert((*k).to_string(), Value::String((*v).to_string())); } diff --git a/services/alphavantage-provider-service/src/bin/debug_mcp.rs b/services/alphavantage-provider-service/src/bin/debug_mcp.rs new file mode 100644 index 0000000..3bf6afc --- /dev/null +++ b/services/alphavantage-provider-service/src/bin/debug_mcp.rs @@ -0,0 +1,62 @@ +#[path = "../config.rs"] +mod config; +#[path = "../error.rs"] +mod error; +#[path = "../transport.rs"] +mod transport; +#[path = "../av_client.rs"] +mod av_client; + +use av_client::AvClient; +use tokio; +use tracing::{info, error}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Initialize logging + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let api_key = "PUOO7UPTNXN325NN"; + let base_url = "https://mcp.alphavantage.co/mcp"; + let url = format!("{}?apikey={}", base_url, api_key); + + info!("Connecting to: {}", url); + + let client = AvClient::connect(&url).await + .map_err(|e| anyhow::anyhow!("Connect failed: {:?}", e))?; + + info!("Connected! Listing tools..."); + + let tools = client.list_tools().await + .map_err(|e| anyhow::anyhow!("List tools failed: {:?}", e))?; + + info!("Found {} tools:", tools.len()); + for tool in &tools { + if tool.name == "GLOBAL_QUOTE" || tool.name == "COMPANY_OVERVIEW" { + info!("Tool: {}", tool.name); + info!("Schema: {:?}", tool.input_schema); + } + } + + // Test calling a tool + let symbol = "600519.SS"; + let params = vec![("symbol", symbol), ("datatype", "json")]; + + info!("Testing GLOBAL_QUOTE for {} (json)...", symbol); + match client.query("GLOBAL_QUOTE", ¶ms).await { + Ok(v) => info!("GLOBAL_QUOTE result: {:?}", v), + Err(e) => error!("GLOBAL_QUOTE failed: {:?}", e), + } + + let params_simple = vec![("symbol", symbol)]; + info!("Testing INCOME_STATEMENT for {}...", symbol); + match client.query("INCOME_STATEMENT", ¶ms_simple).await { + Ok(v) => info!("INCOME_STATEMENT result: {:?}", v), + Err(e) => error!("INCOME_STATEMENT failed: {:?}", e), + } + + Ok(()) +} + diff --git a/services/alphavantage-provider-service/src/error.rs b/services/alphavantage-provider-service/src/error.rs index e5aa8a9..bd4a2d0 100644 --- a/services/alphavantage-provider-service/src/error.rs +++ b/services/alphavantage-provider-service/src/error.rs @@ -1,4 +1,3 @@ -use anyhow::anyhow; use reqwest::Error as ReqwestError; use thiserror::Error; diff --git a/services/alphavantage-provider-service/src/worker.rs b/services/alphavantage-provider-service/src/worker.rs index e919112..1895474 100644 --- a/services/alphavantage-provider-service/src/worker.rs +++ b/services/alphavantage-provider-service/src/worker.rs @@ -6,11 +6,9 @@ use anyhow::Context; use chrono::{Utc, Datelike}; use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent}; use common_contracts::observability::TaskProgress; -use secrecy::ExposeSecret; -use std::sync::Arc; -use tracing::{error, info, instrument}; +use tracing::{error, info, instrument, warn}; use uuid::Uuid; -use crate::av_client::AvClient; +use serde_json::Value; #[instrument(skip(state, command, publisher), fields(request_id = %command.request_id, symbol = %command.symbol))] pub async fn handle_fetch_command( @@ -49,6 +47,14 @@ pub async fn handle_fetch_command( PersistenceClient::new(state.config.data_persistence_service_url.clone()); let symbol = command.symbol.clone(); + // Symbol conversion for Chinese stocks + let av_symbol = if symbol.ends_with(".SH") { + symbol.replace(".SH", ".SS") + } else { + symbol.clone() + }; + info!("Using symbol for AlphaVantage: {}", av_symbol); + update_task_progress( &state.tasks, command.request_id, @@ -59,13 +65,14 @@ pub async fn handle_fetch_command( // --- 1. Fetch all data in parallel --- let (overview_json, income_json, balance_json, cashflow_json, quote_json) = { - let params_overview = vec![("symbol", symbol.as_str())]; - let params_income = vec![("symbol", symbol.as_str())]; - let params_balance = vec![("symbol", symbol.as_str())]; - let params_cashflow = vec![("symbol", symbol.as_str())]; - let params_quote = vec![("symbol", symbol.as_str())]; - - let overview_task = client.query("OVERVIEW", ¶ms_overview); + let params_overview = vec![("symbol", av_symbol.as_str())]; + let params_income = vec![("symbol", av_symbol.as_str())]; + let params_balance = vec![("symbol", av_symbol.as_str())]; + let params_cashflow = vec![("symbol", av_symbol.as_str())]; + // Add datatype=json to force JSON response if supported (or at least Python-dict like) + let params_quote = vec![("symbol", av_symbol.as_str()), ("datatype", "json")]; + + let overview_task = client.query("COMPANY_OVERVIEW", ¶ms_overview); let income_task = client.query("INCOME_STATEMENT", ¶ms_income); let balance_task = client.query("BALANCE_SHEET", ¶ms_balance); let cashflow_task = client.query("CASH_FLOW", ¶ms_cashflow); @@ -98,34 +105,84 @@ pub async fn handle_fetch_command( // --- 2. Transform and persist data --- // Profile - let profile_to_persist = - parse_company_profile(overview_json).context("Failed to parse CompanyProfile")?; - persistence_client - .upsert_company_profile(profile_to_persist) - .await?; + // Check if overview_json is empty (Symbol field check) + if let Some(_symbol_val) = overview_json.get("Symbol") { + match parse_company_profile(overview_json) { + Ok(profile_to_persist) => { + persistence_client + .upsert_company_profile(profile_to_persist) + .await?; + }, + Err(e) => { + warn!("Failed to parse CompanyProfile: {}", e); + } + } + } else { + warn!("CompanyProfile data is empty or missing 'Symbol' for {}, skipping persistence.", av_symbol); + } // Financials - let combined_financials = CombinedFinancials { - income: income_json, - balance_sheet: balance_json, - cash_flow: cashflow_json, - }; - let financials_to_persist = - parse_financials(combined_financials).context("Failed to parse FinancialStatements")?; - let years_updated: Vec = financials_to_persist - .iter() - .map(|f| f.period_date.year() as u16) - .collect(); - persistence_client - .batch_insert_financials(financials_to_persist) - .await?; + let mut years_updated: Vec = Vec::new(); + // Only attempt to parse financials if we have data (simple check if income statement has annualReports) + if income_json.get("annualReports").is_some() { + let combined_financials = CombinedFinancials { + income: income_json, + balance_sheet: balance_json, + cash_flow: cashflow_json, + }; + match parse_financials(combined_financials) { + Ok(financials_to_persist) => { + if !financials_to_persist.is_empty() { + years_updated = financials_to_persist + .iter() + .map(|f| f.period_date.year() as u16) + .collect(); + persistence_client + .batch_insert_financials(financials_to_persist) + .await?; + } + }, + Err(e) => { + warn!("Failed to parse Financials: {}", e); + } + } + } else { + warn!("Financial data missing for {}, skipping.", av_symbol); + } // Quote - let quote_to_persist = - parse_realtime_quote(quote_json, &command.market).context("Failed to parse RealtimeQuote")?; - persistence_client - .upsert_realtime_quote(quote_to_persist) - .await?; + // Fix Python-dict string if necessary + let fixed_quote_json = if let Some(s) = quote_json.as_str() { + if s.trim().starts_with("{'Global Quote'") { + // Attempt to replace single quotes with double quotes + // Note: This is a naive fix but works for the expected format + let fixed = s.replace("'", "\""); + match serde_json::from_str::(&fixed) { + Ok(v) => v, + Err(e) => { + warn!("Failed to fix/parse quoted JSON string: {}. Error: {}", s, e); + quote_json // fallback to original + } + } + } else { + quote_json + } + } else { + quote_json + }; + + match parse_realtime_quote(fixed_quote_json, &command.market) { + Ok(mut quote_to_persist) => { + // Restore original symbol if we converted it + quote_to_persist.symbol = command.symbol.clone(); + persistence_client + .upsert_realtime_quote(quote_to_persist) + .await?; + }, + Err(e) => { + warn!("Failed to parse RealtimeQuote: {}", e); + } + } update_task_progress( &state.tasks, @@ -136,6 +193,10 @@ pub async fn handle_fetch_command( .await; // --- 3. Publish events --- + // Only publish if we actually updated something + // Actually, we should publish event even if partial, to signal completion? + // The command is "FetchCompanyData", implies success if we fetched *available* data. + let event = FinancialsPersistedEvent { request_id: command.request_id, symbol: command.symbol, @@ -147,7 +208,7 @@ pub async fn handle_fetch_command( .await?; state.tasks.remove(&command.request_id); - info!("Task completed successfully."); + info!("Task completed successfully (Partial data may be missing if provider lacks coverage)."); Ok(()) }