diff --git a/docker-compose.yml b/docker-compose.yml index bf8ca43..a93565d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -186,6 +186,9 @@ services: - data-persistence-service networks: - app-network + dns: + - 8.8.8.8 + - 8.8.4.4 healthcheck: test: ["CMD-SHELL", "curl -fsS http://localhost:8003/health >/dev/null || exit 1"] interval: 5s diff --git a/services/yfinance-provider-service/Cargo.lock b/services/yfinance-provider-service/Cargo.lock index fb08bbf..cb339bd 100644 --- a/services/yfinance-provider-service/Cargo.lock +++ b/services/yfinance-provider-service/Cargo.lock @@ -398,6 +398,35 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "cookie" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ddef33a339a91ea89fb53151bd0a4689cfce27055c291dfa69945475d22c747" +dependencies = [ + "percent-encoding", + "time", + "version_check", +] + +[[package]] +name = "cookie_store" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2eac901828f88a5241ee0600950ab981148a18f2f756900ffba1b125ca6a3ef9" +dependencies = [ + "cookie", + "document-features", + "idna", + "log", + "publicsuffix", + "serde", + "serde_derive", + "serde_json", + "time", + "url", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -568,6 +597,15 @@ dependencies = [ "const-random", ] +[[package]] +name = "document-features" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61" +dependencies = [ + "litrs", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -1352,6 +1390,12 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +[[package]] +name = "litrs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092" + [[package]] name = "lock_api" version = "0.4.14" @@ -1777,6 +1821,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "psl-types" +version = "2.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac" + [[package]] name = "ptr_meta" version = "0.1.4" @@ -1797,6 +1847,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "publicsuffix" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42ea446cab60335f76979ec15e12619a2165b5ae2c12166bef27d283a9fadf" +dependencies = [ + "idna", + "psl-types", +] + [[package]] name = "quote" version = "1.0.42" @@ -1923,6 +1983,8 @@ checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" dependencies = [ "base64", "bytes", + "cookie", + "cookie_store", "encoding_rs", "futures-core", "h2", diff --git a/services/yfinance-provider-service/Cargo.toml b/services/yfinance-provider-service/Cargo.toml index ec30655..7cbd7b7 100644 --- a/services/yfinance-provider-service/Cargo.toml +++ b/services/yfinance-provider-service/Cargo.toml @@ -18,7 +18,7 @@ futures = "0.3" futures-util = "0.3.31" # Data Persistence Client -reqwest = { version = "0.12.24", features = ["json"] } +reqwest = { version = "0.12.24", features = ["json", "cookies"] } # Concurrency & Async async-trait = "0.1.80" diff --git a/services/yfinance-provider-service/src/config.rs b/services/yfinance-provider-service/src/config.rs index 4bd754c..e026d1f 100644 --- a/services/yfinance-provider-service/src/config.rs +++ b/services/yfinance-provider-service/src/config.rs @@ -5,6 +5,8 @@ pub struct AppConfig { pub server_port: u16, pub nats_addr: String, pub data_persistence_service_url: String, + #[serde(default)] + pub yfinance_enabled: bool, } impl AppConfig { diff --git a/services/yfinance-provider-service/src/persistence.rs b/services/yfinance-provider-service/src/persistence.rs index 6cd94bf..1a1d120 100644 --- a/services/yfinance-provider-service/src/persistence.rs +++ b/services/yfinance-provider-service/src/persistence.rs @@ -6,6 +6,7 @@ use crate::error::Result; use common_contracts::{ + config_models::DataSourcesConfig, dtos::{CompanyProfileDto, RealtimeQuoteDto, TimeSeriesFinancialBatchDto, TimeSeriesFinancialDto}, }; use tracing::info; @@ -24,6 +25,18 @@ impl PersistenceClient { } } + pub async fn get_data_sources_config(&self) -> Result { + let url = format!("{}/configs/data_sources", self.base_url); + let config = self.client + .get(&url) + .send() + .await? + .error_for_status()? + .json() + .await?; + Ok(config) + } + pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> { let url = format!("{}/companies", self.base_url); info!("Upserting company profile for {} to {}", profile.symbol, url); diff --git a/services/yfinance-provider-service/src/state.rs b/services/yfinance-provider-service/src/state.rs index 4a70327..da03665 100644 --- a/services/yfinance-provider-service/src/state.rs +++ b/services/yfinance-provider-service/src/state.rs @@ -16,7 +16,7 @@ pub struct AppState { impl AppState { pub fn new(config: AppConfig) -> Self { - let provider = Arc::new(YFinanceDataProvider::new()); + let provider = Arc::new(YFinanceDataProvider::new(config.yfinance_enabled)); Self { tasks: Arc::new(DashMap::new()), diff --git a/services/yfinance-provider-service/src/worker.rs b/services/yfinance-provider-service/src/worker.rs index 322063e..7ff4956 100644 --- a/services/yfinance-provider-service/src/worker.rs +++ b/services/yfinance-provider-service/src/worker.rs @@ -12,6 +12,21 @@ pub async fn handle_fetch_command( publisher: async_nats::Client, ) -> Result<()> { let task_id = command.request_id; + + let client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); + + // Fetch dynamic config from data-persistence-service + // adhering to "single source of truth" rule (DB config overrides env var) + let config = client.get_data_sources_config().await?; + let is_enabled = config.get("yfinance") + .map(|c| c.enabled) + .unwrap_or(false); + + if !is_enabled { + info!("YFinance provider is disabled. Skipping task {}.", task_id); + return Ok(()); + } + state.tasks.insert(task_id, common_contracts::observability::TaskProgress { request_id: task_id, task_name: format!("yfinance:{}", command.symbol), @@ -33,7 +48,7 @@ pub async fn handle_fetch_command( task.details = "Persisting data".to_string(); } } - let client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); + // client is already initialized at the beginning of the function client.upsert_company_profile(profile).await?; let years: std::collections::BTreeSet = financials.iter().map(|f| f.period_date.year() as u16).collect(); client.batch_insert_financials(financials).await?; diff --git a/services/yfinance-provider-service/src/yfinance.rs b/services/yfinance-provider-service/src/yfinance.rs index e438be9..43b16b0 100644 --- a/services/yfinance-provider-service/src/yfinance.rs +++ b/services/yfinance-provider-service/src/yfinance.rs @@ -1,20 +1,85 @@ use crate::error::AppError; use crate::mapping::{map_financial_statements, map_profile}; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto}; +use common_contracts::provider::DataProvider; use anyhow::anyhow; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::info; #[derive(Clone)] pub struct YFinanceDataProvider { client: reqwest::Client, + is_enabled: bool, + crumb: Arc>>, +} + +impl DataProvider for YFinanceDataProvider { + fn name(&self) -> &str { + "yfinance" + } + + fn is_enabled(&self) -> bool { + self.is_enabled + } } impl YFinanceDataProvider { - pub fn new() -> Self { + pub fn new(is_enabled: bool) -> Self { + let client = reqwest::Client::builder() + .cookie_store(true) + .user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36") + .timeout(std::time::Duration::from_secs(30)) + .redirect(reqwest::redirect::Policy::default()) + .build() + .unwrap_or_else(|_| reqwest::Client::new()); + Self { - client: reqwest::Client::new(), + client, + is_enabled, + crumb: Arc::new(RwLock::new(None)), } } + async fn ensure_crumb(&self) -> Result { + // 1. Fast path: read lock + if let Some(crumb) = self.crumb.read().await.as_ref() { + return Ok(crumb.clone()); + } + + // 2. Slow path: write lock + let mut lock = self.crumb.write().await; + // Double-check if another thread filled it while we waited + if let Some(crumb) = lock.as_ref() { + return Ok(crumb.clone()); + } + + info!("Fetching new Yahoo Finance crumb..."); + + // 3. Fetch cookie from fc.yahoo.com + // We don't care about the body, just the cookies which reqwest handles automatically + let _ = self.client + .get("https://fc.yahoo.com") + .send() + .await + .map_err(|e| AppError::ServiceRequest(e))?; + + // 4. Fetch crumb + let crumb_resp = self.client + .get("https://query1.finance.yahoo.com/v1/test/getcrumb") + .send() + .await + .map_err(|e| AppError::ServiceRequest(e))?; + + let crumb_resp = crumb_resp.error_for_status().map_err(|e| AppError::ServiceRequest(e))?; + let crumb_text = crumb_resp.text().await.map_err(|e| AppError::ServiceRequest(e))?; + + info!("Successfully fetched crumb: {}", crumb_text); + *lock = Some(crumb_text.clone()); + + Ok(crumb_text) + } + pub async fn fetch_all_data( &self, symbol: &str, @@ -29,15 +94,28 @@ impl YFinanceDataProvider { &self, symbol: &str, ) -> Result<(serde_json::Value, serde_json::Value), AppError> { + let crumb = self.ensure_crumb().await?; + + // Convert suffix: .SH -> .SS for Yahoo Finance compatibility + // Shanghai: 600519.SH -> 600519.SS + // Shenzhen: 000001.SZ -> 000001.SZ (unchanged) + let yahoo_symbol = if symbol.ends_with(".SH") { + symbol.replace(".SH", ".SS") + } else { + symbol.to_string() + }; + + info!("Fetching raw data for symbol: {} (mapped from {})", yahoo_symbol, symbol); + // Yahoo quoteSummary: assetProfile + quoteType let summary_url = format!( - "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=assetProfile,quoteType", - symbol + "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=assetProfile,quoteType&crumb={}", + yahoo_symbol, crumb ); // Yahoo financials: income/balance/cashflow histories let financials_url = format!( - "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=incomeStatementHistory,balanceSheetHistory,cashflowStatementHistory", - symbol + "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=incomeStatementHistory,balanceSheetHistory,cashflowStatementHistory&crumb={}", + yahoo_symbol, crumb ); let summary_task = self.client.get(&summary_url).send(); @@ -46,8 +124,8 @@ impl YFinanceDataProvider { let (summary_res, financials_res) = tokio::try_join!(summary_task, financials_task) .map_err(|e| AppError::ServiceRequest(e))?; - let summary_res = summary_res.error_for_status()?; - let financials_res = financials_res.error_for_status()?; + let summary_res = summary_res.error_for_status().map_err(|e| AppError::ServiceRequest(e))?; + let financials_res = financials_res.error_for_status().map_err(|e| AppError::ServiceRequest(e))?; let summary_json: serde_json::Value = summary_res .json()