Fundamental_Analysis/services/tushare-provider-service/src/mapping.rs
Lv, Qi 5327e76aaa chore: 提交本轮 Rust 架构迁移相关改动
- docker-compose: 下线 Python backend/config-service,切换至 config-service-rs
- archive: 归档 legacy Python 目录至 archive/python/*
- services: 新增/更新 common-contracts、api-gateway、各 provider、report-generator-service、config-service-rs
- data-persistence-service: API/system 模块与模型/DTO 调整
- frontend: 更新 useApi 与 API 路由
- docs: 更新路线图并勾选光荣退役
- cleanup: 移除 data-distance-service 占位测试
2025-11-16 20:55:46 +08:00

274 lines
11 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::HashMap;
use chrono::{Datelike, NaiveDate};
use common_contracts::dtos::TimeSeriesFinancialDto;
use itertools::Itertools;
use rust_decimal::Decimal;
use rust_decimal::prelude::*;
use rust_decimal_macros::dec;
use crate::{
error::ProviderError,
tushare::{
BalanceSheet, Cashflow, Dividend, FinaIndicator, Income, Repurchase, StkHolderNumber,
},
};
/// Raw Tushare rows handed to `map_financial_statements` together with a symbol.
///
/// Each vector holds the rows of one Tushare record type imported from `crate::tushare`.
pub struct TushareFinancials {
/// Balance-sheet rows; selected fields are merged per `end_date` by `merge_financial_data`.
pub balancesheet: Vec<BalanceSheet>,
/// Income-statement rows; selected fields are merged per `end_date` by `merge_financial_data`.
pub income: Vec<Income>,
/// Cash-flow rows; selected fields are merged per `end_date` by `merge_financial_data`.
pub cashflow: Vec<Cashflow>,
/// Financial-indicator rows; `tax_to_ebt` and `arturn_days` are merged per `end_date`.
pub fina_indicator: Vec<FinaIndicator>,
/// Repurchase rows; not read by the mapping code shown in this file.
pub repurchase: Vec<Repurchase>,
/// Dividend rows; aggregated per pay-date year by `process_special_data`.
pub dividend: Vec<Dividend>,
/// Shareholder-count rows; reduced to one value per `end_date` by `process_special_data`.
pub stk_holdernumber: Vec<StkHolderNumber>,
/// Optional employee count; emitted as the `employees` metric by `process_special_data`.
pub employees: Option<f64>,
}
/// Converts raw Tushare rows into a flat list of `TimeSeriesFinancialDto` values for `symbol`.
///
/// The pipeline is: merge statement rows per `end_date`, keep only the wanted report dates,
/// pivot to one series per metric, add the special series (employees, holders, dividends),
/// add derived ratios, and finally flatten everything into DTOs.
///
/// # Errors
///
/// Propagates the error returned by `flatten_series_to_dtos` (a period that is not a valid
/// `%Y%m%d` date).
pub fn map_financial_statements(
    symbol: &str,
    raw_data: TushareFinancials,
) -> Result<Vec<TimeSeriesFinancialDto>, ProviderError> {
    // Step 1: one metric map per end_date.
    let mut merged = merge_financial_data(&raw_data);

    // Step 2: keep only the report dates we care about, in the order the filter returns them.
    let report_dates = filter_wanted_dates(merged.keys().cloned().collect());
    let mut statements: Vec<(String, HashMap<String, f64>)> = Vec::new();
    for date in &report_dates {
        if let Some(metrics) = merged.remove(date) {
            statements.push((date.clone(), metrics));
        }
    }

    // Step 3: pivot into metric -> [(period, value)].
    let mut series = transform_to_series_with_periods(&statements);

    // Step 4: series that are not keyed by statement period (employees, holders, dividends).
    let this_year = chrono::Utc::now().year().to_string();
    let newest_this_year = report_dates
        .iter()
        .find(|date| date.starts_with(&this_year))
        .cloned();
    process_special_data(&mut series, &raw_data, &this_year, newest_this_year);

    // Step 5: ratios computed from the raw metrics.
    calculate_derived_metrics(&mut series);

    // Step 6: flatten into TimeSeriesFinancialDto values.
    flatten_series_to_dtos(symbol, series)
}
/// Gathers selected numeric fields of the balance-sheet, income, cash-flow and indicator rows
/// into one metric map per `end_date`.
///
/// The outer key is the row's `end_date`; the inner key is the field name. Rows without an
/// `end_date` are ignored, and fields that are `None` are not inserted. When several rows
/// share an `end_date`, a later row overwrites the same field of an earlier one.
fn merge_financial_data(
    raw_data: &TushareFinancials,
) -> HashMap<String, HashMap<String, f64>> {
    let mut merged: HashMap<String, HashMap<String, f64>> = HashMap::new();

    // For every row, copy each listed field that is `Some` into the bucket of the row's
    // `end_date`, keyed by the field's own identifier.
    macro_rules! collect_fields {
        ($rows:expr => [$($field:ident),+ $(,)?]) => {
            for row in $rows.iter() {
                if let Some(date) = row.end_date.as_ref() {
                    let bucket = merged.entry(date.clone()).or_default();
                    $(
                        if let Some(value) = row.$field {
                            bucket.insert(stringify!($field).to_string(), value);
                        }
                    )+
                }
            }
        };
    }

    collect_fields!(raw_data.balancesheet => [
        money_cap, inventories, accounts_receiv, prepayment, fix_assets, lt_eqt_invest,
        goodwill, accounts_pay, adv_receipts, contract_liab, st_borr, lt_borr, total_assets,
    ]);
    collect_fields!(raw_data.income => [
        revenue, sell_exp, admin_exp, rd_exp, grossprofit_margin, netprofit_margin, n_income,
    ]);
    collect_fields!(raw_data.cashflow => [
        n_cashflow_act, c_pay_acq_const_fiolta, depr_fa_coga_dpba, c_paid_to_for_empl,
    ]);
    collect_fields!(raw_data.fina_indicator => [tax_to_ebt, arturn_days]);

    merged
}
/// Picks the report dates to keep from `dates`.
///
/// The result starts with the latest date of the current UTC calendar year (if any), followed
/// by every date ending in `1231` that does not belong to the current year, newest first.
fn filter_wanted_dates(dates: Vec<String>) -> Vec<String> {
    let this_year = chrono::Utc::now().year().to_string();

    // Ascending sort followed by a reversal yields the newest date first.
    let mut newest_first = dates;
    newest_first.sort_unstable();
    newest_first.reverse();

    let mut selected = Vec::new();

    // At most one entry: the most recent report of the running year.
    selected.extend(
        newest_first
            .iter()
            .find(|date| date.starts_with(&this_year))
            .cloned(),
    );

    // Year-end reports of all other years.
    for date in newest_first {
        if date.ends_with("1231") && !date.starts_with(&this_year) {
            selected.push(date);
        }
    }

    selected
}
/// One observation of a metric: the reporting `period` and the numeric `value`.
///
/// `period` is a date string; `flatten_series_to_dtos` parses it with the `%Y%m%d` format.
#[derive(Debug, Clone)]
struct SeriesPoint { period: String, value: f64 }
/// Maps a metric name to the observations recorded for that metric.
type SeriesMap = HashMap<String, Vec<SeriesPoint>>;
fn transform_to_series_with_periods(statements: &[(String, HashMap<String, f64>)]) -> SeriesMap {
let mut series: SeriesMap = HashMap::new();
for (period, report) in statements {
for (key, value) in report {
series
.entry(key.clone())
.or_default()
.push(SeriesPoint { period: period.clone(), value: *value });
}
}
series
}
/// Adds the series that are not derived from the period-keyed statements: employee count,
/// shareholder count and cash dividends.
///
/// * `series` - metric map that is extended in place.
/// * `raw_data` - provides `employees`, `stk_holdernumber` and `dividend`; `repurchase` is not
///   consumed yet.
/// * `current_year` - four-digit year string; dividends whose pay date falls in this year are
///   attributed to `latest_report` when one is given.
/// * `latest_report` - latest report period of the current year, if any.
///
/// Dividend rows whose `pay_date` does not start with four single-byte characters are skipped
/// instead of panicking.
fn process_special_data(series: &mut SeriesMap, raw_data: &TushareFinancials, current_year: &str, latest_report: Option<String>) {
    // Employees: a single figure, attached to the year-end period of the previous UTC year.
    if let Some(employees) = raw_data.employees {
        let prev_year = chrono::Utc::now().year() - 1;
        series.entry("employees".to_string()).or_default().push(SeriesPoint {
            period: format!("{}1231", prev_year),
            value: employees,
        });
    }

    // Holder numbers: for each period keep the value with the latest announcement date.
    let mut holder_by_period: HashMap<&str, (&str, f64)> = HashMap::new();
    for r in &raw_data.stk_holdernumber {
        if let (Some(end_date), Some(ann_date), Some(holder_num)) = (&r.end_date, &r.ann_date, r.holder_num) {
            let latest = holder_by_period
                .entry(end_date.as_str())
                .or_insert((ann_date.as_str(), holder_num));
            if ann_date.as_str() > latest.0 {
                *latest = (ann_date.as_str(), holder_num);
            }
        }
    }
    if !holder_by_period.is_empty() {
        let holder_series = holder_by_period
            .into_iter()
            .map(|(period, (_ann_date, num))| SeriesPoint { period: period.to_string(), value: num })
            .collect();
        // Replaces any existing `holder_num` series.
        series.insert("holder_num".to_string(), holder_series);
    }

    // Dividends: sum `cash_div_tax * base_share / 10000` per pay-date year.
    let mut div_by_year: HashMap<&str, f64> = HashMap::new();
    for r in &raw_data.dividend {
        if let (Some(pay_date), Some(cash_div), Some(base_share)) = (&r.pay_date, r.cash_div_tax, r.base_share) {
            // `get` yields `None` when the string is shorter than four bytes or byte 4 falls
            // inside a multi-byte character; plain slicing would panic in the latter case.
            if let Some(year) = pay_date.get(..4) {
                *div_by_year.entry(year).or_default() += (cash_div * base_share) / 10000.0;
            }
        }
    }
    if !div_by_year.is_empty() {
        let div_series = div_by_year
            .into_iter()
            .map(|(year, amount)| {
                // Current-year payouts belong to the latest available report of this year;
                // everything else is booked on that year's year-end period.
                let period = match &latest_report {
                    Some(report) if year == current_year => report.clone(),
                    _ => format!("{}1231", year),
                };
                SeriesPoint { period, value: amount }
            })
            .collect();
        // Replaces any existing `dividend_amount` series.
        series.insert("dividend_amount".to_string(), div_series);
    }

    // Repurchase (simplified)
    // A full implementation would be more complex, matching python logic
}
/// Computes ratio metrics from the raw metrics in `series` and adds them to the same map.
///
/// For every period present in `series` the function derives:
/// * `__sell_rate`, `__admin_rate`, `__rd_rate`, `__depr_ratio` - expense items as a percentage
///   of `revenue`;
/// * `__*_ratio` asset/liability items as a percentage of `total_assets`;
/// * `__tax_rate` - `tax_to_ebt`, multiplied by 100 when its magnitude is at most 1;
/// * `payturn_days` - `365 * average accounts_pay / COGS`, where COGS is
///   `revenue * (1 - grossprofit_margin)`.
///
/// A metric is skipped for a period when an input is missing, the denominator is zero, or the
/// arithmetic is not representable (overflow, NaN/inf input). Malformed period strings never
/// cause a panic. Derived series replace existing entries with the same name.
fn calculate_derived_metrics(series: &mut SeriesMap) {
    /// `(derived metric, numerator metric)` pairs expressed as a percentage of `revenue`.
    const REVENUE_SHARE_METRICS: [(&str, &str); 4] = [
        ("__sell_rate", "sell_exp"),
        ("__admin_rate", "admin_exp"),
        ("__rd_rate", "rd_exp"),
        ("__depr_ratio", "depr_fa_coga_dpba"),
    ];
    /// `(derived metric, numerator metric)` pairs expressed as a percentage of `total_assets`.
    const ASSET_SHARE_METRICS: [(&str, &str); 10] = [
        ("__money_cap_ratio", "money_cap"),
        ("__inventories_ratio", "inventories"),
        ("__ar_ratio", "accounts_receiv"),
        ("__prepay_ratio", "prepayment"),
        ("__fix_assets_ratio", "fix_assets"),
        ("__lt_invest_ratio", "lt_eqt_invest"),
        ("__goodwill_ratio", "goodwill"),
        ("__ap_ratio", "accounts_pay"),
        ("__st_borr_ratio", "st_borr"),
        ("__lt_borr_ratio", "lt_borr"),
    ];

    /// Value of `key` at `period`; `None` when absent or not convertible to `Decimal`.
    fn value_at(series: &SeriesMap, key: &str, period: &str) -> Option<Decimal> {
        series
            .get(key)?
            .iter()
            .find(|point| point.period == period)
            .and_then(|point| Decimal::from_f64(point.value))
    }

    /// Mean of the value at `period` and at the previous year's `1231` period; when the prior
    /// value is missing the current value stands in for it.
    fn average_with_prior_year_end(series: &SeriesMap, key: &str, period: &str) -> Option<Decimal> {
        let current = value_at(series, key, period)?;
        // `get` instead of slicing: a period shorter than four bytes must not panic.
        let prior_year = period.get(..4)?.parse::<i32>().ok()? - 1;
        let prior = value_at(series, key, &format!("{}1231", prior_year)).unwrap_or(current);
        current.checked_add(prior)?.checked_div(dec!(2))
    }

    /// Cost of goods sold as `revenue * (1 - gross margin)`; a margin with magnitude above 1
    /// is treated as a percentage and divided by 100 first.
    fn cost_of_goods_sold(series: &SeriesMap, period: &str) -> Option<Decimal> {
        let revenue = value_at(series, "revenue", period)?;
        let margin_raw = value_at(series, "grossprofit_margin", period)?;
        let margin = if margin_raw.abs() > dec!(1) { margin_raw / dec!(100) } else { margin_raw };
        revenue.checked_mul(dec!(1) - margin)
    }

    /// `numerator / denominator * 100` as `f64`; `None` for a zero denominator or overflow.
    fn percent_of(numerator: Decimal, denominator: Decimal) -> Option<f64> {
        if denominator.is_zero() {
            return None;
        }
        numerator
            .checked_div(denominator)?
            .checked_mul(dec!(100))?
            .to_f64()
    }

    /// Appends one point to the `metric` series of `target`.
    fn push_point(target: &mut SeriesMap, metric: &str, period: &str, value: f64) {
        target
            .entry(metric.to_string())
            .or_default()
            .push(SeriesPoint { period: period.to_string(), value });
    }

    let periods: Vec<String> = series.values().flatten().map(|p| p.period.clone()).unique().collect();
    let mut derived: SeriesMap = HashMap::new();

    {
        // Read-only view while deriving; results are merged back only after the loop.
        let source: &SeriesMap = &*series;

        for period in &periods {
            // Expense items relative to revenue.
            let revenue = value_at(source, "revenue", period);
            for (metric, numerator_key) in REVENUE_SHARE_METRICS {
                if let (Some(num), Some(den)) = (value_at(source, numerator_key, period), revenue) {
                    if let Some(pct) = percent_of(num, den) {
                        push_point(&mut derived, metric, period, pct);
                    }
                }
            }

            // Balance-sheet items relative to total assets.
            let total_assets = value_at(source, "total_assets", period);
            for (metric, numerator_key) in ASSET_SHARE_METRICS {
                if let (Some(num), Some(den)) = (value_at(source, numerator_key, period), total_assets) {
                    if let Some(pct) = percent_of(num, den) {
                        push_point(&mut derived, metric, period, pct);
                    }
                }
            }

            // Tax rate: a magnitude of at most 1 is taken as a fraction and scaled to percent.
            if let Some(tax_to_ebt) = value_at(source, "tax_to_ebt", period) {
                let rate = if tax_to_ebt.abs() <= dec!(1) { tax_to_ebt * dec!(100) } else { tax_to_ebt };
                if let Some(rate) = rate.to_f64() {
                    push_point(&mut derived, "__tax_rate", period, rate);
                }
            }

            // Payables turnover days.
            if let Some(avg_ap) = average_with_prior_year_end(source, "accounts_pay", period) {
                if let Some(cogs) = cost_of_goods_sold(source, period) {
                    if !cogs.is_zero() {
                        let days = dec!(365)
                            .checked_mul(avg_ap)
                            .and_then(|scaled| scaled.checked_div(cogs))
                            .and_then(|quotient| quotient.to_f64());
                        if let Some(days) = days {
                            push_point(&mut derived, "payturn_days", period, days);
                        }
                    }
                }
            }
            // ... continue for all other metrics from python
        }
    }

    series.extend(derived);
}
fn flatten_series_to_dtos(symbol: &str, series: SeriesMap) -> Result<Vec<TimeSeriesFinancialDto>, ProviderError> {
let mut dtos: Vec<TimeSeriesFinancialDto> = Vec::new();
for (metric_name, data_points) in series {
for point in data_points {
let period_date = NaiveDate::parse_from_str(&point.period, "%Y%m%d")
.map_err(|e| ProviderError::Mapping(format!("Invalid period '{}': {}", point.period, e)))?;
dtos.push(TimeSeriesFinancialDto {
symbol: symbol.to_string(),
metric_name: metric_name.clone(),
period_date,
value: point.value,
source: Some("tushare".to_string()),
});
}
}
Ok(dtos)
}
// (Legacy FinancialStatement mapping helpers removed.)