fix(yfinance): enable crumb auth and fix config loading

- Enable dynamic config loading from DB to respect enabled status
- Implement Yahoo Finance Crumb authentication flow
- Add browser-like User-Agent and cookie store
- Map .SH symbols to .SS for Yahoo compatibility
- Add Google DNS to docker-compose to fix container resolution issues
This commit is contained in:
Lv, Qi 2025-11-19 06:53:14 +08:00
parent 75aa5e4add
commit c8be576526
8 changed files with 184 additions and 11 deletions

View File

@ -186,6 +186,9 @@ services:
- data-persistence-service - data-persistence-service
networks: networks:
- app-network - app-network
dns:
- 8.8.8.8
- 8.8.4.4
healthcheck: healthcheck:
test: ["CMD-SHELL", "curl -fsS http://localhost:8003/health >/dev/null || exit 1"] test: ["CMD-SHELL", "curl -fsS http://localhost:8003/health >/dev/null || exit 1"]
interval: 5s interval: 5s

View File

@ -398,6 +398,35 @@ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
[[package]]
name = "cookie"
version = "0.18.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ddef33a339a91ea89fb53151bd0a4689cfce27055c291dfa69945475d22c747"
dependencies = [
"percent-encoding",
"time",
"version_check",
]
[[package]]
name = "cookie_store"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eac901828f88a5241ee0600950ab981148a18f2f756900ffba1b125ca6a3ef9"
dependencies = [
"cookie",
"document-features",
"idna",
"log",
"publicsuffix",
"serde",
"serde_derive",
"serde_json",
"time",
"url",
]
[[package]] [[package]]
name = "core-foundation" name = "core-foundation"
version = "0.9.4" version = "0.9.4"
@ -568,6 +597,15 @@ dependencies = [
"const-random", "const-random",
] ]
[[package]]
name = "document-features"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4b8a88685455ed29a21542a33abd9cb6510b6b129abadabdcef0f4c55bc8f61"
dependencies = [
"litrs",
]
[[package]] [[package]]
name = "dotenvy" name = "dotenvy"
version = "0.15.7" version = "0.15.7"
@ -1352,6 +1390,12 @@ version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
[[package]]
name = "litrs"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11d3d7f243d5c5a8b9bb5d6dd2b1602c0cb0b9db1621bafc7ed66e35ff9fe092"
[[package]] [[package]]
name = "lock_api" name = "lock_api"
version = "0.4.14" version = "0.4.14"
@ -1777,6 +1821,12 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "psl-types"
version = "2.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33cb294fe86a74cbcf50d4445b37da762029549ebeea341421c7c70370f86cac"
[[package]] [[package]]
name = "ptr_meta" name = "ptr_meta"
version = "0.1.4" version = "0.1.4"
@ -1797,6 +1847,16 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "publicsuffix"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f42ea446cab60335f76979ec15e12619a2165b5ae2c12166bef27d283a9fadf"
dependencies = [
"idna",
"psl-types",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.42" version = "1.0.42"
@ -1923,6 +1983,8 @@ checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f"
dependencies = [ dependencies = [
"base64", "base64",
"bytes", "bytes",
"cookie",
"cookie_store",
"encoding_rs", "encoding_rs",
"futures-core", "futures-core",
"h2", "h2",

View File

@ -18,7 +18,7 @@ futures = "0.3"
futures-util = "0.3.31" futures-util = "0.3.31"
# Data Persistence Client # Data Persistence Client
reqwest = { version = "0.12.24", features = ["json"] } reqwest = { version = "0.12.24", features = ["json", "cookies"] }
# Concurrency & Async # Concurrency & Async
async-trait = "0.1.80" async-trait = "0.1.80"

View File

@ -5,6 +5,8 @@ pub struct AppConfig {
pub server_port: u16, pub server_port: u16,
pub nats_addr: String, pub nats_addr: String,
pub data_persistence_service_url: String, pub data_persistence_service_url: String,
#[serde(default)]
pub yfinance_enabled: bool,
} }
impl AppConfig { impl AppConfig {

View File

@ -6,6 +6,7 @@
use crate::error::Result; use crate::error::Result;
use common_contracts::{ use common_contracts::{
config_models::DataSourcesConfig,
dtos::{CompanyProfileDto, RealtimeQuoteDto, TimeSeriesFinancialBatchDto, TimeSeriesFinancialDto}, dtos::{CompanyProfileDto, RealtimeQuoteDto, TimeSeriesFinancialBatchDto, TimeSeriesFinancialDto},
}; };
use tracing::info; use tracing::info;
@ -24,6 +25,18 @@ impl PersistenceClient {
} }
} }
pub async fn get_data_sources_config(&self) -> Result<DataSourcesConfig> {
let url = format!("{}/configs/data_sources", self.base_url);
let config = self.client
.get(&url)
.send()
.await?
.error_for_status()?
.json()
.await?;
Ok(config)
}
pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> { pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> {
let url = format!("{}/companies", self.base_url); let url = format!("{}/companies", self.base_url);
info!("Upserting company profile for {} to {}", profile.symbol, url); info!("Upserting company profile for {} to {}", profile.symbol, url);

View File

@ -16,7 +16,7 @@ pub struct AppState {
impl AppState { impl AppState {
pub fn new(config: AppConfig) -> Self { pub fn new(config: AppConfig) -> Self {
let provider = Arc::new(YFinanceDataProvider::new()); let provider = Arc::new(YFinanceDataProvider::new(config.yfinance_enabled));
Self { Self {
tasks: Arc::new(DashMap::new()), tasks: Arc::new(DashMap::new()),

View File

@ -12,6 +12,21 @@ pub async fn handle_fetch_command(
publisher: async_nats::Client, publisher: async_nats::Client,
) -> Result<()> { ) -> Result<()> {
let task_id = command.request_id; let task_id = command.request_id;
let client = PersistenceClient::new(state.config.data_persistence_service_url.clone());
// Fetch dynamic config from data-persistence-service
// adhering to "single source of truth" rule (DB config overrides env var)
let config = client.get_data_sources_config().await?;
let is_enabled = config.get("yfinance")
.map(|c| c.enabled)
.unwrap_or(false);
if !is_enabled {
info!("YFinance provider is disabled. Skipping task {}.", task_id);
return Ok(());
}
state.tasks.insert(task_id, common_contracts::observability::TaskProgress { state.tasks.insert(task_id, common_contracts::observability::TaskProgress {
request_id: task_id, request_id: task_id,
task_name: format!("yfinance:{}", command.symbol), task_name: format!("yfinance:{}", command.symbol),
@ -33,7 +48,7 @@ pub async fn handle_fetch_command(
task.details = "Persisting data".to_string(); task.details = "Persisting data".to_string();
} }
} }
let client = PersistenceClient::new(state.config.data_persistence_service_url.clone()); // client is already initialized at the beginning of the function
client.upsert_company_profile(profile).await?; client.upsert_company_profile(profile).await?;
let years: std::collections::BTreeSet<u16> = financials.iter().map(|f| f.period_date.year() as u16).collect(); let years: std::collections::BTreeSet<u16> = financials.iter().map(|f| f.period_date.year() as u16).collect();
client.batch_insert_financials(financials).await?; client.batch_insert_financials(financials).await?;

View File

@ -1,20 +1,85 @@
use crate::error::AppError; use crate::error::AppError;
use crate::mapping::{map_financial_statements, map_profile}; use crate::mapping::{map_financial_statements, map_profile};
use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto}; use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto};
use common_contracts::provider::DataProvider;
use anyhow::anyhow; use anyhow::anyhow;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::info;
#[derive(Clone)] #[derive(Clone)]
pub struct YFinanceDataProvider { pub struct YFinanceDataProvider {
client: reqwest::Client, client: reqwest::Client,
is_enabled: bool,
crumb: Arc<RwLock<Option<String>>>,
}
impl DataProvider for YFinanceDataProvider {
fn name(&self) -> &str {
"yfinance"
}
fn is_enabled(&self) -> bool {
self.is_enabled
}
} }
impl YFinanceDataProvider { impl YFinanceDataProvider {
pub fn new() -> Self { pub fn new(is_enabled: bool) -> Self {
let client = reqwest::Client::builder()
.cookie_store(true)
.user_agent("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
.timeout(std::time::Duration::from_secs(30))
.redirect(reqwest::redirect::Policy::default())
.build()
.unwrap_or_else(|_| reqwest::Client::new());
Self { Self {
client: reqwest::Client::new(), client,
is_enabled,
crumb: Arc::new(RwLock::new(None)),
} }
} }
async fn ensure_crumb(&self) -> Result<String, AppError> {
// 1. Fast path: read lock
if let Some(crumb) = self.crumb.read().await.as_ref() {
return Ok(crumb.clone());
}
// 2. Slow path: write lock
let mut lock = self.crumb.write().await;
// Double-check if another thread filled it while we waited
if let Some(crumb) = lock.as_ref() {
return Ok(crumb.clone());
}
info!("Fetching new Yahoo Finance crumb...");
// 3. Fetch cookie from fc.yahoo.com
// We don't care about the body, just the cookies which reqwest handles automatically
let _ = self.client
.get("https://fc.yahoo.com")
.send()
.await
.map_err(|e| AppError::ServiceRequest(e))?;
// 4. Fetch crumb
let crumb_resp = self.client
.get("https://query1.finance.yahoo.com/v1/test/getcrumb")
.send()
.await
.map_err(|e| AppError::ServiceRequest(e))?;
let crumb_resp = crumb_resp.error_for_status().map_err(|e| AppError::ServiceRequest(e))?;
let crumb_text = crumb_resp.text().await.map_err(|e| AppError::ServiceRequest(e))?;
info!("Successfully fetched crumb: {}", crumb_text);
*lock = Some(crumb_text.clone());
Ok(crumb_text)
}
pub async fn fetch_all_data( pub async fn fetch_all_data(
&self, &self,
symbol: &str, symbol: &str,
@ -29,15 +94,28 @@ impl YFinanceDataProvider {
&self, &self,
symbol: &str, symbol: &str,
) -> Result<(serde_json::Value, serde_json::Value), AppError> { ) -> Result<(serde_json::Value, serde_json::Value), AppError> {
let crumb = self.ensure_crumb().await?;
// Convert suffix: .SH -> .SS for Yahoo Finance compatibility
// Shanghai: 600519.SH -> 600519.SS
// Shenzhen: 000001.SZ -> 000001.SZ (unchanged)
let yahoo_symbol = if symbol.ends_with(".SH") {
symbol.replace(".SH", ".SS")
} else {
symbol.to_string()
};
info!("Fetching raw data for symbol: {} (mapped from {})", yahoo_symbol, symbol);
// Yahoo quoteSummary: assetProfile + quoteType // Yahoo quoteSummary: assetProfile + quoteType
let summary_url = format!( let summary_url = format!(
"https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=assetProfile,quoteType", "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=assetProfile,quoteType&crumb={}",
symbol yahoo_symbol, crumb
); );
// Yahoo financials: income/balance/cashflow histories // Yahoo financials: income/balance/cashflow histories
let financials_url = format!( let financials_url = format!(
"https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=incomeStatementHistory,balanceSheetHistory,cashflowStatementHistory", "https://query1.finance.yahoo.com/v10/finance/quoteSummary/{}?modules=incomeStatementHistory,balanceSheetHistory,cashflowStatementHistory&crumb={}",
symbol yahoo_symbol, crumb
); );
let summary_task = self.client.get(&summary_url).send(); let summary_task = self.client.get(&summary_url).send();
@ -46,8 +124,8 @@ impl YFinanceDataProvider {
let (summary_res, financials_res) = tokio::try_join!(summary_task, financials_task) let (summary_res, financials_res) = tokio::try_join!(summary_task, financials_task)
.map_err(|e| AppError::ServiceRequest(e))?; .map_err(|e| AppError::ServiceRequest(e))?;
let summary_res = summary_res.error_for_status()?; let summary_res = summary_res.error_for_status().map_err(|e| AppError::ServiceRequest(e))?;
let financials_res = financials_res.error_for_status()?; let financials_res = financials_res.error_for_status().map_err(|e| AppError::ServiceRequest(e))?;
let summary_json: serde_json::Value = summary_res let summary_json: serde_json::Value = summary_res
.json() .json()