Compare commits

...

4 Commits

Author SHA1 Message Date
Lv, Qi
ceffa0d95c chore(build): unify Dockerfiles and root .dockerignore; fix services after deps upgrade
- secrecy: migrate Secret<String> -> SecretString across services; adjust usages
- warnings: clean unused imports/structs (no behavior change)
- docker-compose: remove unnecessary host port mappings; rely on app-network; keep frontend only
- build: set build.context to repo root; reference per-service Dockerfiles
- add: root .dockerignore to exclude target/node_modules/ref/archive/docs and logs
- data-persistence-service: refine cargo-chef with path deps; slim context; copy correct targets
- Dockerfiles: normalize multi-stage builds; fix WORKDIR and release binary paths
- nats: resolve 4222 conflict by not binding to host
- verified: all rust services build successfully with new flow
2025-11-16 23:34:28 +08:00
Lv, Qi
5327e76aaa chore: 提交本轮 Rust 架构迁移相关改动
- docker-compose: 下线 Python backend/config-service,切换至 config-service-rs
- archive: 归档 legacy Python 目录至 archive/python/*
- services: 新增/更新 common-contracts、api-gateway、各 provider、report-generator-service、config-service-rs
- data-persistence-service: API/system 模块与模型/DTO 调整
- frontend: 更新 useApi 与 API 路由
- docs: 更新路线图并勾选光荣退役
- cleanup: 移除 data-distance-service 占位测试
2025-11-16 20:55:46 +08:00
Lv, Qi
0e45dd4a3f docs: 更新路线图—勾选光荣退役并归档旧Python服务
- 勾选阶段五第3项下线Python服务
- 描述将 backend/ 与 services/config-service/ 归档至 archive/python/
- 明确无Python组件下系统运行的验收标准
2025-11-16 20:54:01 +08:00
Lv, Qi
a6cca48fed chore(cleanup): remove redundant data-distance-service stub tests
- Covered by data-persistence-service tests (db/api).
- No references or compose entries.
2025-11-16 20:52:26 +08:00
146 changed files with 33938 additions and 379 deletions

36
.dockerignore Normal file
View File

@ -0,0 +1,36 @@
# VCS
.git
.gitignore
# Editor/IDE
.vscode
.idea
.DS_Store
# Node/Next.js
frontend/node_modules
frontend/.next
**/node_modules
# Rust build artifacts
target
**/target
# Python/build caches
__pycache__
*.pyc
# Large reference/resources not needed in images
ref/
archive/
docs/
# Logs/temp
*.log
tmp/
temp/
.cache
# Docker compose override (optional)
docker-compose.override.yml

2
.gitignore vendored
View File

@ -17,7 +17,7 @@ services/**/node_modules/
# Build artifacts # Build artifacts
dist/ dist/
build/ build/
ref/
# Binaries # Binaries
portwardenc-amd64 portwardenc-amd64

View File

@ -16,52 +16,34 @@ services:
interval: 5s interval: 5s
timeout: 5s timeout: 5s
retries: 10 retries: 10
ports: networks:
- "15432:5432" - app-network
nats:
image: nats:2.9
volumes:
- nats_data:/data
networks:
- app-network
data-persistence-service: data-persistence-service:
build: build:
context: ./services/data-persistence-service context: .
dockerfile: Dockerfile dockerfile: services/data-persistence-service/Dockerfile
container_name: data-persistence-service container_name: data-persistence-service
environment: environment:
HOST: 0.0.0.0 HOST: 0.0.0.0
PORT: 3000 PORT: 3000
# Rust service connects to the internal DB service name # Rust service connects to the internal DB service name
DATABASE_URL: postgresql://postgres:postgres@postgres-db:5432/fundamental DATABASE_URL: postgresql://postgres:postgres@postgres-db:5432/fundamental
ports:
- "13000:3000"
depends_on: depends_on:
postgres-db: postgres-db:
condition: service_healthy condition: service_healthy
# If you prefer live-reload or local code mount, consider switching to a dev Dockerfile. # If you prefer live-reload or local code mount, consider switching to a dev Dockerfile.
# volumes: # volumes:
# - ./:/workspace # - ./:/workspace
networks:
- app-network
backend:
build:
context: .
dockerfile: backend/Dockerfile
container_name: fundamental-backend
working_dir: /workspace/backend
command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
environment:
PYTHONDONTWRITEBYTECODE: "1"
PYTHONUNBUFFERED: "1"
# Config service base URL
CONFIG_SERVICE_BASE_URL: http://config-service:7000/api/v1
# Data persistence service base URL
DATA_PERSISTENCE_BASE_URL: http://data-persistence-service:3000/api/v1
volumes:
# 挂载整个项目,确保后端代码中对项目根目录的相对路径(如 config/)仍然有效
- ./:/workspace
ports:
- "18000:8000"
depends_on:
config-service:
condition: service_started
data-persistence-service:
condition: service_started
frontend: frontend:
build: build:
@ -71,8 +53,8 @@ services:
working_dir: /workspace/frontend working_dir: /workspace/frontend
command: npm run dev command: npm run dev
environment: environment:
# 让 Next 的 API 路由代理到后端容器 # 让 Next 的 API 路由代理到新的 api-gateway
NEXT_PUBLIC_BACKEND_URL: http://backend:8000/api NEXT_PUBLIC_BACKEND_URL: http://api-gateway:4000/v1
# Prisma 直连数据库(与后端共用同一库) # Prisma 直连数据库(与后端共用同一库)
DATABASE_URL: postgresql://postgres:postgres@postgres-db:5432/fundamental?schema=public DATABASE_URL: postgresql://postgres:postgres@postgres-db:5432/fundamental?schema=public
NODE_ENV: development NODE_ENV: development
@ -84,26 +66,141 @@ services:
ports: ports:
- "13001:3001" - "13001:3001"
depends_on: depends_on:
- backend
- postgres-db - postgres-db
- config-service - api-gateway
networks:
- app-network
config-service:
api-gateway:
build: build:
context: . context: .
dockerfile: services/config-service/Dockerfile dockerfile: services/api-gateway/Dockerfile
container_name: fundamental-config-service container_name: api-gateway
working_dir: /workspace/services/config-service
command: uvicorn app.main:app --host 0.0.0.0 --port 7000
environment: environment:
PROJECT_ROOT: /workspace SERVER_PORT: 4000
NATS_ADDR: nats://nats:4222
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
# Note: provider_services needs to contain all provider's internal addresses
PROVIDER_SERVICES: '["http://alphavantage-provider-service:8000", "http://tushare-provider-service:8001", "http://finnhub-provider-service:8002", "http://yfinance-provider-service:8003"]'
depends_on:
- nats
- data-persistence-service
- alphavantage-provider-service
- tushare-provider-service
- finnhub-provider-service
- yfinance-provider-service
networks:
- app-network
alphavantage-provider-service:
build:
context: .
dockerfile: services/alphavantage-provider-service/Dockerfile
container_name: alphavantage-provider-service
environment:
SERVER_PORT: 8000
NATS_ADDR: nats://nats:4222
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
depends_on:
- nats
- data-persistence-service
networks:
- app-network
tushare-provider-service:
build:
context: .
dockerfile: services/tushare-provider-service/Dockerfile
container_name: tushare-provider-service
environment:
SERVER_PORT: 8001
NATS_ADDR: nats://nats:4222
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
TUSHARE_API_URL: http://api.waditu.com
# Please provide your Tushare token here
TUSHARE_API_TOKEN: "YOUR_TUSHARE_API_TOKEN"
depends_on:
- nats
- data-persistence-service
networks:
- app-network
finnhub-provider-service:
build:
context: .
dockerfile: services/finnhub-provider-service/Dockerfile
container_name: finnhub-provider-service
environment:
SERVER_PORT: 8002
NATS_ADDR: nats://nats:4222
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
FINNHUB_API_URL: https://finnhub.io/api/v1
# Please provide your Finnhub token in .env file
FINNHUB_API_KEY: ${FINNHUB_API_KEY}
depends_on:
- nats
- data-persistence-service
networks:
- app-network
yfinance-provider-service:
build:
context: .
dockerfile: services/yfinance-provider-service/Dockerfile
container_name: yfinance-provider-service
environment:
SERVER_PORT: 8003
NATS_ADDR: nats://nats:4222
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
depends_on:
- nats
- data-persistence-service
networks:
- app-network
report-generator-service:
build:
context: .
dockerfile: services/report-generator-service/Dockerfile
container_name: report-generator-service
environment:
SERVER_PORT: 8004
NATS_ADDR: nats://nats:4222
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
# Please provide your LLM provider details in .env file
LLM_API_URL: ${LLM_API_URL}
LLM_API_KEY: ${LLM_API_KEY}
LLM_MODEL: ${LLM_MODEL:-"default-model"}
depends_on:
- nats
- data-persistence-service
networks:
- app-network
config-service-rs:
build:
context: .
dockerfile: services/config-service-rs/Dockerfile
container_name: config-service-rs
environment:
SERVER_PORT: 5001
# PROJECT_ROOT is set to /workspace in the Dockerfile
networks:
- app-network
volumes: volumes:
- ./:/workspace - ./config:/workspace/config:ro
ports:
- "17000:7000" # =================================================================
# Python Services (Legacy - to be replaced)
# =================================================================
volumes: volumes:
pgdata: pgdata:
frontend_node_modules: frontend_node_modules:
nats_data:
networks:
app-network:

View File

@ -0,0 +1,155 @@
# 架构规范系统模块SystemModule设计准则
## 1. 引言
### 1.1. 文档目的
本文档旨在为我们的事件驱动微服务架构定义一套**“主规则” (Master Rules)**。它通过提出一个概念性的 **`SystemModule` Trait**,形式化地规定了任何一个希望融入本系统的独立微服务所必须遵循的**行为契约和接口规范**。
此规范的目标是确保系统中的每一个模块(服务)都是:
- **可观测的 (Observable)**: 外部系统如监控面板、API Gateway可以清晰地了解其健康状况和当前任务。
- **配置驱动的 (Configuration-Driven)**: 模块的行为和连接信息可以通过外部配置进行管理。
- **契约绑定的 (Contract-Bound)**: 模块与系统其他部分的交互(消息、事件)遵循共享的、强类型的契约。
- **生命周期可控的 (Lifecycle-Managed)**: 模块的启动、运行和关闭遵循标准模式。
## 2. 架构核心组件回顾
一个标准的`SystemModule`存活在以下核心组件构成的环境中:
- **Message Bus**: 异步通信的唯一通道。
- **Data Persistence Service**: 持久化数据的唯一入口。
- **Shared Contracts Crate (`common-contracts`)**: 所有数据模型、消息定义的唯一事实源。
- **Configuration Source**: 环境变量或配置服务,为模块提供启动参数。
## 3. `SystemModule` Trait模块的行为契约
我们可以将一个“合格”的微服务需要满足的条件抽象地描述为以下Rust Trait。**注意这并非一个需要真实实现的Rust Trait**,而是一个**设计隐喻**,用于清晰地定义每个独立服务(二进制程序)必须对外暴露的行为。
```rust
/// 设计隐喻:一个合格的系统微服务必须提供的能力
pub trait SystemModule {
// --- 1. 自我描述与配置 ---
/// 返回模块的唯一、人类可读的名称,如 "finnhub-provider-service"
fn module_id(&self) -> &'static str;
/// 声明本模块需要从外部获取的所有配置项
/// 用于自动生成文档、部署脚本和配置校验
fn required_configuration(&self) -> Vec<ConfigSpec>;
// --- 2. 消息契约 ---
/// 声明本模块会订阅(监听)哪些命令 (Commands)
/// 用于生成系统交互图和验证消息路由
fn subscribed_commands(&self) -> Vec<MessageType>;
/// 声明本模块会发布(产生)哪些事件 (Events)
fn published_events(&self) -> Vec<MessageType>;
// --- 3. 可观测性接口 (核心) ---
/// [必须实现为HTTP GET /health]
/// 提供模块当前的健康状态
async fn get_health_status(&self) -> HealthStatus;
/// [必须实现为HTTP GET /tasks]
/// 报告模块当前正在处理的所有任务及其进度
/// 这是实现分布式进度追踪的关键
async fn get_current_tasks(&self) -> Vec<TaskProgress>;
// --- 4. 生命周期 ---
/// 模块的主运行循环
/// 包含连接到Message Bus、处理消息、响应健康检查等逻辑
async fn run(&mut self) -> Result<(), ModuleError>;
}
```
## 4. `SystemModule` Trait 的具象化实现
上述Trait中的每一项都必须在微服务中以具体的技术形式落地。
**每个微服务都必须:**
1. **提供一个 `Dockerfile`** 用于容器化部署。
2. 在启动时,根据`required_configuration()`的定义,**从环境变量或配置服务中读取配置**。缺少必要配置必须启动失败。
3. 在启动后,**连接到 Message Bus**,并严格按照`subscribed_commands()`和`published_events()`的声明进行订阅和发布。
4. **实现一个内置的HTTP服务器** (e.g., using Axum),并暴露**两个强制性的API端点**
- `GET /health`: 返回`HealthStatus`的JSON表示。用于服务发现、负载均衡和健康检查。
- `GET /tasks`: 返回`Vec<TaskProgress>`的JSON表示。用于外部系统查询当前模块正在执行什么任务。
### 4.1. 可观测性接口的数据结构
这些结构体将被定义在`common-contracts`中,供所有模块和监控系统使用。
```rust
// common-contracts/src/observability.rs
#[derive(Serialize, Deserialize, Debug)]
pub enum ServiceStatus {
Ok, // 一切正常
Degraded, // 功能部分受损,但仍在运行
Unhealthy, // 服务异常
}
#[derive(Serialize, Deserialize, Debug)]
pub struct HealthStatus {
pub module_id: String,
pub status: ServiceStatus,
pub version: String,
pub details: HashMap<String, String>, // e.g., "message_bus_connection": "ok"
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskProgress {
pub request_id: Uuid, // 关联到最初的用户请求
pub task_name: String, // e.g., "fetch_financials_for_aapl"
pub status: String, // e.g., "in_progress", "retrying", "blocked"
pub progress_percent: u8, // 0-100
pub details: String, // 人类可读的详细状态
pub started_at: DateTime<Utc>,
}
```
## 5. 示范设计 (Example Designs)
### 示例一: `finnhub-provider-service`
这是一个相对简单的、无状态的数据拉取服务。
- **`module_id`**: `"finnhub-provider-service"`
- **`required_configuration`**: `FINNHUB_API_KEY`, `MESSAGE_BUS_URL`, `DATA_PERSISTENCE_URL`
- **`subscribed_commands`**: `FetchCompanyDataCommand`
- **`published_events`**: `CompanyProfilePersistedEvent`, `FinancialsPersistedEvent`
- **`GET /health` 返回**: `{ "module_id": "...", "status": "Ok", ... }`
- **`GET /tasks` 返回**:
- 空闲时: `[]`
- 正在为AAPL拉取数据时: `[{ "request_id": "...", "task_name": "fetch_data_for_aapl", "status": "in_progress", "progress_percent": 50, "details": "Fetching financial statements...", ... }]`
### 示例二: `report-generator-service`
这是一个更复杂的、有状态的业务逻辑服务。
- **`module_id`**: `"report-generator-service"`
- **`required_configuration`**: `GEMINI_API_KEY`, `MESSAGE_BUS_URL`, `DATA_PERSISTENCE_URL`
- **`subscribed_commands`**: `GenerateReportCommand` (一个新的命令由API Gateway发起)
- **`published_events`**: `ReportGenerationStarted`, `ReportSectionCompleted`, `ReportCompletedEvent`, `ReportFailedEvent`
- **`GET /health` 返回**: `{ "module_id": "...", "status": "Ok", ... }`
- **`GET /tasks` 返回**:
- 空闲时: `[]`
- 正在为600519.SH生成报告时:
```json
[
{
"request_id": "abc-123",
"task_name": "generate_report_for_600519.SH",
"status": "in_progress",
"progress_percent": 66,
"details": "Generating bull_case analysis, waiting for AI model response.",
"started_at": "..."
}
]
```
## 6. 结论
`SystemModule`规范为我们的微服务生态系统提供了骨架和纪律。通过强制要求所有模块实现标准的可观测性接口和消息契约,我们可以构建一个真正健壮、透明且易于管理的分布式系统。所有新服务的开发都将以此规范为起点。

View File

@ -0,0 +1,542 @@
# 设计文档: 面向Rust的事件驱动数据微服务架构
## 1. 引言
### 1.1. 文档目的
本文档旨在为“基本面选股系统”设计一个**完全基于Rust的、事件驱动的、去中心化的微服务架构**。此设计将作为彻底替换现有Python组件、并构建下一代数据处理生态系统的核心技术蓝图。
新的架构目标是:
1. **服务独立化**将每个外部数据源Tushare, Finnhub等封装成独立的、可独立部署和运行的微服务。
2. **事件驱动**引入消息总线Message Bus作为服务间通信的主干实现服务的高度解耦和异步协作。
3. **数据中心化**:所有微服务将标准化的数据写入一个由`data-persistence-service`独占管理的中央数据库,实现“数据写入即共享”。
4. **纯Rust生态**从前端网关到最末端的数据提供商整个后端生态系统将100%使用Rust构建确保端到端的类型安全、高性能和健壮性。
### 1.2. 核心架构理念
- **独立单元 (Independent Units)**: 每个服务都是一个完整的、自包含的应用程序,拥有自己的配置、逻辑和生命周期。
- **异步协作 (Asynchronous Collaboration)**: 服务之间通过发布/订阅消息进行通信而非紧耦合的直接API调用。
- **单一事实源 (Single Source of Truth)**: 数据库是所有结构化数据的唯一事实源。服务通过向数据库写入数据来“广播”其工作成果。
## 2. 目标架构 (Target Architecture)
### 2.1. 架构图
```
+-------------+ +------------------+ +---------------------------+
| | HTTP | | | |
| Frontend |----->| API Gateway |----->| Message Bus |
| (Next.js) | | (Rust) | | (e.g., RabbitMQ, NATS) |
| | | | | |
+-------------+ +-------+----------+ +-------------+-------------+
| |
(Read operations) | | (Pub/Sub Commands & Events)
| |
+-----------------v------------------+ +------v------+ +----------------+ +----------------+
| | | Tushare | | Finnhub | | iFind |
| Data Persistence Service (Rust) |<---->| Provider | | Provider | | Provider |
| | | Service | | Service | | Service |
+-----------------+------------------+ | (Rust) | | (Rust) | | (Rust) |
| +-------------+ +----------------+ +----------------+
v
+-----------------------------------------------------+
| |
| PostgreSQL Database |
| |
+-----------------------------------------------------+
```
### 2.2. 服务职责划分
- **API Gateway (Rust)**:
- 面向前端的唯一入口 (BFF - Backend for Frontend)。
- 负责处理用户请求、认证鉴权。
- 将前端的查询请求转化为对`Data Persistence Service`的数据读取调用。
- 将前端的操作请求如“生成新报告”转化为命令Command并发布到**Message Bus**。
- **`*_provider-service` (Rust)**:
- **一组**独立的微服务每个服务对应一个外部数据API如`tushare-provider-service`)。
- 订阅Message Bus上的相关命令如`FetchFinancialsRequest`)。
- 独立调用外部API对返回数据进行清洗、标准化。
- 调用`Data Persistence Service`的接口,将标准化后的数据写入数据库。
- 操作完成后可以向Message Bus发布事件Event如`FinancialsDataReady`。
- **Data Persistence Service (Rust)**:
- **(职责不变)** 数据库的唯一守门人。
- 为所有其他内部微服务提供稳定、统一的数据库读写gRPC/HTTP接口。
- **Message Bus (e.g., RabbitMQ, NATS)**:
- 整个系统的神经中枢,负责所有服务间的异步通信。
- 传递命令(“做什么”)和事件(“发生了什么”)。
## 3. 核心抽象与数据契约
### 3.1. `DataProvider` Trait (内部实现蓝图)
此Trait依然是构建**每个独立Provider微服务内部逻辑**的核心蓝图。它定义了一个Provider应该具备的核心能力。
```rust
// This trait defines the internal logic blueprint for each provider microservice.
#[async_trait]
pub trait DataProvider: Send + Sync {
// ... (trait definition remains the same as previous version) ...
fn get_id(&self) -> &'static str;
async fn get_company_profile(&self, symbol: &str) -> Result<CompanyProfile, DataProviderError>;
async fn get_historical_financials(&self, symbol: &str, years: &[u16]) -> Result<Vec<FinancialStatement>, DataProviderError>;
// ... etc ...
}
```
### 3.2. 标准化数据模型 (共享的数据契约)
这些模型是服务间共享的“通用语言”,也是存入数据库的最终形态,其重要性在新架构下更高。
```rust
// These structs are the shared "Data Contracts" across all services.
// Their definitions remain the same as the previous version.
#[derive(Debug, Clone, Serialize, Deserialize)] // Add Serialize/Deserialize for messaging
pub struct CompanyProfile { ... }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FinancialStatement { ... }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MarketDataPoint { ... }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealtimeQuote { ... }
```
### 3.3. 消息/事件定义 (Message/Event Contracts)
这是新架构的核心定义了在Message Bus上传递的消息格式。
```rust
use uuid::Uuid;
use serde::{Serialize, Deserialize};
// --- Commands (Instructions to do something) ---
#[derive(Serialize, Deserialize)]
pub struct FetchCompanyDataCommand {
pub request_id: Uuid,
pub symbol: String,
pub market: String, // To help providers route to the correct API endpoint
}
// --- Events (Notifications that something has happened) ---
#[derive(Serialize, Deserialize)]
pub struct CompanyProfilePersistedEvent {
pub request_id: Uuid,
pub symbol: String,
// We don't need to carry the full data, as it's now in the database.
// Interested services can query it.
}
#[derive(Serialize, Deserialize)]
pub struct FinancialsPersistedEvent {
pub request_id: Uuid,
pub symbol: String,
pub years_updated: Vec<u16>,
}
```
## 4. 数据工作流示例 (Example Data Workflow)
1. **请求发起**: 用户在前端请求`AAPL`的分析报告。请求到达`API Gateway`。
2. **命令发布**: `API Gateway`生成一个唯一的`request_id`然后向Message Bus发布一个`FetchCompanyDataCommand`命令。
3. **命令消费**: `tushare-provider`、`finnhub-provider`等所有订阅了此命令的服务都会收到消息。
4. **独立执行**:
- `finnhub-provider`根据`market`和`symbol`调用Finnhub API获取公司简介、财务、行情数据。
- 数据获取成功后,它将数据转换为标准化的`CompanyProfile`, `Vec<FinancialStatement>`等模型。
- 它调用`Data Persistence Service`的接口,将这些标准化的数据写入数据库。
- 写入成功后它向Message Bus发布`CompanyProfilePersistedEvent`和`FinancialsPersistedEvent`等事件。
- `tushare-provider`收到命令后,可能因为市场不匹配而直接忽略该消息。
5. **下游响应**: 一个潜在的`report-generator-service`(图中未画出,属于业务层)可以订阅`...PersistedEvent`。当它收到了生成一份完整报告所需的所有数据事件后便开始从数据库中拉取这些数据进行AI分析并将最终报告存回数据库。
6. **前端轮询/通知**: `API Gateway`可以通过WebSocket或长轮询等方式将最终报告的完成状态通知给前端。
## 5. 实施路线图 (Roadmap) - 更新于 2025-11-15
**基于对项目现状的调研,本路线图已更新,明确标识了已完成的工作和接下来的行动计划。**
---
### **✔ 阶段-1容器化与初步服务拆分 (已完成)**
- [x] **核心服务已容器化**: `data-persistence-service` 已完全开发、容器化并通过Docker Compose与数据库和现有Python后端集成。
- [x] **数据库已初始化**: Rust服务的 `migrations` 目录证实了数据库表结构已通过 `sqlx-cli` 创建和管理。
- [x] **Python后端部分重构**: Python `backend` 服务已经作为客户端通过HTTP API调用`data-persistence-service`来读写数据。
- [x] **配置服务已拆分**: `config-service` 作为一个独立的Python微服务也已存在并运行。
- [x] **开发环境已建立**: 整个系统可以通过`Docker Compose`和`Tilt`一键启动。
---
### **阶段〇:奠定新架构的基石 (Laying the New Foundation)**
- [x] **1. 部署消息总线**: 在`docker-compose.yml`中添加一个消息总线服务 (`NATS`)。这是实现事件驱动架构的**先决条件**。
- [x] **2. 创建共享契约库 (`common-contracts`)**: 在`services/`下创建一个新的Rust `common-contracts` crate。
- 将`data-persistence-service/src/dtos.rs` 和 `models.rs`中的核心数据结构(如`CompanyProfile`, `FinancialStatement`等)迁移至此。
- 添加`architecture_module_specification.md`中定义的消息契约 (`FetchCompanyDataCommand`等) 和可观测性结构 (`HealthStatus`, `TaskProgress`)。
- [x] **3. 升级 `data-persistence-service`**:
- 使其依赖新的`common-contracts` crate替换掉本地的数据模型定义。
- 为其实现`SystemModule`规范,即添加`/health`和`/tasks`端点。
---
### **阶段一:开发 `alphavantage-provider-service` (精确实现蓝图)**
**目标**: 创建并实现 `alphavantage-provider-service`,使其成为我们新架构下的第一个功能完备、可独立运行的数据提供商微服务。
- [x] **1. 项目初始化与依赖配置**
- [x] **任务**: 基于我们的微服务模板创建新的Rust项目 `services/alphavantage-provider-service`
- [x] **任务**: 在其`Cargo.toml`中添加核心依赖。
```toml
# Cargo.toml
[dependencies]
# ... other dependencies like axum, tokio, etc.
common-contracts = { path = "../common-contracts" }
# Generic MCP Client
rmcp = "0.8.5"
# Message Queue (NATS)
async-nats = "0.33"
```
- [x] **验收标准**: 项目可以成功编译 (`cargo check`)。
- [x] **2. 实现 `SystemModule` 规范**
- [x] **任务**: 在`main.rs`中启动一个Axum HTTP服务器。
- [x] **任务**: 实现强制的`/health`端点,返回当前服务的健康状态。
- [x] **任务**: 实现强制的`/tasks`端点。此端点需要从一个线程安全的内存存储(例如 `Arc<DashMap<Uuid, TaskProgress>>`)中读取并返回所有正在进行的任务。
- [x] **验收标准**: 启动服务后,可以通过`curl`或浏览器访问`http://localhost:port/health`和`http://localhost:port/tasks`并得到正确的JSON响应。
- [x] **3. 实现核心业务逻辑:事件驱动的数据处理**
- [x] **任务**: 实现连接到Message Bus并订阅`FetchCompanyDataCommand`命令的逻辑。
- [x] **任务**: 当收到`FetchCompanyDataCommand`命令时,执行以下异步工作流:
1. 在任务存储中创建并插入一个新的`TaskProgress`记录。
2. 从配置中读取`ALPHAVANTAGE_API_KEY`并构建MCP端点URL。
3. 初始化通用的`rmcp`客户端: `let client = rmcp::mcp::Client::new(mcp_endpoint_url);`
4. 使用`tokio::try_join!`**并行**执行多个数据获取任务。**注意:函数名是字符串,返回的是`serde_json::Value`。**
```rust
// 伪代码示例
let symbol = &command.symbol;
let overview_task = client.query("OVERVIEW", &[("symbol", symbol)]);
let income_task = client.query("INCOME_STATEMENT", &[("symbol", symbol)]);
// ... 其他任务
match tokio::try_join!(overview_task, income_task, /*...*/) {
Ok((overview_json, income_json, /*...*/)) => {
// overview_json and income_json are of type serde_json::Value
// ... 进入步骤 4
},
Err(e) => { /* ... */ }
}
```
5. 在`try_join!`前后,精确地更新内存中`TaskProgress`的状态。
- [x] **验收标准**: 在Message Bus中发布命令后服务的日志能正确打印出从Alpha Vantage获取到的原始JSON数据。
- [x] **4. 实现数据转换与持久化 (强类型映射)**
- [x] **任务**: **(关键变更)** 实现 `TryFrom<serde_json::Value>` Trait完成从动态JSON到我们`common-contracts`模型的**带错误处理的**转换。
```rust
// alphavantage-provider-service/src/mapping.rs
use serde_json::Value;
use common_contracts::models as our;
impl TryFrom<Value> for our::CompanyProfile {
type Error = anyhow::Error; // Or a more specific parsing error
fn try_from(v: Value) -> Result<Self, Self::Error> {
Ok(our::CompanyProfile {
symbol: v["Symbol"].as_str().ok_or_else(|| anyhow!("Missing Symbol"))?.to_string(),
name: v["Name"].as_str().ok_or_else(|| anyhow!("Missing Name"))?.to_string(),
// ... 其他字段的安全解析和转换
})
}
}
```
- [x] **任务**: 创建一个类型化的HTTP客户端 (Data Persistence Client),用于与`data-persistence-service`通信。
- [x] **任务**: 在所有数据转换成功后,调用上述客户端进行持久化。
- [x] **验收标准**: 数据库中查询到的数据,结构完全符合`common-contracts`定义。
- [x] **5. 实现事件发布与任务完成**
- [x] **任务**: 在数据成功持久化到数据库后向Message Bus发布相应的数据就绪事件如`CompanyProfilePersistedEvent`和`FinancialsPersistedEvent`。
- [x] **任务**: 在所有流程执行完毕(无论成功或失败)后,从内存存储中移除对应的`TaskProgress`对象或将其标记为“已完成”并设置TTL
- [x] **验收标准**: 能够在Message Bus中监听到本服务发布的事件。`/tasks`接口不再显示已完成的任务。
---
### **阶段二重构API网关与请求流程 (精确实现蓝图)**
**目标**: 创建一个纯Rust的`api-gateway`服务,它将作为前端的唯一入口(BFF),负责发起数据获取任务、查询持久化数据以及追踪分布式任务进度。
- [x] **1. 项目初始化与 `SystemModule` 规范实现**
- [x] **任务**: 基于我们的微服务模板创建新的Rust项目 `services/api-gateway`
- [x] **任务**: 在其`Cargo.toml`中添加核心依赖: `axum`, `tokio`, `common-contracts`, `async-nats`, `reqwest`, `tracing`, `config`
- [x] **任务**: 实现强制的`/health`端点。
- [x] **任务**: 实现强制的`/tasks`端点。由于网关本身是无状态的、不执行长任务,此端点当前可以简单地返回一个空数组`[]`。
- [x] **验收标准**: `api-gateway`服务可以独立编译和运行,并且`/health`接口按预期工作。
- [x] **2. 实现数据触发流程 (发布命令)**
- [x] **任务**: 在`api-gateway`中创建一个新的HTTP端点 `POST /v1/data-requests`它应接收一个JSON体例如: `{"symbol": "AAPL", "market": "US"}`
- [x] **任务**: 为此端点实现处理逻辑:
1. 生成一个全局唯一的 `request_id` (UUID)。
2. 创建一个`common_contracts::messages::FetchCompanyDataCommand`消息,填入请求参数和`request_id`。
3. 连接到Message Bus并将此命令发布到`data_fetch_commands`队列。
4. 向前端立即返回 `202 Accepted` 状态码,响应体中包含 `{ "request_id": "..." }`,以便前端后续追踪。
- [x] **验收标准**: 通过工具如Postman调用此端点后能够在NATS的管理界面看到相应的消息被发布同时`alphavantage-provider-service`的日志显示它已接收并开始处理该命令。
- [x] **3. 实现数据查询流程 (读取持久化数据)**
- [x] **任务**: 在`api-gateway`中创建一个类型化的HTTP客户端 (Persistence Client),用于与`data-persistence-service`通信。
- [x] **任务**: 实现 `GET /v1/companies/{symbol}/profile` 端点。该端点接收股票代码通过Persistence Client调用`data-persistence-service`的相应接口,并将查询到的`CompanyProfile`数据返回给前端。
- [x] **任务**: (可选) 根据需要,实现查询财务报表、行情数据等其他数据类型的端点。
- [x] **验收标准**: 在`alphavantage-provider-service`成功写入数据后,通过浏览器或`curl`调用这些新端点可以查询到预期的JSON数据。
- [x] **4. 实现分布式任务进度追踪**
- [x] **任务**: 在`api-gateway`的配置中,增加一个`provider_services`字段,用于列出所有数据提供商服务的地址,例如: `["http://alphavantage-provider-service:8000"]`
- [x] **任务**: 实现 `GET /v1/tasks/{request_id}` 端点。
- [x] **任务**: 该端点的处理逻辑需要:
1. 读取配置中的`provider_services`列表。
2. 使用`tokio::join!`或`futures::future::join_all`**并行地**向所有provider服务的`/tasks`端点发起HTTP GET请求。
3. 聚合所有服务的返回结果(一个`Vec<Vec<TaskProgress>>`并从中线性搜索与URL路径中`request_id`匹配的`TaskProgress`对象。
4. 如果找到匹配的任务将其作为JSON返回。如果遍历完所有结果都未找到则返回`404 Not Found`。
- [x] **验收标准**: 当`alphavantage-provider-service`正在处理一个任务时,通过`api-gateway`的这个新端点(并传入正确的`request_id`),能够实时查询到该任务的进度详情。
---
### **阶段三:逐步迁移与替换 (精确实现蓝图)**
**目标**: 将前端应用无缝对接到新的Rust `api-gateway`并随着数据提供商的逐步完善最终彻底移除旧的Python `backend`服务,完成整个系统的架构升级。
- [x] **1. 将 `api-gateway` 集成到开发环境**
- [x] **任务**: 在根目录的 `docker-compose.yml` 文件中,为我们新创建的 `api-gateway` 服务添加入口定义。
```yaml
# docker-compose.yml
services:
# ... other services
api-gateway:
build:
context: ./services/api-gateway
dockerfile: Dockerfile
container_name: api-gateway
environment:
# 注入所有必要的配置
SERVER_PORT: 4000
NATS_ADDR: nats://nats:4222
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
# 注意: provider_services需要包含所有provider的内部地址
PROVIDER_SERVICES: '["http://alphavantage-provider-service:8000"]'
ports:
- "14000:4000"
depends_on:
- nats
- data-persistence-service
- alphavantage-provider-service
```
- [x] **任务**: (如果尚未完成) 在`docker-compose.yml`中添加`nats`服务。
- [x] **验收标准**: 运行 `docker-compose up` (或 `tilt up`) 后,`api-gateway` 服务能够成功启动并连接到消息总线。
- [x] **2. 迁移前端应用的API调用逻辑**
- [x] **任务**: 修改前端项目的环境变量将API请求的目标从旧`backend`指向新`api-gateway`。
```
# frontend/.env.local (or in docker-compose.yml)
NEXT_PUBLIC_BACKEND_URL=http://api-gateway:4000/v1
```
- [x] **任务**: 重构前端的数据获取Hooks例如 `useApi.ts`)。
- **旧逻辑**: 发起一个长轮询GET请求等待完整数据返回。
- **新逻辑**:
1. **触发**: 发起一个 `POST` 请求到 `/data-requests`,并从响应中获取 `request_id`
2. **轮询**: 使用 `useSWR``react-query` 的轮询功能每隔2-3秒调用一次 `GET /tasks/{request_id}` 端点来获取任务进度。
3. **展示**: 根据任务进度更新UI例如显示加载条和状态信息
4. **完成**: 当任务状态变为 "completed" (或类似状态),或 `GET /tasks/{request_id}` 返回 `404` 时,停止轮询,并调用 `GET /companies/{symbol}/profile` 等数据查询端点来获取最终数据并渲染。
- [x] **验收标准**: 在前端页面输入股票代码并点击“生成报告”后能够触发新的异步工作流并在UI上看到实时进度的更新最终成功展示由 `alphavantage-provider-service` 获取的数据。
---
### **阶段四:数据提供商生态系统扩展 (Data Provider Ecosystem Expansion)**
**目标**: 将现有Python `backend`中的核心`data_providers`逐一重写为独立的Rust微服务丰富我们的数据维度。
- [ ] **0. (前置任务) 完成所有Provider的适配性分析**
- [ ] **任务**: 在开始大规模编码前,完成 **附录A** 中所有待迁移数据提供商的适配性分析,确保`common-contracts`模型的完备性并明确每个Provider的实现关键点。
- [x] **1. 实现 `tushare-provider-service` (中国市场核心)**
- [x] **任务**: 基于 `alphavantage-provider-service` 模板,创建并实现服务的基本框架。
- [x] **任务**: 完成 Tushare 8个核心API的并行数据获取、聚合与报告期筛选逻辑。
- [x] **任务**: 在 `mapping.rs`精确复刻Python版本 `_calculate_derived_metrics` 方法中近20个派生财务指标的计算逻辑。
- [x] **任务**: 在 `docker-compose.yml`中添加此服务,并将其地址加入到`api-gateway`的`PROVIDER_SERVICES`环境变量中。
- [x] **验收标准**: 收到`market: "CN"`的`FetchCompanyDataCommand`命令时该服务能被触发并成功将与Python版本逻辑一致的A股数据**包含所有派生指标**)写入数据库。
- [x] **2. (可选) 迁移其他数据提供商**
- [x] **任务**: 基于各自的适配性分析,创建并实现`finnhub-provider-service`。
- [x] **任务**: 基于各自的适配性分析,创建并实现`yfinance-provider-service`。
- [ ] **任务**: 基于各自的适配性分析,创建并实现`ifind-provider-service`。
---
### **阶段五:业务逻辑迁移与最终替换 (Business Logic Migration & Final Replacement)**
**目标**: 将Python `backend`中剩余的AI分析和配置管理逻辑迁移到Rust生态并最终彻底下线Python服务。
- [x] **1. 创建 `report-generator-service`**
- [x] **任务**: 创建一个新的Rust微服务`report-generator-service`。
- [x] **任务**: 实现对Message Bus事件如`FinancialsPersistedEvent`)的订阅。
- [x] **任务**: 将原Python `backend`中的`analysis_client.py`和`company_profile_client.py`的逻辑迁移至此服务。
- [x] **验收标准**: 当所有数据提供商完成数据写入后此服务能被自动触发并成功生成AI分析报告。
- [x] **2. (可选) 创建 `config-service-rs`**
- [x] **任务**: 用Rust重写现有的Python `config-service`
- [x] **验收标准**: 所有Rust微服务都能从新的配置服务中获取配置并正常启动。
- [x] **3. 光荣退役下线所有Python服务**
- [x] **前提条件**: 所有数据获取和AI分析功能均已由新的Rust微服务完全承载。
- [x] **任务**: 在 `docker-compose.yml` 中,删除 `backend``config-service` 的服务定义。
- [x] **任务**: 将`backend/`和`services/config-service/`目录移动至`archive/python/`进行归档保留。
- [x] **验收标准**: 整个系统在没有任何Python组件的情况下依然能够完整、正常地运行所有核心功能。架构升级正式完成。
---
## 附录A: 数据提供商适配性分析
本附录用于详细记录每个待迁移的数据提供商API与我们`common-contracts`标准模型之间的适配性。
### A.1 Tushare 适配性分析
**核心结论**: 适配**完全可行**,但**计算逻辑复杂**。`common-contracts`无需调整。迁移工作的核心是精确复刻Python版本中近400行的财务数据聚合与派生指标计算逻辑。
**1. 数据模型适配概览**
| `common-contracts` 模型 | 适配可行性 | 关键实现要点 |
| :--- | :--- | :--- |
| **`CompanyProfile`** | ✅ **高** | 使用 `stock_basic``stock_company` 接口。 |
| **`DailyMarketData`** | ✅ **高** | 关联 `daily``daily_basic` 接口。 |
| **`RealtimeQuote`** | ⚠️ **中** | Tushare无直接对应接口可使用最新日线数据作为“准实时”替代。 |
| **`FinancialStatement`** | ✅ **高,但复杂** | **(核心难点)** 需聚合 `balancesheet`, `income`, `cashflow`, `fina_indicator` 等8个API并复刻近20个派生指标的计算。 |
**2. 关键迁移逻辑**
- **多表聚合**: Rust版本需实现并行调用多个Tushare API并以`end_date`为主键将结果聚合。
- **报告期筛选**: 需复刻“今年的最新报告 + 往年所有年报”的筛选逻辑。
- **派生指标计算**: 必须用Rust精确实现`_calculate_derived_metrics`方法中的所有计算公式。
---
### A.2 Finnhub 适配性分析
**核心结论**: 适配**可行**。Finnhub作为美股和全球市场的主要数据源数据较为规范但同样涉及**多API聚合**和**少量派生计算**。`common-contracts`无需调整。
**1. 数据模型适配概览**
| `common-contracts` 模型 | 适配可行性 | 关键实现要点 |
| :--- | :--- | :--- |
| **`CompanyProfile`** | ✅ **高** | 使用 `/stock/profile2` 接口。 |
| **`DailyMarketData`** | ✅ **高** | 使用 `/stock/candle` 接口获取OHLCV使用 `/stock/metric` 获取PE/PB等指标。 |
| **`RealtimeQuote`** | ✅ **高** | 使用 `/quote` 接口。 |
| **`FinancialStatement`** | ✅ **高,但需聚合** | 需聚合 `/stock/financials-reported` (按`ic`, `bs`, `cf`查询)返回的三张报表,并进行少量派生计算。 |
**2. 关键迁移逻辑**
- **多API聚合**: `FinancialStatement`的构建需要组合`/stock/financials-reported`接口的三次调用结果。`DailyMarketData`的构建也需要组合`/stock/candle`和`/stock/metric`。
- **派生指标计算**: Python代码 (`finnhub.py`) 中包含了自由现金流 (`__free_cash_flow`) 和其他一些比率的计算这些需要在Rust中复刻。
- **字段名映射**: Finnhub返回的字段名如`netIncome`)需要被映射到我们标准模型的字段名(如`net_income`)。
---
### A.3 YFinance 适配性分析
**核心结论**: 适配**可行**,主要作为**行情数据**的补充或备用源。`yfinance`库的封装使得数据获取相对简单。
**1. 数据模型适配概览**
| `common-contracts` 模型 | 适配可行性 | 关键实现要点 |
| :--- | :--- | :--- |
| **`CompanyProfile`** | ✅ **中** | `ticker.info` 字典提供了大部分信息但字段不如Finnhub或Tushare规范。 |
| **`DailyMarketData`** | ✅ **高** | `ticker.history()` 方法是主要数据来源可直接提供OHLCV。 |
| **`RealtimeQuote`** | ⚠️ **低** | `yfinance`本身不是为实时流式数据设计的,获取的数据有延迟。 |
| **`FinancialStatement`** | ✅ **中** | `ticker.financials`, `ticker.balance_sheet`, `ticker.cashflow` 提供了数据,但需要手动将多年度的数据列转换为按年份的记录行。 |
**2. 关键迁移逻辑**
- **数据结构转换**: `yfinance`返回的DataFrame需要被转换为我们期望的`Vec<Record>`结构。特别是财务报表,需要将列式(多年份)数据转换为行式(单年份)记录。
- **库的替代**: Rust中没有`yfinance`库。我们需要找到一个替代的Rust库 (如 `yahoo_finance_api`)或者直接模拟其HTTP请求来获取数据。这将是迁移此模块的主要工作。
---
- [ ] **2. (可选) 迁移其他数据提供商**
- [x] **任务**: 基于各自的适配性分析,创建并实现`finnhub-provider-service`。
- [x] **任务**: 基于各自的适配性分析,创建并实现`yfinance-provider-service`。
- [ ] **任务**: **(已暂停/待独立规划)** 实现`ifind-provider-service`。
---
### A.4 iFind 适配性分析 - **更新于 2025-11-16**
**核心结论**: **当前阶段纯Rust迁移复杂度极高任务已暂停**。iFind的Python接口 (`iFinDPy.py`) 是一个基于 `ctypes` 的薄封装它直接调用了底层的C/C++动态链接库 (`.so` 文件)。这意味着没有任何可见的网络协议或HTTP请求可供我们在Rust中直接模拟。
**1. 迁移路径评估**
基于对 `ref/ifind` 库文件的调研,我们确认了迁移此模块面临两个选择:
1. **HTTP API方案 (首选,待调研)**:
- **描述**: 您提到iFind存在一个HTTP API版本。这是最符合我们纯Rust、去中心化架构的理想路径。
- **工作量评估**: **中等**。如果该HTTP API文档齐全且功能满足需求那么开发此服务的工作量将与 `finnhub-provider-service` 类似。
- **规划**: 此路径应作为一个**独立的、后续的调研与开发任务**。当前置于暂停状态。
2. **FFI方案 (备选,不推荐)**:
- **描述**: 在Rust服务中通过FFI`pyo3``rust-cpython` crate嵌入Python解释器直接调用 `iFinDPy` 库。
- **工作量评估**: **高**。虽然可以复用逻辑但这会引入技术栈污染破坏我们纯Rust的目标并显著增加部署和维护的复杂度需要在容器中管理Python环境和iFind的二进制依赖。这与我们“rustic”的确定性原则相悖。
**2. 最终决定**
- **暂停实现**: `ifind-provider-service` 的开发工作已**正式暂停**。
- **更新路线图**: 在主路线图中,此任务已被标记为“已暂停/待独立规划”。
- **未来方向**: 当项目进入下一阶段时,我们将启动一个独立的任务来**专门调研其HTTP API**,并基于调研结果决定最终的实现策略。
---
## 附录B: 业务逻辑模块迁移分析
本附录用于分析`backend/app/services/`中包含的核心业务逻辑并为将其迁移至Rust服务制定策略。
### B.1 `analysis_client.py` & `company_profile_client.py`
- **核心功能**: 这两个模块是AI分析的核心负责与大语言模型如GeminiAPI进行交互。
- `analysis_client.py`: 提供一个**通用**的分析框架,可以根据不同的`prompt_template`执行任意类型的分析。它还包含一个`SafeFormatter`来安全地填充模板。
- `company_profile_client.py`: 是一个**特化**的版本包含了用于生成公司简介的、具体的、硬编码的长篇Prompt。
- **迁移策略**:
1. **统一并重写为 `report-generator-service`**: 这两个模块的功能应被合并并迁移到一个全新的Rust微服务——`report-generator-service`中。
2. **订阅事件**: 该服务将订阅Message Bus上的数据就绪事件如`FinancialsPersistedEvent`而不是被HTTP直接调用。
3. **Prompt管理**: 硬编码在`company_profile_client.py`中的Prompt以及`analysis_client.py`所依赖的、从`analysis-config.json`加载的模板,都应该由`report-generator-service`统一管理。在初期可以从配置文件加载未来可以由Rust版的`config-service-rs`提供。
4. **复刻`SafeFormatter`**: Python版本中用于安全填充模板的`SafeFormatter`逻辑需要在Rust中被等价复刻以确保在上下文不完整时系统的健壮性。
5. **AI客户端**: 使用`reqwest`或其他HTTP客户端库在Rust中重新实现与大模型API的交互逻辑。
- **结论**: 迁移**完全可行**。核心工作是将Python中的Prompt管理和API调用逻辑用Rust的异步方式重写。这将使AI分析任务成为一个独立的、可扩展的、事件驱动的后台服务。
---
### B.2 `config_manager.py`
- **核心功能**: 作为Python `backend`内部的一个组件,它负责从`config-service`拉取配置,并与本地`config.json`文件进行合并。它还包含了测试各种配置有效性的逻辑如测试数据库连接、Tushare Token等
- **迁移策略**:
- **功能分散化**: `ConfigManager`本身不会作为一个独立的Rust服务存在它的功能将被**分散**到每个需要它的微服务中。
- **配置拉取**: 每个Rust微服务`api-gateway`, `tushare-provider`等)在启动时,都将负责**独立地**从环境变量或未来的`config-service-rs`中获取自己的配置。我们为每个服务编写的`config.rs`模块已经实现了这一点。
- **配置测试逻辑**: 测试配置的逻辑(如`_test_database`, `_test_tushare`)非常有用,但不属于运行时功能。这些逻辑可以被迁移到:
1. **独立的CLI工具**: 创建一个Rust CLI工具专门用于测试和验证整个系统的配置。
2. **服务的`/health`端点**: 在每个服务的`/health`检查中可以包含对其依赖服务数据库、外部API连通性的检查从而在运行时提供健康状况反馈。
- **结论**: `ConfigManager`的功能将被**“肢解”并吸收**到新的Rust微服务生态中而不是直接迁移。
---
### B.3 `data_persistence_client.py`
- **核心功能**: 这是一个HTTP客户端用于让Python `backend`与其他微服务(`data-persistence-service`)通信。
- **迁移策略**:
- **模式复用**: 这个模块本身就是我们新架构模式的一个成功范例。
- **Rust等价实现**: 我们在`alphavantage-provider-service`中创建的`persistence.rs`客户端,以及即将在`api-gateway`和`report-generator-service`中创建的类似客户端,正是`data_persistence_client.py`的Rust等价物。
- **最终废弃**: 当Python `backend`最终被下线时,这个客户端模块也将随之被废弃。
- **结论**: 该模块**无需迁移**其设计思想已被我们的Rust服务所采纳和实现。

View File

@ -0,0 +1,23 @@
const BACKEND_BASE = process.env.NEXT_PUBLIC_BACKEND_URL;
export async function GET(
_req: Request,
context: { params: Promise<{ symbol: string }> }
) {
if (!BACKEND_BASE) {
return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 });
}
const { symbol } = await context.params;
const target = `${BACKEND_BASE}/companies/${encodeURIComponent(symbol)}/profile`;
const resp = await fetch(target, { headers: { 'Content-Type': 'application/json' } });
const headers = new Headers();
const contentType = resp.headers.get('content-type') || 'application/json; charset=utf-8';
headers.set('content-type', contentType);
const cacheControl = resp.headers.get('cache-control');
if (cacheControl) headers.set('cache-control', cacheControl);
const xAccelBuffering = resp.headers.get('x-accel-buffering');
if (xAccelBuffering) headers.set('x-accel-buffering', xAccelBuffering);
return new Response(resp.body, { status: resp.status, headers });
}

View File

@ -0,0 +1,22 @@
import { NextRequest } from 'next/server';
const BACKEND_BASE = process.env.NEXT_PUBLIC_BACKEND_URL;
export async function POST(req: NextRequest) {
if (!BACKEND_BASE) {
return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 });
}
const body = await req.text();
const resp = await fetch(`${BACKEND_BASE}/data-requests`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body,
});
const text = await resp.text();
return new Response(text, {
status: resp.status,
headers: { 'Content-Type': resp.headers.get('Content-Type') || 'application/json' },
});
}

View File

@ -0,0 +1,23 @@
const BACKEND_BASE = process.env.NEXT_PUBLIC_BACKEND_URL;
export async function GET(
_req: Request,
context: { params: Promise<{ request_id: string }> }
) {
if (!BACKEND_BASE) {
return new Response('NEXT_PUBLIC_BACKEND_URL 未配置', { status: 500 });
}
const { request_id } = await context.params;
const target = `${BACKEND_BASE}/tasks/${encodeURIComponent(request_id)}`;
const resp = await fetch(target, { headers: { 'Content-Type': 'application/json' } });
const headers = new Headers();
const contentType = resp.headers.get('content-type') || 'application/json; charset=utf-8';
headers.set('content-type', contentType);
const cacheControl = resp.headers.get('cache-control');
if (cacheControl) headers.set('cache-control', cacheControl);
const xAccelBuffering = resp.headers.get('x-accel-buffering');
if (xAccelBuffering) headers.set('x-accel-buffering', xAccelBuffering);
return new Response(resp.body, { status: resp.status, headers });
}

View File

@ -1,66 +1,97 @@
import useSWR from 'swr'; import useSWR, { SWRConfiguration } from "swr";
import { useConfigStore } from '@/stores/useConfigStore'; import { Financials, FinancialsIdentifier } from "@/types";
import { BatchFinancialDataResponse, FinancialConfigResponse, AnalysisConfigResponse, TodaySnapshotResponse, RealTimeQuoteResponse } from '@/types'; import { useEffect, useState } from "react";
import { AnalysisStep, AnalysisTask } from "@/lib/execution-step-manager";
const fetcher = async (url: string) => { const fetcher = (url: string) => fetch(url).then((res) => res.json());
const res = await fetch(url);
const contentType = res.headers.get('Content-Type') || '';
const text = await res.text();
// 尝试解析JSON // --- 新的异步任务Hooks ---
const tryParseJson = () => {
try { return JSON.parse(text); } catch { return null; }
};
const data = contentType.includes('application/json') ? tryParseJson() : tryParseJson(); // 用于触发数据获取任务
export function useDataRequest() {
const [isMutating, setIsMutating] = useState(false);
const [error, setError] = useState<Error | null>(null);
const trigger = async (symbol: string, market: string): Promise<string | undefined> => {
setIsMutating(true);
setError(null);
try {
const res = await fetch(`/api/data-requests`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ symbol, market }),
});
if (!res.ok) { if (!res.ok) {
// 后端可能返回纯文本错误,统一抛出可读错误 throw new Error(`Request failed with status ${res.status}`);
const message = data && data.detail ? data.detail : (text || `Request failed: ${res.status}`);
throw new Error(message);
} }
if (data === null) { const data = await res.json();
throw new Error('无效的服务器响应非JSON'); return data.request_id;
} catch (e: any) {
setError(e);
} finally {
setIsMutating(false);
} }
return data;
}; };
export function useConfig() { return {
const { setConfig, setError } = useConfigStore(); trigger,
const { data, error, isLoading } = useSWR('/api/config', fetcher, { isMutating,
onSuccess: (data) => setConfig(data), error,
onError: (err) => setError(err.message), };
});
return { data, error, isLoading };
} }
export async function updateConfig(newConfig: any) {
const res = await fetch('/api/config', { // 用于轮询任务进度
method: 'PUT', export function useTaskProgress(requestId: string | null, options?: SWRConfiguration) {
headers: { 'Content-Type': 'application/json' }, const { data, error, isLoading } = useSWR(
body: JSON.stringify(newConfig), requestId ? `/api/tasks/${requestId}` : null,
}); fetcher,
if (!res.ok) throw new Error(await res.text()); {
return res.json(); refreshInterval: 2000, // 每2秒轮询一次
...options,
errorRetryCount: 2,
}
);
const isFinished = !isLoading && (data?.status?.includes('completed') || data?.status?.includes('failed') || !data);
return {
progress: data,
isLoading,
isError: error,
isFinished,
};
} }
export async function testConfig(type: string, data: any) { // --- 保留的旧Hooks (用于查询最终数据) ---
const res = await fetch('/api/config/test', {
method: 'POST', export function useCompanyProfile(symbol?: string, market?: string) {
headers: { 'Content-Type': 'application/json' }, const { data, error, isLoading } = useSWR(
body: JSON.stringify({ config_type: type, config_data: data }), symbol && market ? `/api/companies/${symbol}/profile` : null,
}); fetcher
if (!res.ok) throw new Error(await res.text()); );
return res.json();
return {
profile: data,
isLoading,
isError: error,
};
} }
export function useFinancialConfig() { // ... 这里的其他数据查询Hooks (如财务报表等) 也将遵循类似的模式,
return useSWR<FinancialConfigResponse>('/api/financials/config', fetcher); // 直接从api-gateway查询持久化后的数据 ...
}
// --- 废弃的旧Hooks ---
/**
* @deprecated This hook is deprecated and will be removed.
* Use useDataRequest and useTaskProgress instead.
*/
export function useChinaFinancials(ts_code?: string, years: number = 10) { export function useChinaFinancials(ts_code?: string, years: number = 10) {
return useSWR<BatchFinancialDataResponse>( return useSWR<BatchFinancialDataResponse>(
ts_code ? `/api/financials/cn/${encodeURIComponent(ts_code)}?years=${encodeURIComponent(String(years))}` : null, ts_code ? `/api/financials/cn/${encodeURIComponent(ts_code)}?years=${encodeURIComponent(String(years))}` : null,

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,45 @@
[package]
name = "alphavantage-provider-service"
version = "0.1.0"
edition = "2021"
[dependencies]
# Web Service
axum = "0.8.7"
tokio = { version = "1", features = ["full"] }
tower-http = { version = "0.6.6", features = ["cors"] }
# Shared Contracts
common-contracts = { path = "../common-contracts" }
# Generic MCP Client
rmcp = { version = "0.8.5", features = ["client", "transport-streamable-http-client-reqwest"] }
# Message Queue (NATS)
async-nats = "0.45.0"
futures-util = "0.3"
# Data Persistence Client
reqwest = { version = "0.12", features = ["json"] }
# Concurrency & Async
async-trait = "0.1"
dashmap = "6.1.0" # For concurrent task tracking
uuid = { version = "1.8", features = ["v4"] }
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Logging & Telemetry
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Configuration
config = "0.15.19"
secrecy = { version = "0.10.3", features = ["serde"] }
# Error Handling
thiserror = "2.0.17"
anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] }

View File

@ -0,0 +1,35 @@
# 1. Build Stage
FROM rust:1.90 as builder
WORKDIR /usr/src/app
# Pre-build dependencies to leverage Docker layer caching
COPY ./services/common-contracts /usr/src/app/services/common-contracts
COPY ./services/alphavantage-provider-service/Cargo.toml ./services/alphavantage-provider-service/Cargo.lock* ./services/alphavantage-provider-service/
WORKDIR /usr/src/app/services/alphavantage-provider-service
RUN mkdir -p src && \
echo "fn main() {}" > src/main.rs && \
cargo build --release --bin alphavantage-provider-service
# Copy the full source code
COPY ./services/alphavantage-provider-service /usr/src/app/services/alphavantage-provider-service
# Build the application
WORKDIR /usr/src/app/services/alphavantage-provider-service
RUN cargo build --release --bin alphavantage-provider-service
# 2. Runtime Stage
FROM debian:bookworm-slim
# Set timezone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Copy the built binary from the builder stage
COPY --from=builder /usr/src/app/services/alphavantage-provider-service/target/release/alphavantage-provider-service /usr/local/bin/
# Set the binary as the entrypoint
ENTRYPOINT ["/usr/local/bin/alphavantage-provider-service"]

View File

@ -0,0 +1,119 @@
use secrecy::{SecretString, ExposeSecret};
use serde::Deserialize;
use crate::error::Result;
const DEFAULT_BASE_URL: &str = "https://www.alphavantage.co";
#[derive(Clone)]
pub struct AlphaVantageHttpClient {
client: reqwest::Client,
base_url: String,
api_key: SecretString,
}
impl AlphaVantageHttpClient {
pub fn new(api_key: SecretString) -> Self {
Self {
client: reqwest::Client::new(),
base_url: DEFAULT_BASE_URL.to_string(),
api_key,
}
}
pub async fn fetch_overview(&self, symbol: &str) -> Result<AvOverview> {
let url = format!("{}/query", self.base_url);
let resp = self
.client
.get(&url)
.query(&[
("function", "OVERVIEW"),
("symbol", symbol),
("apikey", self.api_key.expose_secret()),
])
.send()
.await?
.error_for_status()?
.json::<AvOverview>()
.await?;
Ok(resp)
}
pub async fn fetch_global_quote(&self, symbol: &str) -> Result<AvGlobalQuoteEnvelope> {
let url = format!("{}/query", self.base_url);
let resp = self
.client
.get(&url)
.query(&[
("function", "GLOBAL_QUOTE"),
("symbol", symbol),
("apikey", self.api_key.expose_secret()),
])
.send()
.await?
.error_for_status()?
.json::<AvGlobalQuoteEnvelope>()
.await?;
Ok(resp)
}
}
// --- Alpha Vantage Models (subset, strong-typed) ---
#[derive(Debug, Deserialize)]
pub struct AvOverview {
#[serde(rename = "Symbol")]
pub symbol: String,
#[serde(rename = "Name")]
pub name: String,
#[serde(rename = "Industry")]
pub industry: String,
#[serde(rename = "Exchange")]
pub exchange: String,
#[serde(rename = "Currency")]
pub currency: String,
#[serde(rename = "Country")]
pub country: String,
#[serde(rename = "Sector")]
pub sector: String,
#[serde(rename = "MarketCapitalization")]
pub market_capitalization: Option<String>,
#[serde(rename = "PERatio")]
pub pe_ratio: Option<String>,
#[serde(rename = "Beta")]
pub beta: Option<String>,
#[serde(rename = "Description")]
pub description: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct AvGlobalQuoteEnvelope {
#[serde(rename = "Global Quote")]
pub global_quote: AvGlobalQuote,
}
#[derive(Debug, Deserialize)]
pub struct AvGlobalQuote {
#[serde(rename = "01. symbol")]
pub symbol: String,
#[serde(rename = "02. open")]
pub open: Option<String>,
#[serde(rename = "03. high")]
pub high: Option<String>,
#[serde(rename = "04. low")]
pub low: Option<String>,
#[serde(rename = "05. price")]
pub price: Option<String>,
#[serde(rename = "06. volume")]
pub volume: Option<String>,
#[serde(rename = "07. latest trading day")]
pub latest_trading_day: Option<String>,
#[serde(rename = "08. previous close")]
pub previous_close: Option<String>,
#[serde(rename = "09. change")]
pub change: Option<String>,
#[serde(rename = "10. change percent")]
pub change_percent: Option<String>,
}

View File

@ -0,0 +1,43 @@
use std::collections::HashMap;
use axum::{
extract::State,
response::Json,
routing::get,
Router,
};
use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress};
use crate::state::AppState;
pub fn create_router(app_state: AppState) -> Router {
Router::new()
.route("/health", get(health_check))
.route("/tasks", get(get_current_tasks))
.with_state(app_state)
}
/// [GET /health]
/// Provides the current health status of the module.
async fn health_check(State(_state): State<AppState>) -> Json<HealthStatus> {
let mut details = HashMap::new();
// In a real scenario, we would check connections to the message bus, etc.
details.insert("message_bus_connection".to_string(), "ok".to_string());
let status = HealthStatus {
module_id: "alphavantage-provider-service".to_string(),
status: ServiceStatus::Ok,
version: env!("CARGO_PKG_VERSION").to_string(),
details,
};
Json(status)
}
/// [GET /tasks]
/// Reports all currently processing tasks and their progress.
async fn get_current_tasks(State(state): State<AppState>) -> Json<Vec<TaskProgress>> {
let tasks: Vec<TaskProgress> = state
.tasks
.iter()
.map(|entry| entry.value().clone())
.collect();
Json(tasks)
}

View File

@ -0,0 +1,60 @@
use crate::error::{AppError, Result};
use rmcp::{ClientHandler, ServiceExt};
use rmcp::model::CallToolRequestParam;
use rmcp::transport::StreamableHttpClientTransport;
use serde_json::{Map, Value};
#[derive(Debug, Clone, Default)]
struct DummyClientHandler;
impl ClientHandler for DummyClientHandler {
fn get_info(&self) -> rmcp::model::ClientInfo {
rmcp::model::ClientInfo::default()
}
}
// 不需要 Clone由外部用 Arc 包裹
pub struct AvClient {
service: rmcp::service::RunningService<rmcp::RoleClient, DummyClientHandler>,
}
impl AvClient {
pub async fn connect(mcp_endpoint_url: &str) -> Result<Self> {
let transport = StreamableHttpClientTransport::from_uri(mcp_endpoint_url.to_string());
let running = DummyClientHandler
::default()
.serve(transport)
.await
.map_err(|e| AppError::Configuration(format!("Fail to init MCP service: {e:?}")))?;
Ok(Self { service: running })
}
pub async fn query(&self, function: &str, params: &[(&str, &str)]) -> Result<Value> {
let mut args = Map::new();
args.insert("function".to_string(), Value::String(function.to_string()));
for (k, v) in params {
args.insert((*k).to_string(), Value::String((*v).to_string()));
}
let result = self
.service
.call_tool(CallToolRequestParam {
name: function.to_string().into(),
arguments: Some(args),
})
.await
.map_err(|e| AppError::Configuration(format!("MCP call_tool error: {e:?}")))?;
if let Some(v) = result.structured_content {
return Ok(v);
}
// fallback: try parse first text content as json
if let Some(text) = result.content.first().and_then(|c| c.raw.as_text()).map(|t| t.text.clone()) {
if let Ok(v) = serde_json::from_str::<Value>(&text) {
return Ok(v);
}
return Ok(Value::String(text));
}
Ok(Value::Null)
}
}

View File

@ -0,0 +1,20 @@
use secrecy::SecretString;
use serde::Deserialize;
#[derive(Deserialize, Debug)]
pub struct AppConfig {
pub server_port: u16,
pub nats_addr: String,
pub alphavantage_api_key: SecretString,
pub data_persistence_service_url: String,
}
impl AppConfig {
pub fn load() -> Result<Self, config::ConfigError> {
let config = config::Config::builder()
.add_source(config::Environment::default().separator("__"))
.build()?;
config.try_deserialize()
}
}

View File

@ -0,0 +1,40 @@
use thiserror::Error;
pub type Result<T> = std::result::Result<T, AppError>;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Message bus error: {0}")]
MessageBus(#[from] async_nats::Error),
#[error("Message bus publish error: {0}")]
MessageBusPublish(#[from] async_nats::PublishError),
#[error("Message bus subscribe error: {0}")]
MessageBusSubscribe(String),
#[error("Message bus connect error: {0}")]
MessageBusConnect(String),
#[error("HTTP request to another service failed: {0}")]
ServiceRequest(#[from] reqwest::Error),
#[error("Data parsing error: {0}")]
DataParsing(#[from] anyhow::Error),
}
// 手动实现针对 async-nats 泛型错误类型的 From 转换
impl From<async_nats::error::Error<async_nats::ConnectErrorKind>> for AppError {
fn from(err: async_nats::error::Error<async_nats::ConnectErrorKind>) -> Self {
AppError::MessageBusConnect(err.to_string())
}
}
impl From<async_nats::SubscribeError> for AppError {
fn from(err: async_nats::SubscribeError) -> Self {
AppError::MessageBusSubscribe(err.to_string())
}
}

View File

@ -0,0 +1,46 @@
mod api;
mod config;
mod error;
mod mapping;
mod message_consumer;
mod persistence;
mod state;
mod worker;
mod av_client;
use crate::config::AppConfig;
use crate::error::Result;
use crate::state::AppState;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
info!("Starting alphavantage-provider-service...");
// Load configuration
let config = AppConfig::load().map_err(|e| error::AppError::Configuration(e.to_string()))?;
let port = config.server_port;
// Initialize application state
let app_state = AppState::new(config)?;
// Create the Axum router
let app = api::create_router(app_state.clone());
// --- Start the message consumer ---
tokio::spawn(message_consumer::run(app_state));
// Start the HTTP server
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))
.await
.unwrap();
info!("HTTP server listening on port {}", port);
axum::serve(listener, app).await.unwrap();
Ok(())
}

View File

@ -0,0 +1,184 @@
//! 数据模型转换模块AlphaVantage 专用映射)
//!
//! 使用 AlphaVantage HTTP JSON → 我们的 DTO 的强类型转换。
//!
use anyhow::{anyhow, Context};
use chrono::{NaiveDate, Utc};
use common_contracts::dtos::{CompanyProfileDto, RealtimeQuoteDto, TimeSeriesFinancialDto};
use serde_json::Value;
// --- Helpers ---
fn parse_date(s: &str) -> anyhow::Result<NaiveDate> {
NaiveDate::parse_from_str(s, "%Y-%m-%d").map_err(|e| anyhow!("Invalid date '{}': {}", s, e))
}
fn parse_f64_opt(v: &Value) -> Option<f64> {
v.as_str().and_then(|s| s.parse::<f64>().ok())
}
fn required_str<'a>(v: &'a Value, key: &str) -> anyhow::Result<&'a str> {
v.get(key)
.and_then(|x| x.as_str())
.ok_or_else(|| anyhow!("Missing required string field '{}'", key))
}
// --- CompanyProfile Mapping ---
pub fn parse_company_profile(v: Value) -> anyhow::Result<CompanyProfileDto> {
Ok(CompanyProfileDto {
symbol: required_str(&v, "Symbol")?.to_string(),
name: required_str(&v, "Name")?.to_string(),
list_date: v
.get("IPODate")
.and_then(|x| x.as_str())
.map(parse_date)
.transpose()?,
industry: v.get("Industry").and_then(|x| x.as_str()).map(|s| s.to_string()),
additional_info: Some(serde_json::json!({
"exchange": v.get("Exchange"),
"currency": v.get("Currency"),
"country": v.get("Country"),
"sector": v.get("Sector"),
"market_capitalization": v.get("MarketCapitalization"),
"pe_ratio": v.get("PERatio"),
"beta": v.get("Beta")
})),
})
}
// --- Financials Mapping ---
pub struct CombinedFinancials {
pub income: Value,
pub balance_sheet: Value,
pub cash_flow: Value,
}
pub fn parse_financials(cf: CombinedFinancials) -> anyhow::Result<Vec<TimeSeriesFinancialDto>> {
let symbol = required_str(&cf.income, "symbol")?.to_string();
let income_reports = cf
.income
.get("annualReports")
.and_then(|x| x.as_array())
.context("Missing annualReports in income statement")?;
let balance_reports = cf
.balance_sheet
.get("annualReports")
.and_then(|x| x.as_array())
.unwrap_or(&vec![])
.clone();
let cashflow_reports = cf
.cash_flow
.get("annualReports")
.and_then(|x| x.as_array())
.unwrap_or(&vec![])
.clone();
let mut out: Vec<TimeSeriesFinancialDto> = Vec::new();
for inc in income_reports {
let date_str = required_str(inc, "fiscalDateEnding")?;
let period_date = parse_date(date_str)?;
// income metrics
if let Some(v) = parse_f64_opt(&inc["totalRevenue"]) {
out.push(TimeSeriesFinancialDto {
symbol: symbol.clone(),
metric_name: "revenue".to_string(),
period_date,
value: v,
source: Some("alphavantage".to_string()),
});
}
if let Some(v) = parse_f64_opt(&inc["netIncome"]) {
out.push(TimeSeriesFinancialDto {
symbol: symbol.clone(),
metric_name: "net_income".to_string(),
period_date,
value: v,
source: Some("alphavantage".to_string()),
});
}
// match balance and cashflow by fiscalDateEnding
if let Some(bal) = balance_reports
.iter()
.find(|r| r.get("fiscalDateEnding") == Some(&Value::String(date_str.to_string())))
{
if let Some(v) = parse_f64_opt(&bal["totalAssets"]) {
out.push(TimeSeriesFinancialDto {
symbol: symbol.clone(),
metric_name: "total_assets".to_string(),
period_date,
value: v,
source: Some("alphavantage".to_string()),
});
}
if let Some(v) = parse_f64_opt(&bal["totalLiabilities"]) {
out.push(TimeSeriesFinancialDto {
symbol: symbol.clone(),
metric_name: "total_liabilities".to_string(),
period_date,
value: v,
source: Some("alphavantage".to_string()),
});
}
}
if let Some(cf) = cashflow_reports
.iter()
.find(|r| r.get("fiscalDateEnding") == Some(&Value::String(date_str.to_string())))
{
if let Some(v) = parse_f64_opt(&cf["operatingCashflow"]) {
out.push(TimeSeriesFinancialDto {
symbol: symbol.clone(),
metric_name: "operating_cashflow".to_string(),
period_date,
value: v,
source: Some("alphavantage".to_string()),
});
}
}
}
Ok(out)
}
// --- RealtimeQuote Mapping ---
pub fn parse_realtime_quote(v: Value, market: &str) -> anyhow::Result<RealtimeQuoteDto> {
let q = v
.get("Global Quote")
.ok_or_else(|| anyhow!("Missing 'Global Quote' object"))?;
let symbol = required_str(q, "01. symbol")?.to_string();
let price = parse_f64_opt(&q["05. price"]).context("Invalid price")?;
let open_price = parse_f64_opt(&q["02. open"]);
let high_price = parse_f64_opt(&q["03. high"]);
let low_price = parse_f64_opt(&q["04. low"]);
let prev_close = parse_f64_opt(&q["08. previous close"]);
let change = parse_f64_opt(&q["09. change"]);
let change_percent = q
.get("10. change percent")
.and_then(|x| x.as_str())
.and_then(|s| s.trim_end_matches('%').parse::<f64>().ok());
let volume = q
.get("06. volume")
.and_then(|x| x.as_str())
.and_then(|s| s.parse::<i64>().ok());
Ok(RealtimeQuoteDto {
symbol,
market: market.to_string(),
ts: Utc::now(),
price,
open_price,
high_price,
low_price,
prev_close,
change,
change_percent,
volume,
source: Some("alphavantage".to_string()),
})
}

View File

@ -0,0 +1,47 @@
use crate::error::Result;
use crate::state::AppState;
use common_contracts::messages::FetchCompanyDataCommand;
use futures_util::StreamExt;
use tracing::{error, info};
const SUBJECT_NAME: &str = "data_fetch_commands";
pub async fn run(state: AppState) -> Result<()> {
info!("Starting NATS message consumer...");
let client = async_nats::connect(&state.config.nats_addr).await?;
info!("Connected to NATS.");
// This is a simple subscriber. For production, consider JetStream for durability.
let mut subscriber = client.subscribe(SUBJECT_NAME.to_string()).await?;
info!(
"Consumer started, waiting for messages on subject '{}'",
SUBJECT_NAME
);
while let Some(message) = subscriber.next().await {
info!("Received NATS message.");
let state_clone = state.clone();
let publisher_clone = client.clone();
tokio::spawn(async move {
match serde_json::from_slice::<FetchCompanyDataCommand>(&message.payload) {
Ok(command) => {
info!("Deserialized command for symbol: {}", command.symbol);
if let Err(e) =
crate::worker::handle_fetch_command(state_clone, command, publisher_clone)
.await
{
error!("Error handling fetch command: {:?}", e);
}
}
Err(e) => {
error!("Failed to deserialize message: {}", e);
}
}
});
}
Ok(())
}

View File

@ -0,0 +1,70 @@
//!
//! 数据持久化客户端
//!
//! 提供一个类型化的接口,用于与 `data-persistence-service` 进行通信。
//!
use crate::error::Result;
use common_contracts::{
dtos::{CompanyProfileDto, RealtimeQuoteDto, TimeSeriesFinancialBatchDto, TimeSeriesFinancialDto},
};
use tracing::info;
#[derive(Clone)]
pub struct PersistenceClient {
client: reqwest::Client,
base_url: String,
}
impl PersistenceClient {
pub fn new(base_url: String) -> Self {
Self {
client: reqwest::Client::new(),
base_url,
}
}
pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> {
let url = format!("{}/companies", self.base_url);
info!("Upserting company profile for {} to {}", profile.symbol, url);
self.client
.put(&url)
.json(&profile)
.send()
.await?
.error_for_status()?;
Ok(())
}
pub async fn upsert_realtime_quote(&self, quote: RealtimeQuoteDto) -> Result<()> {
let url = format!("{}/market-data/quotes", self.base_url);
info!("Upserting realtime quote for {} to {}", quote.symbol, url);
self.client
.post(&url)
.json(&quote)
.send()
.await?
.error_for_status()?;
Ok(())
}
pub async fn batch_insert_financials(&self, dtos: Vec<TimeSeriesFinancialDto>) -> Result<()> {
if dtos.is_empty() {
return Ok(());
}
let url = format!("{}/market-data/financials/batch", self.base_url);
let symbol = dtos[0].symbol.clone();
info!("Batch inserting {} financial statements for {} to {}", dtos.len(), symbol, url);
let batch = TimeSeriesFinancialBatchDto { records: dtos };
self.client
.post(&url)
.json(&batch)
.send()
.await?
.error_for_status()?;
Ok(())
}
}

View File

@ -0,0 +1,23 @@
use std::sync::Arc;
use common_contracts::observability::TaskProgress;
use dashmap::DashMap;
use uuid::Uuid;
use crate::config::AppConfig;
use crate::error::Result;
pub type TaskStore = Arc<DashMap<Uuid, TaskProgress>>;
#[derive(Clone)]
pub struct AppState {
pub config: Arc<AppConfig>,
pub tasks: TaskStore,
}
impl AppState {
pub fn new(config: AppConfig) -> Result<Self> {
Ok(Self {
config: Arc::new(config),
tasks: Arc::new(DashMap::new()),
})
}
}

View File

@ -0,0 +1,149 @@
use crate::error::Result;
use crate::mapping::{CombinedFinancials, parse_company_profile, parse_financials, parse_realtime_quote};
use crate::persistence::PersistenceClient;
use crate::state::{AppState, TaskStore};
use anyhow::Context;
use chrono::{Utc, Datelike};
use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent};
use common_contracts::observability::TaskProgress;
use secrecy::ExposeSecret;
use std::sync::Arc;
use tracing::{error, info, instrument};
use uuid::Uuid;
use crate::av_client::AvClient;
#[instrument(skip(state, command, publisher), fields(request_id = %command.request_id, symbol = %command.symbol))]
pub async fn handle_fetch_command(
state: AppState,
command: FetchCompanyDataCommand,
publisher: async_nats::Client,
) -> Result<()> {
info!("Handling fetch data command.");
let task = TaskProgress {
request_id: command.request_id,
task_name: format!("fetch_data_for_{}", command.symbol),
status: "in_progress".to_string(),
progress_percent: 0,
details: "Initializing...".to_string(),
started_at: Utc::now(),
};
state.tasks.insert(command.request_id, task);
let api_key = state.config.alphavantage_api_key.expose_secret();
let mcp_endpoint = format!("https://mcp.alphavantage.co/mcp?apikey={}", api_key);
let client = Arc::new(AvClient::connect(&mcp_endpoint).await?);
let persistence_client =
PersistenceClient::new(state.config.data_persistence_service_url.clone());
let symbol = command.symbol.clone();
update_task_progress(
&state.tasks,
command.request_id,
10,
"Fetching from AlphaVantage...",
)
.await;
// --- 1. Fetch all data in parallel ---
let (overview_json, income_json, balance_json, cashflow_json, quote_json) = {
let params_overview = vec![("symbol", symbol.as_str())];
let params_income = vec![("symbol", symbol.as_str())];
let params_balance = vec![("symbol", symbol.as_str())];
let params_cashflow = vec![("symbol", symbol.as_str())];
let params_quote = vec![("symbol", symbol.as_str())];
let overview_task = client.query("OVERVIEW", &params_overview);
let income_task = client.query("INCOME_STATEMENT", &params_income);
let balance_task = client.query("BALANCE_SHEET", &params_balance);
let cashflow_task = client.query("CASH_FLOW", &params_cashflow);
let quote_task = client.query("GLOBAL_QUOTE", &params_quote);
match tokio::try_join!(
overview_task,
income_task,
balance_task,
cashflow_task,
quote_task
) {
Ok(data) => data,
Err(e) => {
let error_msg = format!("Failed to fetch data from AlphaVantage: {}", e);
error!(error_msg);
update_task_progress(&state.tasks, command.request_id, 100, &error_msg).await;
return Err(e);
}
}
};
update_task_progress(
&state.tasks,
command.request_id,
50,
"Data fetched, transforming and persisting...",
)
.await;
// --- 2. Transform and persist data ---
// Profile
let profile_to_persist =
parse_company_profile(overview_json).context("Failed to parse CompanyProfile")?;
persistence_client
.upsert_company_profile(profile_to_persist)
.await?;
// Financials
let combined_financials = CombinedFinancials {
income: income_json,
balance_sheet: balance_json,
cash_flow: cashflow_json,
};
let financials_to_persist =
parse_financials(combined_financials).context("Failed to parse FinancialStatements")?;
let years_updated: Vec<u16> = financials_to_persist
.iter()
.map(|f| f.period_date.year() as u16)
.collect();
persistence_client
.batch_insert_financials(financials_to_persist)
.await?;
// Quote
let quote_to_persist =
parse_realtime_quote(quote_json, &command.market).context("Failed to parse RealtimeQuote")?;
persistence_client
.upsert_realtime_quote(quote_to_persist)
.await?;
update_task_progress(
&state.tasks,
command.request_id,
90,
"Data persisted, publishing events...",
)
.await;
// --- 3. Publish events ---
let event = FinancialsPersistedEvent {
request_id: command.request_id,
symbol: command.symbol,
years_updated,
};
let subject = "financials.persisted".to_string(); // NATS subject
publisher
.publish(subject, serde_json::to_vec(&event).unwrap().into())
.await?;
state.tasks.remove(&command.request_id);
info!("Task completed successfully.");
Ok(())
}
async fn update_task_progress(tasks: &TaskStore, request_id: Uuid, percent: u8, details: &str) {
if let Some(mut task) = tasks.get_mut(&request_id) {
task.progress_percent = percent;
task.details = details.to_string();
info!("Task update: {}% - {}", percent, details);
}
}

3869
services/api-gateway/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,38 @@
[package]
name = "api-gateway"
version = "0.1.0"
edition = "2024"
[dependencies]
# Web Service
axum = "0.8.7"
tokio = { version = "1", features = ["full"] }
tower-http = { version = "0.6.6", features = ["cors"] }
# Shared Contracts
common-contracts = { path = "../common-contracts" }
# Message Queue (NATS)
async-nats = "0.45.0"
futures-util = "0.3"
# HTTP Client
reqwest = { version = "0.12", features = ["json"] }
# Concurrency & Async
uuid = { version = "1.8", features = ["v4"] }
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Logging & Telemetry
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Configuration
config = "0.15.19"
# Error Handling
thiserror = "2.0.17"
anyhow = "1.0"

View File

@ -0,0 +1,35 @@
# 1. Build Stage
FROM rust:1.90 as builder
WORKDIR /usr/src/app
# Pre-build dependencies to leverage Docker layer caching
COPY ./services/common-contracts /usr/src/app/services/common-contracts
COPY ./services/api-gateway/Cargo.toml ./services/api-gateway/Cargo.lock* ./services/api-gateway/
WORKDIR /usr/src/app/services/api-gateway
RUN mkdir -p src && \
echo "fn main() {}" > src/main.rs && \
cargo build --release --bin api-gateway
# Copy the full source code
COPY ./services/api-gateway /usr/src/app/services/api-gateway
# Build the application
WORKDIR /usr/src/app/services/api-gateway
RUN cargo build --release --bin api-gateway
# 2. Runtime Stage
FROM debian:bookworm-slim
# Set timezone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Copy the built binary from the builder stage
COPY --from=builder /usr/src/app/services/api-gateway/target/release/api-gateway /usr/local/bin/
# Set the binary as the entrypoint
ENTRYPOINT ["/usr/local/bin/api-gateway"]

View File

@ -0,0 +1,147 @@
use crate::error::Result;
use crate::state::AppState;
use axum::{
extract::{Path, State},
http::StatusCode,
response::{IntoResponse, Json},
routing::{get, post},
Router,
};
use common_contracts::messages::FetchCompanyDataCommand;
use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress};
use futures_util::future::join_all;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{info, warn};
use uuid::Uuid;
const DATA_FETCH_QUEUE: &str = "data_fetch_commands";
// --- Request/Response Structs ---
#[derive(Deserialize)]
pub struct DataRequest {
pub symbol: String,
pub market: String,
}
#[derive(Serialize)]
pub struct RequestAcceptedResponse {
pub request_id: Uuid,
}
// --- Router Definition ---
pub fn create_router(app_state: AppState) -> Router {
Router::new()
.route("/health", get(health_check))
.route("/tasks", get(get_current_tasks)) // This is the old, stateless one
.route("/v1/data-requests", post(trigger_data_fetch))
.route("/v1/companies/:symbol/profile", get(get_company_profile))
.route("/v1/tasks/:request_id", get(get_task_progress))
.with_state(app_state)
}
// --- Health & Stateless Tasks ---
async fn health_check(State(state): State<AppState>) -> Json<HealthStatus> {
let mut details = HashMap::new();
// 提供确定性且无副作用的健康详情,避免访问不存在的状态字段
details.insert("message_bus".to_string(), "nats".to_string());
details.insert("nats_addr".to_string(), state.config.nats_addr.clone());
let status = HealthStatus {
module_id: "api-gateway".to_string(),
status: ServiceStatus::Ok,
version: env!("CARGO_PKG_VERSION").to_string(),
details,
};
Json(status)
}
async fn get_current_tasks() -> Json<Vec<TaskProgress>> {
Json(vec![])
}
// --- API Handlers ---
/// [POST /v1/data-requests]
/// Triggers the data fetching process by publishing a command to the message bus.
async fn trigger_data_fetch(
State(state): State<AppState>,
Json(payload): Json<DataRequest>,
) -> Result<impl IntoResponse> {
let request_id = Uuid::new_v4();
let command = FetchCompanyDataCommand {
request_id,
symbol: payload.symbol,
market: payload.market,
};
info!(request_id = %request_id, "Publishing data fetch command");
state
.nats_client
.publish(
DATA_FETCH_QUEUE.to_string(),
serde_json::to_vec(&command).unwrap().into(),
)
.await?;
Ok((
StatusCode::ACCEPTED,
Json(RequestAcceptedResponse { request_id }),
))
}
/// [GET /v1/companies/:symbol/profile]
/// Queries the persisted company profile from the data-persistence-service.
async fn get_company_profile(
State(state): State<AppState>,
Path(symbol): Path<String>,
) -> Result<impl IntoResponse> {
let profile = state.persistence_client.get_company_profile(&symbol).await?;
Ok(Json(profile))
}
/// [GET /v1/tasks/:request_id]
/// Aggregates task progress from all downstream provider services.
async fn get_task_progress(
State(state): State<AppState>,
Path(request_id): Path<Uuid>,
) -> Result<impl IntoResponse> {
let client = reqwest::Client::new();
let fetches = state
.config
.provider_services
.iter()
.map(|service_url| {
let client = client.clone();
let url = format!("{}/tasks", service_url);
async move {
match client.get(&url).send().await {
Ok(resp) => match resp.json::<Vec<TaskProgress>>().await {
Ok(tasks) => Some(tasks),
Err(e) => {
warn!("Failed to decode tasks from {}: {}", url, e);
None
}
},
Err(e) => {
warn!("Failed to fetch tasks from {}: {}", url, e);
None
}
}
}
});
let results = join_all(fetches).await;
let mut merged: Vec<TaskProgress> = Vec::new();
for maybe_tasks in results {
if let Some(tasks) = maybe_tasks {
merged.extend(tasks);
}
}
if let Some(task) = merged.into_iter().find(|t| t.request_id == request_id) {
Ok((StatusCode::OK, Json(task)).into_response())
} else {
Ok((StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Task not found"}))).into_response())
}
}

View File

@ -0,0 +1,19 @@
use serde::Deserialize;
#[derive(Deserialize, Debug)]
pub struct AppConfig {
pub server_port: u16,
pub nats_addr: String,
pub data_persistence_service_url: String,
pub provider_services: Vec<String>,
}
impl AppConfig {
pub fn load() -> Result<Self, config::ConfigError> {
let config = config::Config::builder()
.add_source(config::Environment::default().separator("__"))
.build()?;
config.try_deserialize()
}
}

View File

@ -0,0 +1,58 @@
use thiserror::Error;
use axum::{http::StatusCode, response::IntoResponse, Json};
use serde_json::json;
pub type Result<T> = std::result::Result<T, AppError>;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Message bus error: {0}")]
MessageBus(#[from] async_nats::Error),
#[error("Message bus publish error: {0}")]
MessageBusPublish(#[from] async_nats::PublishError),
#[error("Message bus subscribe error: {0}")]
MessageBusSubscribe(String),
#[error("Message bus connect error: {0}")]
MessageBusConnect(String),
#[error("HTTP request to another service failed: {0}")]
ServiceRequest(#[from] reqwest::Error),
#[error("An unexpected error occurred.")]
Anyhow(#[from] anyhow::Error),
}
impl IntoResponse for AppError {
fn into_response(self) -> axum::response::Response {
let (status, message) = match &self {
AppError::Configuration(msg) => (StatusCode::INTERNAL_SERVER_ERROR, msg.clone()),
AppError::MessageBus(err) => (StatusCode::SERVICE_UNAVAILABLE, err.to_string()),
AppError::MessageBusPublish(err) => (StatusCode::SERVICE_UNAVAILABLE, err.to_string()),
AppError::MessageBusSubscribe(msg) => (StatusCode::SERVICE_UNAVAILABLE, msg.clone()),
AppError::MessageBusConnect(msg) => (StatusCode::SERVICE_UNAVAILABLE, msg.clone()),
AppError::ServiceRequest(err) => (StatusCode::BAD_GATEWAY, err.to_string()),
AppError::Anyhow(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
};
let body = Json(json!({ "error": message }));
(status, body).into_response()
}
}
// 手动实现针对 async-nats 泛型错误类型的 From 转换
impl From<async_nats::error::Error<async_nats::ConnectErrorKind>> for AppError {
fn from(err: async_nats::error::Error<async_nats::ConnectErrorKind>) -> Self {
AppError::MessageBusConnect(err.to_string())
}
}
impl From<async_nats::SubscribeError> for AppError {
fn from(err: async_nats::SubscribeError) -> Self {
AppError::MessageBusSubscribe(err.to_string())
}
}

View File

@ -0,0 +1,39 @@
mod api;
mod config;
mod error;
mod state;
mod persistence;
use crate::config::AppConfig;
use crate::error::Result;
use crate::state::AppState;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
info!("Starting api-gateway service...");
// Load configuration
let config = AppConfig::load().map_err(|e| error::AppError::Configuration(e.to_string()))?;
let port = config.server_port;
// Initialize application state
let app_state = AppState::new(config).await?;
// Create the Axum router
let app = api::create_router(app_state);
// Start the HTTP server
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))
.await
.unwrap();
info!("HTTP server listening on port {}", port);
axum::serve(listener, app).await.unwrap();
Ok(())
}

View File

@ -0,0 +1,34 @@
//!
//! 数据持久化服务客户端
//!
use crate::error::Result;
use common_contracts::dtos::CompanyProfileDto;
#[derive(Clone)]
pub struct PersistenceClient {
client: reqwest::Client,
base_url: String,
}
impl PersistenceClient {
pub fn new(base_url: String) -> Self {
Self {
client: reqwest::Client::new(),
base_url,
}
}
pub async fn get_company_profile(&self, symbol: &str) -> Result<CompanyProfileDto> {
let url = format!("{}/companies/{}", self.base_url, symbol);
let profile = self
.client
.get(&url)
.send()
.await?
.error_for_status()?
.json::<CompanyProfileDto>()
.await?;
Ok(profile)
}
}

View File

@ -0,0 +1,27 @@
use crate::config::AppConfig;
use crate::error::Result;
use crate::persistence::PersistenceClient;
use std::sync::Arc;
use async_nats::Client as NatsClient;
#[derive(Clone)]
pub struct AppState {
pub config: Arc<AppConfig>,
pub nats_client: NatsClient,
pub persistence_client: PersistenceClient,
}
impl AppState {
pub async fn new(config: AppConfig) -> Result<Self> {
let nats_client = async_nats::connect(&config.nats_addr).await?;
let persistence_client =
PersistenceClient::new(config.data_persistence_service_url.clone());
Ok(Self {
config: Arc::new(config),
nats_client,
persistence_client,
})
}
}

2678
services/common-contracts/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,22 @@
[package]
name = "common-contracts"
version = "0.1.0"
edition = "2024"
description = "Shared strongly-typed contracts (models, DTOs, messages, observability) across services."
authors = ["Lv, Qi <lvsoft@gmail.com>"]
[lib]
name = "common_contracts"
path = "src/lib.rs"
[dependencies]
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4"] }
rust_decimal = { version = "1.36", features = ["serde"] }
utoipa = { version = "5.4", features = ["chrono", "uuid"] }
sqlx = { version = "0.8.6", features = [ "runtime-tokio-rustls", "postgres", "chrono", "uuid", "json", "rust_decimal" ] }
service_kit = { version = "0.1.2" }

View File

@ -0,0 +1,89 @@
use chrono::NaiveDate;
use service_kit::api_dto;
use serde_json::Value as JsonValue;
use uuid::Uuid;
// Companies API DTOs
#[api_dto]
pub struct CompanyProfileDto {
pub symbol: String,
pub name: String,
pub industry: Option<String>,
pub list_date: Option<NaiveDate>,
pub additional_info: Option<JsonValue>,
}
// Market Data API DTOs
#[api_dto]
pub struct TimeSeriesFinancialDto {
pub symbol: String,
pub metric_name: String,
pub period_date: NaiveDate,
pub value: f64,
pub source: Option<String>,
}
#[api_dto]
pub struct DailyMarketDataDto {
pub symbol: String,
pub trade_date: NaiveDate,
pub open_price: Option<f64>,
pub high_price: Option<f64>,
pub low_price: Option<f64>,
pub close_price: Option<f64>,
pub volume: Option<i64>,
pub pe: Option<f64>,
pub pb: Option<f64>,
pub total_mv: Option<f64>,
}
// Batch DTOs
#[api_dto]
pub struct TimeSeriesFinancialBatchDto {
pub records: Vec<TimeSeriesFinancialDto>,
}
#[api_dto]
pub struct DailyMarketDataBatchDto {
pub records: Vec<DailyMarketDataDto>,
}
// Analysis Results API DTOs
#[api_dto]
pub struct NewAnalysisResultDto {
pub symbol: String,
pub module_id: String,
pub model_name: Option<String>,
pub content: String,
pub meta_data: Option<JsonValue>,
}
#[api_dto]
pub struct AnalysisResultDto {
pub id: Uuid,
pub symbol: String,
pub module_id: String,
pub generated_at: chrono::DateTime<chrono::Utc>,
pub model_name: Option<String>,
pub content: String,
pub meta_data: Option<JsonValue>,
}
// Realtime Quotes DTOs
#[api_dto]
pub struct RealtimeQuoteDto {
pub symbol: String,
pub market: String,
pub ts: chrono::DateTime<chrono::Utc>,
pub price: f64,
pub open_price: Option<f64>,
pub high_price: Option<f64>,
pub low_price: Option<f64>,
pub prev_close: Option<f64>,
pub change: Option<f64>,
pub change_percent: Option<f64>,
pub volume: Option<i64>,
pub source: Option<String>,
}

View File

@ -0,0 +1,6 @@
pub mod dtos;
pub mod models;
pub mod observability;
pub mod messages;

View File

@ -0,0 +1,26 @@
use serde::{Serialize, Deserialize};
use uuid::Uuid;
// --- Commands ---
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FetchCompanyDataCommand {
pub request_id: Uuid,
pub symbol: String,
pub market: String,
}
// --- Events ---
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CompanyProfilePersistedEvent {
pub request_id: Uuid,
pub symbol: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FinancialsPersistedEvent {
pub request_id: Uuid,
pub symbol: String,
pub years_updated: Vec<u16>,
}

View File

@ -0,0 +1,89 @@
use chrono::{DateTime, NaiveDate, Utc};
use serde_json::Value as JsonValue;
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, FromRow)]
pub struct CompanyProfile {
pub symbol: String,
pub name: String,
pub industry: Option<String>,
pub list_date: Option<NaiveDate>,
pub additional_info: Option<JsonValue>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, FromRow)]
pub struct TimeSeriesFinancial {
pub symbol: String,
pub metric_name: String,
pub period_date: NaiveDate,
pub value: rust_decimal::Decimal,
pub source: Option<String>,
}
#[derive(Debug, Clone, FromRow)]
pub struct DailyMarketData {
pub symbol: String,
pub trade_date: NaiveDate,
pub open_price: Option<rust_decimal::Decimal>,
pub high_price: Option<rust_decimal::Decimal>,
pub low_price: Option<rust_decimal::Decimal>,
pub close_price: Option<rust_decimal::Decimal>,
pub volume: Option<i64>,
pub pe: Option<rust_decimal::Decimal>,
pub pb: Option<rust_decimal::Decimal>,
pub total_mv: Option<rust_decimal::Decimal>,
}
#[derive(Debug, Clone, FromRow)]
pub struct AnalysisResult {
pub id: Uuid,
pub symbol: String,
pub module_id: String,
pub generated_at: DateTime<Utc>,
pub model_name: Option<String>,
pub content: String,
pub meta_data: Option<JsonValue>,
}
#[derive(Debug, Clone, FromRow)]
pub struct SystemConfig {
pub config_key: String,
pub config_value: JsonValue,
pub description: Option<String>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, FromRow)]
pub struct ExecutionLog {
pub id: i64,
pub report_id: Uuid,
pub step_name: String,
pub status: String,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub duration_ms: Option<i32>,
pub token_usage: Option<JsonValue>,
pub error_message: Option<String>,
pub log_details: Option<JsonValue>,
}
#[derive(Debug, Clone, FromRow)]
pub struct RealtimeQuote {
pub symbol: String,
pub market: String,
pub ts: DateTime<Utc>,
pub price: rust_decimal::Decimal,
pub open_price: Option<rust_decimal::Decimal>,
pub high_price: Option<rust_decimal::Decimal>,
pub low_price: Option<rust_decimal::Decimal>,
pub prev_close: Option<rust_decimal::Decimal>,
pub change: Option<rust_decimal::Decimal>,
pub change_percent: Option<rust_decimal::Decimal>,
pub volume: Option<i64>,
pub source: Option<String>,
pub updated_at: DateTime<Utc>,
}

View File

@ -0,0 +1,31 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
use uuid::Uuid;
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub enum ServiceStatus {
Ok,
Degraded,
Unhealthy,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HealthStatus {
pub module_id: String,
pub status: ServiceStatus,
pub version: String,
pub details: HashMap<String, String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TaskProgress {
pub request_id: Uuid,
pub task_name: String,
pub status: String,
pub progress_percent: u8,
pub details: String,
pub started_at: DateTime<Utc>,
}

1220
services/config-service-rs/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,18 @@
[package]
name = "config-service-rs"
version = "0.1.0"
edition = "2024"
[dependencies]
axum = "0.8.7"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
config = "0.15.19"
anyhow = "1.0"
tower-http = { version = "0.6.6", features = ["cors"] }
once_cell = "1.19"
thiserror = "2.0.17"
hyper = "1"

View File

@ -0,0 +1,35 @@
# 1. Build Stage
FROM rust:1.90 as builder
WORKDIR /usr/src/app
# Pre-build dependencies to leverage Docker layer caching
COPY ./services/config-service-rs/Cargo.toml ./services/config-service-rs/Cargo.lock* ./services/config-service-rs/
WORKDIR /usr/src/app/services/config-service-rs
RUN mkdir -p src && \
echo "fn main() {}" > src/main.rs && \
cargo build --release --bin config-service-rs
# Copy the full source code
COPY ./services/config-service-rs /usr/src/app/services/config-service-rs
COPY ./config /usr/src/app/config
# Build the application
WORKDIR /usr/src/app/services/config-service-rs
RUN cargo build --release --bin config-service-rs
# 2. Runtime Stage
FROM debian:bookworm-slim
WORKDIR /app
# Set timezone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Copy the built binary and the config directory from the builder stage
COPY --from=builder /usr/src/app/services/config-service-rs/target/release/config-service-rs /usr/local/bin/
COPY --from=builder /usr/src/app/config ./config
# Set the binary as the entrypoint
ENTRYPOINT ["/usr/local/bin/config-service-rs"]

View File

@ -0,0 +1,54 @@
use axum::{response::Json, routing::get, Router};
use once_cell::sync::Lazy;
use serde_json::Value;
use std::{path::PathBuf, sync::Arc};
use crate::config::AppConfig;
static CONFIGS: Lazy<Arc<CachedConfig>> = Lazy::new(|| Arc::new(CachedConfig::load_from_disk()));
struct CachedConfig {
system: Value,
analysis: Value,
}
impl CachedConfig {
fn load_from_disk() -> Self {
let config = AppConfig::load().expect("Failed to load app config for caching");
let config_dir = PathBuf::from(&config.project_root).join("config");
let system_path = config_dir.join("config.json");
let analysis_path = config_dir.join("analysis-config.json");
let system_content = std::fs::read_to_string(system_path)
.expect("Failed to read system config.json");
let system: Value = serde_json::from_str(&system_content)
.expect("Failed to parse system config.json");
let analysis_content = std::fs::read_to_string(analysis_path)
.expect("Failed to read analysis-config.json");
let analysis: Value = serde_json::from_str(&analysis_content)
.expect("Failed to parse analysis-config.json");
Self { system, analysis }
}
}
pub fn create_router() -> Router {
Router::new()
.route("/", get(root))
.route("/api/v1/system", get(get_system_config))
.route("/api/v1/analysis-modules", get(get_analysis_modules))
}
async fn root() -> &'static str {
"OK"
}
async fn get_system_config() -> Json<Value> {
Json(CONFIGS.system.clone())
}
async fn get_analysis_modules() -> Json<Value> {
Json(CONFIGS.analysis.clone())
}

View File

@ -0,0 +1,19 @@
use config::{Config, ConfigError, Environment};
use serde::Deserialize;
#[derive(Debug, Deserialize)]
pub struct AppConfig {
pub server_port: u16,
pub project_root: String,
}
impl AppConfig {
pub fn load() -> Result<Self, ConfigError> {
let config = Config::builder()
.set_default("server_port", 8080)?
.set_default("project_root", "/workspace")?
.add_source(Environment::default().separator("__"))
.build()?;
config.try_deserialize()
}
}

View File

@ -0,0 +1,18 @@
use thiserror::Error;
pub type Result<T> = std::result::Result<T, AppError>;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Configuration error: {0}")]
Config(#[from] config::ConfigError),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("JSON parsing error: {0}")]
Json(#[from] serde_json::Error),
#[error("Server startup error: {0}")]
Server(#[from] hyper::Error),
}

View File

@ -0,0 +1,26 @@
mod api;
mod config;
mod error;
use crate::{config::AppConfig, error::Result};
use std::net::SocketAddr;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let config = AppConfig::load()?;
let port = config.server_port;
let app = api::create_router();
let addr = SocketAddr::from(([0, 0, 0, 0], port));
info!("Server listening on {}", addr);
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?;
Ok(())
}

View File

@ -1,6 +0,0 @@
use data_persistence_service::{
db,
dtos::{CompanyProfileDto, DailyMarketDataDto, NewAnalysisResultDto, TimeSeriesFinancialDto},
models,
};
use sqlx::{postgres::PgPoolOptions, PgPool};

View File

@ -72,7 +72,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -98,9 +98,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]] [[package]]
name = "axum" name = "axum"
version = "0.8.6" version = "0.8.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425"
dependencies = [ dependencies = [
"axum-core 0.5.5", "axum-core 0.5.5",
"bytes", "bytes",
@ -244,7 +244,7 @@ dependencies = [
"proc-macro-crate", "proc-macro-crate",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -283,15 +283,15 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.10.1" version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.2.45" version = "1.2.46"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35900b6c8d709fb1d854671ae27aeaa9eec2f8b01b364e1619a40da3e6fe2afe" checksum = "b97463e1064cb1b1c1384ad0a0b9c8abd0988e2a91f52606c80ef14aadb63e36"
dependencies = [ dependencies = [
"find-msvc-tools", "find-msvc-tools",
"shlex", "shlex",
@ -323,6 +323,20 @@ dependencies = [
"windows-link", "windows-link",
] ]
[[package]]
name = "common-contracts"
version = "0.1.0"
dependencies = [
"chrono",
"rust_decimal",
"serde",
"serde_json",
"service_kit",
"sqlx",
"utoipa",
"uuid",
]
[[package]] [[package]]
name = "concurrent-queue" name = "concurrent-queue"
version = "2.5.0" version = "2.5.0"
@ -394,9 +408,9 @@ checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.6" version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a"
dependencies = [ dependencies = [
"generic-array", "generic-array",
"typenum", "typenum",
@ -423,7 +437,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"strsim", "strsim",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -434,7 +448,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81"
dependencies = [ dependencies = [
"darling_core", "darling_core",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -445,6 +459,7 @@ dependencies = [
"axum", "axum",
"axum-embed", "axum-embed",
"chrono", "chrono",
"common-contracts",
"dotenvy", "dotenvy",
"http-body-util", "http-body-util",
"rmcp 0.8.5", "rmcp 0.8.5",
@ -484,7 +499,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -507,7 +522,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -561,9 +576,9 @@ dependencies = [
[[package]] [[package]]
name = "find-msvc-tools" name = "find-msvc-tools"
version = "0.1.4" version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" checksum = "3a3076410a55c90011c298b04d0cfa770b00fa04e1e3c97d3f6c9de105a03844"
[[package]] [[package]]
name = "flate2" name = "flate2"
@ -681,7 +696,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -716,9 +731,9 @@ dependencies = [
[[package]] [[package]]
name = "generic-array" name = "generic-array"
version = "0.14.9" version = "0.14.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a"
dependencies = [ dependencies = [
"typenum", "typenum",
"version_check", "version_check",
@ -869,9 +884,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "1.7.0" version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb3aa54a13a0dfe7fbe3a59e0c76093041720fdc77b110cc0fc260fafb4dc51e" checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11"
dependencies = [ dependencies = [
"atomic-waker", "atomic-waker",
"bytes", "bytes",
@ -890,9 +905,9 @@ dependencies = [
[[package]] [[package]]
name = "hyper-util" name = "hyper-util"
version = "0.1.17" version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-core", "futures-core",
@ -1224,9 +1239,9 @@ dependencies = [
[[package]] [[package]]
name = "num-bigint-dig" name = "num-bigint-dig"
version = "0.8.5" version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82c79c15c05d4bf82b6f5ef163104cc81a760d8e874d38ac50ab67c8877b647b" checksum = "e661dda6640fad38e827a6d4a310ff4763082116fe217f279885c97f511bb0b7"
dependencies = [ dependencies = [
"lazy_static", "lazy_static",
"libm", "libm",
@ -1525,7 +1540,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -1670,7 +1685,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"serde_json", "serde_json",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -1683,14 +1698,14 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"serde_json", "serde_json",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
name = "rsa" name = "rsa"
version = "0.9.8" version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78928ac1ed176a5ca1d17e578a1825f3d81ca54cf41053a592584b020cfd691b" checksum = "40a0376c50d0358279d9d643e4bf7b7be212f1f4ff1da9070a7b54d22ef75c88"
dependencies = [ dependencies = [
"const-oid", "const-oid",
"digest", "digest",
@ -1726,7 +1741,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"rust-embed-utils", "rust-embed-utils",
"syn 2.0.109", "syn 2.0.110",
"walkdir", "walkdir",
] ]
@ -1834,7 +1849,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"serde_derive_internals", "serde_derive_internals",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -1876,7 +1891,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -1887,7 +1902,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -1945,7 +1960,7 @@ dependencies = [
"inventory", "inventory",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
"utoipa", "utoipa",
] ]
@ -1966,7 +1981,7 @@ dependencies = [
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"service-kit-macros", "service-kit-macros",
"syn 2.0.109", "syn 2.0.110",
"thiserror", "thiserror",
"toml", "toml",
"utoipa", "utoipa",
@ -2146,7 +2161,7 @@ dependencies = [
"quote", "quote",
"sqlx-core", "sqlx-core",
"sqlx-macros-core", "sqlx-macros-core",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -2169,7 +2184,7 @@ dependencies = [
"sqlx-mysql", "sqlx-mysql",
"sqlx-postgres", "sqlx-postgres",
"sqlx-sqlite", "sqlx-sqlite",
"syn 2.0.109", "syn 2.0.110",
"tokio", "tokio",
"url", "url",
] ]
@ -2340,9 +2355,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "2.0.109" version = "2.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f" checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -2363,7 +2378,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -2389,7 +2404,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -2451,7 +2466,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -2593,7 +2608,7 @@ checksum = "81383ab64e72a7a8b8e13130c49e3dab29def6d0c7d76a03087b3cf71c5c6903"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -2719,7 +2734,7 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"regex", "regex",
"syn 2.0.109", "syn 2.0.110",
"uuid", "uuid",
] ]
@ -2834,7 +2849,7 @@ dependencies = [
"bumpalo", "bumpalo",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -2905,7 +2920,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -2916,7 +2931,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -3214,7 +3229,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
"synstructure", "synstructure",
] ]
@ -3235,7 +3250,7 @@ checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]
@ -3255,7 +3270,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
"synstructure", "synstructure",
] ]
@ -3295,7 +3310,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.109", "syn 2.0.110",
] ]
[[package]] [[package]]

View File

@ -1,7 +1,7 @@
[package] [package]
name = "data-persistence-service" name = "data-persistence-service"
version = "0.1.2" version = "0.1.2"
edition = "2021" edition = "2024"
authors = ["Lv, Qi <lvsoft@gmail.com>"] authors = ["Lv, Qi <lvsoft@gmail.com>"]
default-run = "data-persistence-service-server" default-run = "data-persistence-service-server"
@ -26,6 +26,7 @@ rmcp = { version = "0.8.5", features = [
"transport-streamable-http-server", "transport-streamable-http-server",
"transport-worker" "transport-worker"
] } ] }
common-contracts = { path = "../common-contracts" }
# Web framework # Web framework
axum = "0.8" axum = "0.8"

View File

@ -3,14 +3,22 @@ WORKDIR /app
RUN cargo install cargo-chef RUN cargo install cargo-chef
FROM chef AS planner FROM chef AS planner
COPY . . WORKDIR /app/services/data-persistence-service
# 仅复制必要的 Cargo 清单,避免大体积上下文
COPY services/common-contracts/Cargo.toml /app/services/common-contracts/Cargo.toml
COPY services/data-persistence-service/Cargo.toml /app/services/data-persistence-service/Cargo.toml
RUN cargo chef prepare --recipe-path recipe.json RUN cargo chef prepare --recipe-path recipe.json
FROM chef AS builder FROM chef AS builder
ENV SQLX_OFFLINE=true ENV SQLX_OFFLINE=true
COPY --from=planner /app/recipe.json /app/recipe.json WORKDIR /app/services/data-persistence-service
RUN cargo chef cook --release --recipe-path /app/recipe.json COPY --from=planner /app/services/data-persistence-service/recipe.json /app/services/data-persistence-service/recipe.json
COPY . . # 为了支持 path 依赖,先拷贝依赖源码再 cook
COPY services/common-contracts /app/services/common-contracts
RUN cargo chef cook --release --recipe-path /app/services/data-persistence-service/recipe.json
# 复制服务源码用于实际构建
COPY services/common-contracts /app/services/common-contracts
COPY services/data-persistence-service /app/services/data-persistence-service
RUN cargo build --release --bin data-persistence-service-server RUN cargo build --release --bin data-persistence-service-server
FROM debian:bookworm-slim AS runtime FROM debian:bookworm-slim AS runtime
@ -18,8 +26,8 @@ WORKDIR /app
RUN groupadd --system --gid 1001 appuser && \ RUN groupadd --system --gid 1001 appuser && \
useradd --system --uid 1001 --gid 1001 appuser useradd --system --uid 1001 --gid 1001 appuser
USER appuser USER appuser
COPY --from=builder /app/target/release/data-persistence-service-server /usr/local/bin/data-persistence-service-server COPY --from=builder /app/services/data-persistence-service/target/release/data-persistence-service-server /usr/local/bin/data-persistence-service-server
COPY ./migrations ./migrations COPY services/data-persistence-service/migrations ./migrations
ENV HOST=0.0.0.0 ENV HOST=0.0.0.0
ENV PORT=3000 ENV PORT=3000
EXPOSE 3000 EXPOSE 3000

View File

@ -3,3 +3,4 @@
pub mod companies; pub mod companies;
pub mod market_data; pub mod market_data;
pub mod analysis; pub mod analysis;
pub mod system;

View File

@ -0,0 +1,36 @@
use axum::{extract::State, Json};
use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress};
use service_kit::api;
use std::collections::HashMap;
use crate::{AppState, ServerError};
#[api(GET, "/health", output(detail = "HealthStatus"))]
pub async fn get_health(State(state): State<AppState>) -> Result<Json<HealthStatus>, ServerError> {
// Basic DB connectivity check
let db_ok = sqlx::query_scalar::<_, i32>("select 1")
.fetch_one(state.pool())
.await
.map(|v| v == 1)
.unwrap_or(false);
let mut details = HashMap::new();
details.insert("db_connection".to_string(), if db_ok { "ok".into() } else { "error".into() });
let status = if db_ok { ServiceStatus::Ok } else { ServiceStatus::Unhealthy };
let health = HealthStatus {
module_id: "data-persistence-service".to_string(),
status,
version: env!("CARGO_PKG_VERSION").to_string(),
details,
};
Ok(Json(health))
}
#[api(GET, "/tasks", output(list = "TaskProgress"))]
pub async fn get_tasks(_state: State<AppState>) -> Result<Json<Vec<TaskProgress>>, ServerError> {
// data-persistence-service 当前不进行异步任务处理,返回空列表
Ok(Json(Vec::new()))
}

View File

@ -1,99 +1 @@
use chrono::NaiveDate; pub use common_contracts::dtos::*;
use service_kit::api_dto;
use serde_json::Value as JsonValue;
use uuid::Uuid;
// =================================================================================
// Companies API DTOs (Task T3.1)
// =================================================================================
#[api_dto]
pub struct CompanyProfileDto {
pub symbol: String,
pub name: String,
pub industry: Option<String>,
pub list_date: Option<NaiveDate>,
pub additional_info: Option<JsonValue>,
}
// =================================================================================
// Market Data API DTOs (Task T3.2)
// =================================================================================
#[api_dto]
pub struct TimeSeriesFinancialDto {
pub symbol: String,
pub metric_name: String,
pub period_date: NaiveDate,
pub value: f64, // Using f64 for simplicity in DTOs, will be handled as Decimal in db
pub source: Option<String>,
}
#[api_dto]
pub struct DailyMarketDataDto {
pub symbol: String,
pub trade_date: NaiveDate,
pub open_price: Option<f64>,
pub high_price: Option<f64>,
pub low_price: Option<f64>,
pub close_price: Option<f64>,
pub volume: Option<i64>,
pub pe: Option<f64>,
pub pb: Option<f64>,
pub total_mv: Option<f64>,
}
// Batch DTOs to satisfy #[api] macro restriction on Json<Vec<T>> in request bodies
#[api_dto]
pub struct TimeSeriesFinancialBatchDto {
pub records: Vec<TimeSeriesFinancialDto>,
}
#[api_dto]
pub struct DailyMarketDataBatchDto {
pub records: Vec<DailyMarketDataDto>,
}
// =================================================================================
// Analysis Results API DTOs (Task T3.3)
// =================================================================================
#[api_dto]
pub struct NewAnalysisResultDto {
pub symbol: String,
pub module_id: String,
pub model_name: Option<String>,
pub content: String,
pub meta_data: Option<JsonValue>,
}
#[api_dto]
pub struct AnalysisResultDto {
pub id: Uuid,
pub symbol: String,
pub module_id: String,
pub generated_at: chrono::DateTime<chrono::Utc>,
pub model_name: Option<String>,
pub content: String,
pub meta_data: Option<JsonValue>,
}
// =================================================================================
// Realtime Quotes DTOs
// =================================================================================
#[api_dto]
pub struct RealtimeQuoteDto {
pub symbol: String,
pub market: String,
pub ts: chrono::DateTime<chrono::Utc>,
pub price: f64,
pub open_price: Option<f64>,
pub high_price: Option<f64>,
pub low_price: Option<f64>,
pub prev_close: Option<f64>,
pub change: Option<f64>,
pub change_percent: Option<f64>,
pub volume: Option<i64>,
pub source: Option<String>,
}

View File

@ -1,6 +1,8 @@
use axum::Router; use axum::Router;
use service_kit::{rest_router_builder::RestRouterBuilder}; use service_kit::{rest_router_builder::RestRouterBuilder};
#[cfg(feature = "mcp")]
use rmcp::transport::streamable_http_server::{session::local::LocalSessionManager, StreamableHttpService}; use rmcp::transport::streamable_http_server::{session::local::LocalSessionManager, StreamableHttpService};
#[cfg(feature = "wasm-cli")]
use rust_embed::RustEmbed; use rust_embed::RustEmbed;
use tower_http::cors::{Any, CorsLayer}; use tower_http::cors::{Any, CorsLayer};
use utoipa::openapi::OpenApi; use utoipa::openapi::OpenApi;

View File

@ -1,87 +1 @@
use chrono::{DateTime, NaiveDate, Utc}; pub use common_contracts::models::*;
use serde_json::Value as JsonValue;
use sqlx::FromRow;
use uuid::Uuid;
#[derive(Debug, Clone, FromRow)]
pub struct CompanyProfile {
pub symbol: String,
pub name: String,
pub industry: Option<String>,
pub list_date: Option<NaiveDate>,
pub additional_info: Option<JsonValue>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, FromRow)]
pub struct TimeSeriesFinancial {
pub symbol: String,
pub metric_name: String,
pub period_date: NaiveDate,
pub value: rust_decimal::Decimal, // Using Decimal for precision with NUMERIC
pub source: Option<String>,
}
#[derive(Debug, Clone, FromRow)]
pub struct DailyMarketData {
pub symbol: String,
pub trade_date: NaiveDate,
pub open_price: Option<rust_decimal::Decimal>,
pub high_price: Option<rust_decimal::Decimal>,
pub low_price: Option<rust_decimal::Decimal>,
pub close_price: Option<rust_decimal::Decimal>,
pub volume: Option<i64>,
pub pe: Option<rust_decimal::Decimal>,
pub pb: Option<rust_decimal::Decimal>,
pub total_mv: Option<rust_decimal::Decimal>,
}
#[derive(Debug, Clone, FromRow)]
pub struct AnalysisResult {
pub id: Uuid,
pub symbol: String,
pub module_id: String,
pub generated_at: DateTime<Utc>,
pub model_name: Option<String>,
pub content: String,
pub meta_data: Option<JsonValue>,
}
#[derive(Debug, Clone, FromRow)]
pub struct SystemConfig {
pub config_key: String,
pub config_value: JsonValue,
pub description: Option<String>,
pub updated_at: DateTime<Utc>,
}
#[derive(Debug, Clone, FromRow)]
pub struct ExecutionLog {
pub id: i64,
pub report_id: Uuid,
pub step_name: String,
pub status: String,
pub start_time: DateTime<Utc>,
pub end_time: Option<DateTime<Utc>>,
pub duration_ms: Option<i32>,
pub token_usage: Option<JsonValue>,
pub error_message: Option<String>,
pub log_details: Option<JsonValue>,
}
#[derive(Debug, Clone, FromRow)]
pub struct RealtimeQuote {
pub symbol: String,
pub market: String,
pub ts: DateTime<Utc>,
pub price: rust_decimal::Decimal,
pub open_price: Option<rust_decimal::Decimal>,
pub high_price: Option<rust_decimal::Decimal>,
pub low_price: Option<rust_decimal::Decimal>,
pub prev_close: Option<rust_decimal::Decimal>,
pub change: Option<rust_decimal::Decimal>,
pub change_percent: Option<rust_decimal::Decimal>,
pub volume: Option<i64>,
pub source: Option<String>,
pub updated_at: DateTime<Utc>,
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,49 @@
[package]
name = "finnhub-provider-service"
version = "0.1.0"
edition = "2024"
[dependencies]
# Web Service
axum = "0.8.7"
tokio = { version = "1.0", features = ["full"] }
tower-http = { version = "0.6.6", features = ["cors"] }
# Shared Contracts
common-contracts = { path = "../common-contracts" }
# Generic MCP Client
reqwest = { version = "0.12.24", features = ["json"] }
url = "2.5.2"
chrono = { version = "0.4.38", features = ["serde"] }
rust_decimal = "1.35.0"
rust_decimal_macros = "1.35.0"
itertools = "0.14.0"
# Message Queue (NATS)
async-nats = "0.45.0"
futures = "0.3"
futures-util = "0.3.31"
# Data Persistence Client
# Concurrency & Async
async-trait = "0.1.80"
dashmap = "6.1.0"
uuid = { version = "1.6", features = ["v4", "serde"] }
# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# Logging & Telemetry
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# Configuration
config = "0.15.19"
secrecy = { version = "0.10.3", features = ["serde"] }
# Error Handling
thiserror = "2.0.17"
anyhow = "1.0"

View File

@ -0,0 +1,33 @@
# 1. Build Stage
FROM rust:1.90 as builder
WORKDIR /usr/src/app
# Pre-build dependencies to leverage Docker layer caching
COPY ./services/common-contracts /usr/src/app/services/common-contracts
COPY ./services/finnhub-provider-service/Cargo.toml ./services/finnhub-provider-service/Cargo.lock* ./services/finnhub-provider-service/
WORKDIR /usr/src/app/services/finnhub-provider-service
RUN mkdir -p src && \
echo "fn main() {}" > src/main.rs && \
cargo build --release --bin finnhub-provider-service
# Copy the full source code
COPY ./services/finnhub-provider-service /usr/src/app/services/finnhub-provider-service
# Build the application
WORKDIR /usr/src/app/services/finnhub-provider-service
RUN cargo build --release --bin finnhub-provider-service
# 2. Runtime Stage
FROM debian:bookworm-slim
# Set timezone
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# Copy the built binary from the builder stage
COPY --from=builder /usr/src/app/services/finnhub-provider-service/target/release/finnhub-provider-service /usr/local/bin/
# Set the binary as the entrypoint
ENTRYPOINT ["/usr/local/bin/finnhub-provider-service"]

View File

@ -0,0 +1,43 @@
use std::collections::HashMap;
use axum::{
extract::State,
response::Json,
routing::get,
Router,
};
use common_contracts::observability::{HealthStatus, ServiceStatus, TaskProgress};
use crate::state::AppState;
pub fn create_router(app_state: AppState) -> Router {
Router::new()
.route("/health", get(health_check))
.route("/tasks", get(get_current_tasks))
.with_state(app_state)
}
/// [GET /health]
/// Provides the current health status of the module.
async fn health_check(State(_state): State<AppState>) -> Json<HealthStatus> {
let mut details = HashMap::new();
// In a real scenario, we would check connections to the message bus, etc.
details.insert("message_bus_connection".to_string(), "ok".to_string());
let status = HealthStatus {
module_id: "finnhub-provider-service".to_string(),
status: ServiceStatus::Ok,
version: env!("CARGO_PKG_VERSION").to_string(),
details,
};
Json(status)
}
/// [GET /tasks]
/// Reports all currently processing tasks and their progress.
async fn get_current_tasks(State(state): State<AppState>) -> Json<Vec<TaskProgress>> {
let tasks: Vec<TaskProgress> = state
.tasks
.iter()
.map(|entry| entry.value().clone())
.collect();
Json(tasks)
}

View File

@ -0,0 +1,21 @@
use secrecy::SecretString;
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone)]
pub struct AppConfig {
pub server_port: u16,
pub nats_addr: String,
pub data_persistence_service_url: String,
pub finnhub_api_url: String,
pub finnhub_api_key: SecretString,
}
impl AppConfig {
pub fn load() -> Result<Self, config::ConfigError> {
let config = config::Config::builder()
.add_source(config::Environment::default().separator("__"))
.build()?;
config.try_deserialize()
}
}

View File

@ -0,0 +1,40 @@
use thiserror::Error;
pub type Result<T> = std::result::Result<T, AppError>;
#[derive(Error, Debug)]
pub enum AppError {
#[error("Configuration error: {0}")]
Configuration(String),
#[error("Message bus error: {0}")]
MessageBus(#[from] async_nats::Error),
#[error("Message bus publish error: {0}")]
MessageBusPublish(#[from] async_nats::PublishError),
#[error("Message bus subscribe error: {0}")]
MessageBusSubscribe(String),
#[error("Message bus connect error: {0}")]
MessageBusConnect(String),
#[error("HTTP request to another service failed: {0}")]
ServiceRequest(#[from] reqwest::Error),
#[error("Data parsing error: {0}")]
DataParsing(#[from] anyhow::Error),
}
// 手动实现针对 async-nats 泛型错误类型的 From 转换
impl From<async_nats::error::Error<async_nats::ConnectErrorKind>> for AppError {
fn from(err: async_nats::error::Error<async_nats::ConnectErrorKind>) -> Self {
AppError::MessageBusConnect(err.to_string())
}
}
impl From<async_nats::SubscribeError> for AppError {
fn from(err: async_nats::SubscribeError) -> Self {
AppError::MessageBusSubscribe(err.to_string())
}
}

View File

@ -0,0 +1,55 @@
use crate::error::AppError;
use anyhow::anyhow;
use reqwest::Url;
use serde::de::DeserializeOwned;
use tracing::info;
#[derive(Clone)]
pub struct FinnhubClient {
client: reqwest::Client,
base_url: Url,
api_token: String,
}
impl FinnhubClient {
pub fn new(base_url: String, api_token: String) -> Result<Self, AppError> {
let url = Url::parse(&base_url)
.map_err(|e| AppError::Configuration(format!("Invalid base URL: {}", e)))?;
Ok(Self {
client: reqwest::Client::new(),
base_url: url,
api_token,
})
}
pub async fn get<T: DeserializeOwned>(
&self,
endpoint: &str,
params: Vec<(String, String)>,
) -> Result<T, AppError> {
let mut url = self.base_url.join(endpoint).unwrap();
url.query_pairs_mut()
.extend_pairs(params.iter().map(|(k, v)| (k.as_str(), v.as_str())));
url.query_pairs_mut()
.append_pair("token", &self.api_token);
info!("Sending Finnhub request to: {}", url);
let res = self.client.get(url).send().await?;
let status = res.status();
if !status.is_success() {
let error_text = res.text().await.unwrap_or_else(|_| "Unknown error".to_string());
return Err(AppError::DataParsing(anyhow!(format!(
"API request failed with status {}: {}",
status,
error_text
))));
}
let json_res = res.json::<T>().await?;
Ok(json_res)
}
}

View File

@ -0,0 +1,104 @@
use serde::Deserialize;
use crate::{
error::AppError,
fh_client::FinnhubClient,
mapping::{map_financial_dtos, map_profile_dto},
};
use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto};
use tokio;
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct FinnhubProfile {
pub country: Option<String>,
pub currency: Option<String>,
pub exchange: Option<String>,
pub name: Option<String>,
pub ticker: Option<String>,
pub ipo: Option<String>,
pub market_capitalization: Option<f64>,
pub share_outstanding: Option<f64>,
pub logo: Option<String>,
pub phone: Option<String>,
pub weburl: Option<String>,
pub finnhub_industry: Option<String>,
}
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct FinnhubFinancialsReported {
pub data: Vec<AnnualReport>,
pub symbol: String,
}
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct AnnualReport {
pub year: u16,
pub start_date: String,
pub end_date: String,
pub report: Report,
}
#[derive(Debug, Deserialize, Clone)]
pub struct Report {
pub bs: Vec<ReportItem>,
pub ic: Vec<ReportItem>,
pub cf: Vec<ReportItem>,
}
#[derive(Debug, Deserialize, Clone)]
pub struct ReportItem {
pub value: f64,
pub concept: String,
pub label: String,
}
pub struct FinnhubDataProvider {
client: FinnhubClient,
}
impl FinnhubDataProvider {
pub fn new(api_url: String, api_token: String) -> Self {
Self {
client: FinnhubClient::new(api_url, api_token).expect("Failed to create Finnhub client"),
}
}
pub async fn fetch_all_data(
&self,
symbol: &str,
) -> Result<(CompanyProfileDto, Vec<TimeSeriesFinancialDto>), AppError> {
let (profile_raw, financials_raw) = self.fetch_raw_data(symbol).await?;
// 1. Build CompanyProfileDto
let profile = map_profile_dto(&profile_raw, symbol)?;
// 2. Build TimeSeriesFinancialDto list
let financials = map_financial_dtos(&financials_raw, symbol)?;
Ok((profile, financials))
}
async fn fetch_raw_data(
&self,
symbol: &str,
) -> Result<(FinnhubProfile, FinnhubFinancialsReported), AppError> {
let params_profile = vec![("symbol".to_string(), symbol.to_string())];
let params_financials = vec![
("symbol".to_string(), symbol.to_string()),
("freq".to_string(), "annual".to_string()),
];
let profile_task = self.client.get::<FinnhubProfile>("/stock/profile2", params_profile);
let financials_task =
self.client
.get::<FinnhubFinancialsReported>("/stock/financials-reported", params_financials);
let (profile_res, financials_res) = tokio::try_join!(profile_task, financials_task)?;
Ok((profile_res, financials_res))
}
}

View File

@ -0,0 +1,47 @@
mod api;
mod config;
mod error;
mod fh_client;
mod finnhub;
mod mapping;
mod message_consumer;
mod persistence;
mod state;
mod worker;
use crate::config::AppConfig;
use crate::error::Result;
use crate::state::AppState;
use tracing::info;
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
info!("Starting finnhub-provider-service...");
// Load configuration
let config = AppConfig::load().map_err(|e| error::AppError::Configuration(e.to_string()))?;
let port = config.server_port;
// Initialize application state
let app_state = AppState::new(config);
// Create the Axum router
let app = api::create_router(app_state.clone());
// --- Start the message consumer ---
tokio::spawn(message_consumer::run(app_state));
// Start the HTTP server
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))
.await
.unwrap();
info!("HTTP server listening on port {}", port);
axum::serve(listener, app).await.unwrap();
Ok(())
}

View File

@ -0,0 +1,111 @@
use crate::error::AppError;
use crate::finnhub::{FinnhubFinancialsReported, FinnhubProfile, ReportItem};
use anyhow::anyhow;
use chrono::NaiveDate;
use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto};
use std::collections::HashMap;
pub fn map_profile_dto(profile_raw: &FinnhubProfile, symbol: &str) -> Result<CompanyProfileDto, AppError> {
let name = profile_raw
.name
.clone()
.ok_or_else(|| AppError::DataParsing(anyhow!("Profile name missing")))?;
let industry = profile_raw.finnhub_industry.clone();
let list_date = profile_raw
.ipo
.as_ref()
.and_then(|s| NaiveDate::parse_from_str(s, "%Y-%m-%d").ok());
Ok(CompanyProfileDto {
symbol: symbol.to_string(),
name,
industry,
list_date,
additional_info: None,
})
}
pub fn map_financial_dtos(
financials_raw: &FinnhubFinancialsReported,
symbol: &str,
) -> Result<Vec<TimeSeriesFinancialDto>, AppError> {
let mut out: Vec<TimeSeriesFinancialDto> = Vec::new();
for annual in &financials_raw.data {
let period_date = NaiveDate::parse_from_str(&annual.end_date, "%Y-%m-%d")
.map_err(|e| AppError::DataParsing(anyhow!("Invalid end_date: {}", e)))?;
let bs = &annual.report.bs;
let ic = &annual.report.ic;
let cf = &annual.report.cf;
let mut push_metric = |name: &str, value_opt: Option<f64>| {
if let Some(v) = value_opt {
out.push(TimeSeriesFinancialDto {
symbol: symbol.to_string(),
metric_name: name.to_string(),
period_date,
value: v,
source: Some("finnhub".to_string()),
});
}
};
let revenue = pick_value(ic, &["Revenues"], &["Total revenue", "Revenue"]);
let net_income = pick_value(ic, &["NetIncomeLoss"], &["Net income"]);
let total_assets = pick_value(bs, &["Assets"], &["Total assets"]);
let total_equity = pick_value(
bs,
&[
"StockholdersEquityIncludingPortionAttributableToNoncontrollingInterest",
"StockholdersEquity",
],
&["Total equity"],
);
let goodwill = pick_value(bs, &["Goodwill"], &["Goodwill"]);
let ocf = pick_value(
cf,
&["NetCashProvidedByUsedInOperatingActivities"],
&["Net cash provided by operating activities"],
);
let capex = pick_value(
cf,
&["CapitalExpenditures", "PaymentsToAcquirePropertyPlantAndEquipment"],
&["Capital expenditures"],
);
push_metric("revenue", revenue);
push_metric("net_income", net_income);
push_metric("total_assets", total_assets);
push_metric("total_equity", total_equity);
push_metric("goodwill", goodwill);
push_metric("operating_cash_flow", ocf);
push_metric("capital_expenditure", capex);
if let (Some(ocf_v), Some(capex_v)) = (ocf, capex) {
push_metric("__free_cash_flow", Some(ocf_v - capex_v));
}
}
Ok(out)
}
fn pick_value(
report_block: &[ReportItem],
concept_candidates: &[&str],
label_candidates: &[&str],
) -> Option<f64> {
let normalize = |s: &str| s.chars().filter(|c| c.is_alphanumeric()).collect::<String>().to_lowercase();
let by_concept: HashMap<_, _> = report_block.iter().map(|item| (normalize(&item.concept), item.value)).collect();
let by_label: HashMap<_, _> = report_block.iter().map(|item| (normalize(&item.label), item.value)).collect();
for key in concept_candidates {
if let Some(&value) = by_concept.get(&normalize(key)) {
return Some(value);
}
}
for key in label_candidates {
if let Some(&value) = by_label.get(&normalize(key)) {
return Some(value);
}
}
None
}

Some files were not shown because too many files have changed in this diff Show More