docs: 更新路线图—勾选光荣退役并归档旧Python服务
- 勾选阶段五第3项下线Python服务 - 描述将 backend/ 与 services/config-service/ 归档至 archive/python/ - 明确无Python组件下系统运行的验收标准
This commit is contained in:
parent
a6cca48fed
commit
0e45dd4a3f
542
docs/data_provider_refactoring_and_trait_design.md
Normal file
542
docs/data_provider_refactoring_and_trait_design.md
Normal file
@ -0,0 +1,542 @@
|
||||
# 设计文档: 面向Rust的事件驱动数据微服务架构
|
||||
|
||||
## 1. 引言
|
||||
|
||||
### 1.1. 文档目的
|
||||
|
||||
本文档旨在为“基本面选股系统”设计一个**完全基于Rust的、事件驱动的、去中心化的微服务架构**。此设计将作为彻底替换现有Python组件、并构建下一代数据处理生态系统的核心技术蓝图。
|
||||
|
||||
新的架构目标是:
|
||||
1. **服务独立化**:将每个外部数据源(Tushare, Finnhub等)封装成独立的、可独立部署和运行的微服务。
|
||||
2. **事件驱动**:引入消息总线(Message Bus)作为服务间通信的主干,实现服务的高度解耦和异步协作。
|
||||
3. **数据中心化**:所有微服务将标准化的数据写入一个由`data-persistence-service`独占管理的中央数据库,实现“数据写入即共享”。
|
||||
4. **纯Rust生态**:从前端网关到最末端的数据提供商,整个后端生态系统将100%使用Rust构建,确保端到端的类型安全、高性能和健壮性。
|
||||
|
||||
### 1.2. 核心架构理念
|
||||
|
||||
- **独立单元 (Independent Units)**: 每个服务都是一个完整的、自包含的应用程序,拥有自己的配置、逻辑和生命周期。
|
||||
- **异步协作 (Asynchronous Collaboration)**: 服务之间通过发布/订阅消息进行通信,而非紧耦合的直接API调用。
|
||||
- **单一事实源 (Single Source of Truth)**: 数据库是所有结构化数据的唯一事实源。服务通过向数据库写入数据来“广播”其工作成果。
|
||||
|
||||
## 2. 目标架构 (Target Architecture)
|
||||
|
||||
### 2.1. 架构图
|
||||
|
||||
```
|
||||
+-------------+ +------------------+ +---------------------------+
|
||||
| | HTTP | | | |
|
||||
| Frontend |----->| API Gateway |----->| Message Bus |
|
||||
| (Next.js) | | (Rust) | | (e.g., RabbitMQ, NATS) |
|
||||
| | | | | |
|
||||
+-------------+ +-------+----------+ +-------------+-------------+
|
||||
| |
|
||||
(Read operations) | | (Pub/Sub Commands & Events)
|
||||
| |
|
||||
+-----------------v------------------+ +------v------+ +----------------+ +----------------+
|
||||
| | | Tushare | | Finnhub | | iFind |
|
||||
| Data Persistence Service (Rust) |<---->| Provider | | Provider | | Provider |
|
||||
| | | Service | | Service | | Service |
|
||||
+-----------------+------------------+ | (Rust) | | (Rust) | | (Rust) |
|
||||
| +-------------+ +----------------+ +----------------+
|
||||
v
|
||||
+-----------------------------------------------------+
|
||||
| |
|
||||
| PostgreSQL Database |
|
||||
| |
|
||||
+-----------------------------------------------------+
|
||||
```
|
||||
|
||||
### 2.2. 服务职责划分
|
||||
|
||||
- **API Gateway (Rust)**:
|
||||
- 面向前端的唯一入口 (BFF - Backend for Frontend)。
|
||||
- 负责处理用户请求、认证鉴权。
|
||||
- 将前端的查询请求转化为对`Data Persistence Service`的数据读取调用。
|
||||
- 将前端的操作请求(如“生成新报告”)转化为命令(Command)并发布到**Message Bus**。
|
||||
|
||||
- **`*_provider-service` (Rust)**:
|
||||
- **一组**独立的微服务,每个服务对应一个外部数据API(如`tushare-provider-service`)。
|
||||
- 订阅Message Bus上的相关命令(如`FetchFinancialsRequest`)。
|
||||
- 独立调用外部API,对返回数据进行清洗、标准化。
|
||||
- 调用`Data Persistence Service`的接口,将标准化后的数据写入数据库。
|
||||
- 操作完成后,可以向Message Bus发布事件(Event),如`FinancialsDataReady`。
|
||||
|
||||
- **Data Persistence Service (Rust)**:
|
||||
- **(职责不变)** 数据库的唯一守门人。
|
||||
- 为所有其他内部微服务提供稳定、统一的数据库读写gRPC/HTTP接口。
|
||||
|
||||
- **Message Bus (e.g., RabbitMQ, NATS)**:
|
||||
- 整个系统的神经中枢,负责所有服务间的异步通信。
|
||||
- 传递命令(“做什么”)和事件(“发生了什么”)。
|
||||
|
||||
## 3. 核心抽象与数据契约
|
||||
|
||||
### 3.1. `DataProvider` Trait (内部实现蓝图)
|
||||
|
||||
此Trait依然是构建**每个独立Provider微服务内部逻辑**的核心蓝图。它定义了一个Provider应该具备的核心能力。
|
||||
|
||||
```rust
|
||||
// This trait defines the internal logic blueprint for each provider microservice.
|
||||
#[async_trait]
|
||||
pub trait DataProvider: Send + Sync {
|
||||
// ... (trait definition remains the same as previous version) ...
|
||||
fn get_id(&self) -> &'static str;
|
||||
async fn get_company_profile(&self, symbol: &str) -> Result<CompanyProfile, DataProviderError>;
|
||||
async fn get_historical_financials(&self, symbol: &str, years: &[u16]) -> Result<Vec<FinancialStatement>, DataProviderError>;
|
||||
// ... etc ...
|
||||
}
|
||||
```
|
||||
|
||||
### 3.2. 标准化数据模型 (共享的数据契约)
|
||||
|
||||
这些模型是服务间共享的“通用语言”,也是存入数据库的最终形态,其重要性在新架构下更高。
|
||||
|
||||
```rust
|
||||
// These structs are the shared "Data Contracts" across all services.
|
||||
// Their definitions remain the same as the previous version.
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)] // Add Serialize/Deserialize for messaging
|
||||
pub struct CompanyProfile { ... }
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct FinancialStatement { ... }
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MarketDataPoint { ... }
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct RealtimeQuote { ... }
|
||||
```
|
||||
|
||||
### 3.3. 消息/事件定义 (Message/Event Contracts)
|
||||
|
||||
这是新架构的核心,定义了在Message Bus上传递的消息格式。
|
||||
|
||||
```rust
|
||||
use uuid::Uuid;
|
||||
use serde::{Serialize, Deserialize};
|
||||
|
||||
// --- Commands (Instructions to do something) ---
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct FetchCompanyDataCommand {
|
||||
pub request_id: Uuid,
|
||||
pub symbol: String,
|
||||
pub market: String, // To help providers route to the correct API endpoint
|
||||
}
|
||||
|
||||
// --- Events (Notifications that something has happened) ---
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct CompanyProfilePersistedEvent {
|
||||
pub request_id: Uuid,
|
||||
pub symbol: String,
|
||||
// We don't need to carry the full data, as it's now in the database.
|
||||
// Interested services can query it.
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct FinancialsPersistedEvent {
|
||||
pub request_id: Uuid,
|
||||
pub symbol: String,
|
||||
pub years_updated: Vec<u16>,
|
||||
}
|
||||
```
|
||||
|
||||
## 4. 数据工作流示例 (Example Data Workflow)
|
||||
|
||||
1. **请求发起**: 用户在前端请求`AAPL`的分析报告。请求到达`API Gateway`。
|
||||
2. **命令发布**: `API Gateway`生成一个唯一的`request_id`,然后向Message Bus发布一个`FetchCompanyDataCommand`命令。
|
||||
3. **命令消费**: `tushare-provider`、`finnhub-provider`等所有订阅了此命令的服务都会收到消息。
|
||||
4. **独立执行**:
|
||||
- `finnhub-provider`根据`market`和`symbol`,调用Finnhub API获取公司简介、财务、行情数据。
|
||||
- 数据获取成功后,它将数据转换为标准化的`CompanyProfile`, `Vec<FinancialStatement>`等模型。
|
||||
- 它调用`Data Persistence Service`的接口,将这些标准化的数据写入数据库。
|
||||
- 写入成功后,它向Message Bus发布`CompanyProfilePersistedEvent`和`FinancialsPersistedEvent`等事件。
|
||||
- `tushare-provider`收到命令后,可能因为市场不匹配而直接忽略该消息。
|
||||
5. **下游响应**: 一个潜在的`report-generator-service`(图中未画出,属于业务层)可以订阅`...PersistedEvent`。当它收到了生成一份完整报告所需的所有数据事件后,便开始从数据库中拉取这些数据,进行AI分析,并将最终报告存回数据库。
|
||||
6. **前端轮询/通知**: `API Gateway`可以通过WebSocket或长轮询等方式,将最终报告的完成状态通知给前端。
|
||||
|
||||
## 5. 实施路线图 (Roadmap) - 更新于 2025-11-15
|
||||
|
||||
**基于对项目现状的调研,本路线图已更新,明确标识了已完成的工作和接下来的行动计划。**
|
||||
|
||||
---
|
||||
|
||||
### **✔ 阶段-1:容器化与初步服务拆分 (已完成)**
|
||||
- [x] **核心服务已容器化**: `data-persistence-service` 已完全开发、容器化,并通过Docker Compose与数据库和现有Python后端集成。
|
||||
- [x] **数据库已初始化**: Rust服务的 `migrations` 目录证实了数据库表结构已通过 `sqlx-cli` 创建和管理。
|
||||
- [x] **Python后端部分重构**: Python `backend` 服务已经作为客户端,通过HTTP API调用`data-persistence-service`来读写数据。
|
||||
- [x] **配置服务已拆分**: `config-service` 作为一个独立的Python微服务也已存在并运行。
|
||||
- [x] **开发环境已建立**: 整个系统可以通过`Docker Compose`和`Tilt`一键启动。
|
||||
|
||||
---
|
||||
|
||||
### **阶段〇:奠定新架构的基石 (Laying the New Foundation)**
|
||||
- [x] **1. 部署消息总线**: 在`docker-compose.yml`中添加一个消息总线服务 (`NATS`)。这是实现事件驱动架构的**先决条件**。
|
||||
- [x] **2. 创建共享契约库 (`common-contracts`)**: 在`services/`下创建一个新的Rust `common-contracts` crate。
|
||||
- 将`data-persistence-service/src/dtos.rs` 和 `models.rs`中的核心数据结构(如`CompanyProfile`, `FinancialStatement`等)迁移至此。
|
||||
- 添加`architecture_module_specification.md`中定义的消息契约 (`FetchCompanyDataCommand`等) 和可观测性结构 (`HealthStatus`, `TaskProgress`)。
|
||||
- [x] **3. 升级 `data-persistence-service`**:
|
||||
- 使其依赖新的`common-contracts` crate,替换掉本地的数据模型定义。
|
||||
- 为其实现`SystemModule`规范,即添加`/health`和`/tasks`端点。
|
||||
|
||||
---
|
||||
|
||||
### **阶段一:开发 `alphavantage-provider-service` (精确实现蓝图)**
|
||||
|
||||
**目标**: 创建并实现 `alphavantage-provider-service`,使其成为我们新架构下的第一个功能完备、可独立运行的数据提供商微服务。
|
||||
|
||||
- [x] **1. 项目初始化与依赖配置**
|
||||
- [x] **任务**: 基于我们的微服务模板,创建新的Rust项目 `services/alphavantage-provider-service`。
|
||||
- [x] **任务**: 在其`Cargo.toml`中添加核心依赖。
|
||||
```toml
|
||||
# Cargo.toml
|
||||
[dependencies]
|
||||
# ... other dependencies like axum, tokio, etc.
|
||||
common-contracts = { path = "../common-contracts" }
|
||||
|
||||
# Generic MCP Client
|
||||
rmcp = "0.8.5"
|
||||
|
||||
# Message Queue (NATS)
|
||||
async-nats = "0.33"
|
||||
```
|
||||
- [x] **验收标准**: 项目可以成功编译 (`cargo check`)。
|
||||
|
||||
- [x] **2. 实现 `SystemModule` 规范**
|
||||
- [x] **任务**: 在`main.rs`中启动一个Axum HTTP服务器。
|
||||
- [x] **任务**: 实现强制的`/health`端点,返回当前服务的健康状态。
|
||||
- [x] **任务**: 实现强制的`/tasks`端点。此端点需要从一个线程安全的内存存储(例如 `Arc<DashMap<Uuid, TaskProgress>>`)中读取并返回所有正在进行的任务。
|
||||
- [x] **验收标准**: 启动服务后,可以通过`curl`或浏览器访问`http://localhost:port/health`和`http://localhost:port/tasks`并得到正确的JSON响应。
|
||||
|
||||
- [x] **3. 实现核心业务逻辑:事件驱动的数据处理**
|
||||
- [x] **任务**: 实现连接到Message Bus并订阅`FetchCompanyDataCommand`命令的逻辑。
|
||||
- [x] **任务**: 当收到`FetchCompanyDataCommand`命令时,执行以下异步工作流:
|
||||
1. 在任务存储中创建并插入一个新的`TaskProgress`记录。
|
||||
2. 从配置中读取`ALPHAVANTAGE_API_KEY`,并构建MCP端点URL。
|
||||
3. 初始化通用的`rmcp`客户端: `let client = rmcp::mcp::Client::new(mcp_endpoint_url);`
|
||||
4. 使用`tokio::try_join!`**并行**执行多个数据获取任务。**注意:函数名是字符串,返回的是`serde_json::Value`。**
|
||||
```rust
|
||||
// 伪代码示例
|
||||
let symbol = &command.symbol;
|
||||
|
||||
let overview_task = client.query("OVERVIEW", &[("symbol", symbol)]);
|
||||
let income_task = client.query("INCOME_STATEMENT", &[("symbol", symbol)]);
|
||||
// ... 其他任务
|
||||
|
||||
match tokio::try_join!(overview_task, income_task, /*...*/) {
|
||||
Ok((overview_json, income_json, /*...*/)) => {
|
||||
// overview_json and income_json are of type serde_json::Value
|
||||
// ... 进入步骤 4
|
||||
},
|
||||
Err(e) => { /* ... */ }
|
||||
}
|
||||
```
|
||||
5. 在`try_join!`前后,精确地更新内存中`TaskProgress`的状态。
|
||||
- [x] **验收标准**: 在Message Bus中发布命令后,服务的日志能正确打印出从Alpha Vantage获取到的原始JSON数据。
|
||||
|
||||
- [x] **4. 实现数据转换与持久化 (强类型映射)**
|
||||
- [x] **任务**: **(关键变更)** 实现 `TryFrom<serde_json::Value>` Trait,完成从动态JSON到我们`common-contracts`模型的**带错误处理的**转换。
|
||||
```rust
|
||||
// alphavantage-provider-service/src/mapping.rs
|
||||
use serde_json::Value;
|
||||
use common_contracts::models as our;
|
||||
|
||||
impl TryFrom<Value> for our::CompanyProfile {
|
||||
type Error = anyhow::Error; // Or a more specific parsing error
|
||||
|
||||
fn try_from(v: Value) -> Result<Self, Self::Error> {
|
||||
Ok(our::CompanyProfile {
|
||||
symbol: v["Symbol"].as_str().ok_or_else(|| anyhow!("Missing Symbol"))?.to_string(),
|
||||
name: v["Name"].as_str().ok_or_else(|| anyhow!("Missing Name"))?.to_string(),
|
||||
// ... 其他字段的安全解析和转换
|
||||
})
|
||||
}
|
||||
}
|
||||
```
|
||||
- [x] **任务**: 创建一个类型化的HTTP客户端 (Data Persistence Client),用于与`data-persistence-service`通信。
|
||||
- [x] **任务**: 在所有数据转换成功后,调用上述客户端进行持久化。
|
||||
- [x] **验收标准**: 数据库中查询到的数据,结构完全符合`common-contracts`定义。
|
||||
|
||||
- [x] **5. 实现事件发布与任务完成**
|
||||
- [x] **任务**: 在数据成功持久化到数据库后,向Message Bus发布相应的数据就绪事件,如`CompanyProfilePersistedEvent`和`FinancialsPersistedEvent`。
|
||||
- [x] **任务**: 在所有流程执行完毕(无论成功或失败)后,从内存存储中移除对应的`TaskProgress`对象(或将其标记为“已完成”并设置TTL)。
|
||||
- [x] **验收标准**: 能够在Message Bus中监听到本服务发布的事件。`/tasks`接口不再显示已完成的任务。
|
||||
|
||||
---
|
||||
|
||||
### **阶段二:重构API网关与请求流程 (精确实现蓝图)**
|
||||
|
||||
**目标**: 创建一个纯Rust的`api-gateway`服务,它将作为前端的唯一入口(BFF),负责发起数据获取任务、查询持久化数据以及追踪分布式任务进度。
|
||||
|
||||
- [x] **1. 项目初始化与 `SystemModule` 规范实现**
|
||||
- [x] **任务**: 基于我们的微服务模板,创建新的Rust项目 `services/api-gateway`。
|
||||
- [x] **任务**: 在其`Cargo.toml`中添加核心依赖: `axum`, `tokio`, `common-contracts`, `async-nats`, `reqwest`, `tracing`, `config`。
|
||||
- [x] **任务**: 实现强制的`/health`端点。
|
||||
- [x] **任务**: 实现强制的`/tasks`端点。由于网关本身是无状态的、不执行长任务,此端点当前可以简单地返回一个空数组`[]`。
|
||||
- [x] **验收标准**: `api-gateway`服务可以独立编译和运行,并且`/health`接口按预期工作。
|
||||
|
||||
- [x] **2. 实现数据触发流程 (发布命令)**
|
||||
- [x] **任务**: 在`api-gateway`中创建一个新的HTTP端点 `POST /v1/data-requests`,它应接收一个JSON体,例如: `{"symbol": "AAPL", "market": "US"}`。
|
||||
- [x] **任务**: 为此端点实现处理逻辑:
|
||||
1. 生成一个全局唯一的 `request_id` (UUID)。
|
||||
2. 创建一个`common_contracts::messages::FetchCompanyDataCommand`消息,填入请求参数和`request_id`。
|
||||
3. 连接到Message Bus,并将此命令发布到`data_fetch_commands`队列。
|
||||
4. 向前端立即返回 `202 Accepted` 状态码,响应体中包含 `{ "request_id": "..." }`,以便前端后续追踪。
|
||||
- [x] **验收标准**: 通过工具(如Postman)调用此端点后,能够在NATS的管理界面看到相应的消息被发布,同时`alphavantage-provider-service`的日志显示它已接收并开始处理该命令。
|
||||
|
||||
- [x] **3. 实现数据查询流程 (读取持久化数据)**
|
||||
- [x] **任务**: 在`api-gateway`中创建一个类型化的HTTP客户端 (Persistence Client),用于与`data-persistence-service`通信。
|
||||
- [x] **任务**: 实现 `GET /v1/companies/{symbol}/profile` 端点。该端点接收股票代码,通过Persistence Client调用`data-persistence-service`的相应接口,并将查询到的`CompanyProfile`数据返回给前端。
|
||||
- [x] **任务**: (可选) 根据需要,实现查询财务报表、行情数据等其他数据类型的端点。
|
||||
- [x] **验收标准**: 在`alphavantage-provider-service`成功写入数据后,通过浏览器或`curl`调用这些新端点,可以查询到预期的JSON数据。
|
||||
|
||||
- [x] **4. 实现分布式任务进度追踪**
|
||||
- [x] **任务**: 在`api-gateway`的配置中,增加一个`provider_services`字段,用于列出所有数据提供商服务的地址,例如: `["http://alphavantage-provider-service:8000"]`。
|
||||
- [x] **任务**: 实现 `GET /v1/tasks/{request_id}` 端点。
|
||||
- [x] **任务**: 该端点的处理逻辑需要:
|
||||
1. 读取配置中的`provider_services`列表。
|
||||
2. 使用`tokio::join!`或`futures::future::join_all`,**并行地**向所有provider服务的`/tasks`端点发起HTTP GET请求。
|
||||
3. 聚合所有服务的返回结果(一个`Vec<Vec<TaskProgress>>`),并从中线性搜索与URL路径中`request_id`匹配的`TaskProgress`对象。
|
||||
4. 如果找到匹配的任务,将其作为JSON返回。如果遍历完所有结果都未找到,则返回`404 Not Found`。
|
||||
- [x] **验收标准**: 当`alphavantage-provider-service`正在处理一个任务时,通过`api-gateway`的这个新端点(并传入正确的`request_id`),能够实时查询到该任务的进度详情。
|
||||
|
||||
---
|
||||
|
||||
### **阶段三:逐步迁移与替换 (精确实现蓝图)**
|
||||
|
||||
**目标**: 将前端应用无缝对接到新的Rust `api-gateway`,并随着数据提供商的逐步完善,最终彻底移除旧的Python `backend`服务,完成整个系统的架构升级。
|
||||
|
||||
- [x] **1. 将 `api-gateway` 集成到开发环境**
|
||||
- [x] **任务**: 在根目录的 `docker-compose.yml` 文件中,为我们新创建的 `api-gateway` 服务添加入口定义。
|
||||
```yaml
|
||||
# docker-compose.yml
|
||||
services:
|
||||
# ... other services
|
||||
api-gateway:
|
||||
build:
|
||||
context: ./services/api-gateway
|
||||
dockerfile: Dockerfile
|
||||
container_name: api-gateway
|
||||
environment:
|
||||
# 注入所有必要的配置
|
||||
SERVER_PORT: 4000
|
||||
NATS_ADDR: nats://nats:4222
|
||||
DATA_PERSISTENCE_SERVICE_URL: http://data-persistence-service:3000/api/v1
|
||||
# 注意: provider_services需要包含所有provider的内部地址
|
||||
PROVIDER_SERVICES: '["http://alphavantage-provider-service:8000"]'
|
||||
ports:
|
||||
- "14000:4000"
|
||||
depends_on:
|
||||
- nats
|
||||
- data-persistence-service
|
||||
- alphavantage-provider-service
|
||||
```
|
||||
- [x] **任务**: (如果尚未完成) 在`docker-compose.yml`中添加`nats`服务。
|
||||
- [x] **验收标准**: 运行 `docker-compose up` (或 `tilt up`) 后,`api-gateway` 服务能够成功启动并连接到消息总线。
|
||||
|
||||
- [x] **2. 迁移前端应用的API调用逻辑**
|
||||
- [x] **任务**: 修改前端项目的环境变量,将API请求的目标从旧`backend`指向新`api-gateway`。
|
||||
```
|
||||
# frontend/.env.local (or in docker-compose.yml)
|
||||
NEXT_PUBLIC_BACKEND_URL=http://api-gateway:4000/v1
|
||||
```
|
||||
- [x] **任务**: 重构前端的数据获取Hooks(例如 `useApi.ts`)。
|
||||
- **旧逻辑**: 发起一个长轮询GET请求,等待完整数据返回。
|
||||
- **新逻辑**:
|
||||
1. **触发**: 发起一个 `POST` 请求到 `/data-requests`,并从响应中获取 `request_id`。
|
||||
2. **轮询**: 使用 `useSWR` 或 `react-query` 的轮询功能,每隔2-3秒调用一次 `GET /tasks/{request_id}` 端点来获取任务进度。
|
||||
3. **展示**: 根据任务进度更新UI(例如,显示加载条和状态信息)。
|
||||
4. **完成**: 当任务状态变为 "completed" (或类似状态),或 `GET /tasks/{request_id}` 返回 `404` 时,停止轮询,并调用 `GET /companies/{symbol}/profile` 等数据查询端点来获取最终数据并渲染。
|
||||
- [x] **验收标准**: 在前端页面输入股票代码并点击“生成报告”后,能够触发新的异步工作流,并在UI上看到实时进度的更新,最终成功展示由 `alphavantage-provider-service` 获取的数据。
|
||||
|
||||
---
|
||||
|
||||
### **阶段四:数据提供商生态系统扩展 (Data Provider Ecosystem Expansion)**
|
||||
|
||||
**目标**: 将现有Python `backend`中的核心`data_providers`,逐一重写为独立的Rust微服务,丰富我们的数据维度。
|
||||
|
||||
- [ ] **0. (前置任务) 完成所有Provider的适配性分析**
|
||||
- [ ] **任务**: 在开始大规模编码前,完成 **附录A** 中所有待迁移数据提供商的适配性分析,确保`common-contracts`模型的完备性,并明确每个Provider的实现关键点。
|
||||
|
||||
- [x] **1. 实现 `tushare-provider-service` (中国市场核心)**
|
||||
- [x] **任务**: 基于 `alphavantage-provider-service` 模板,创建并实现服务的基本框架。
|
||||
- [x] **任务**: 完成 Tushare 8个核心API的并行数据获取、聚合与报告期筛选逻辑。
|
||||
- [x] **任务**: 在 `mapping.rs` 中,精确复刻Python版本 `_calculate_derived_metrics` 方法中近20个派生财务指标的计算逻辑。
|
||||
- [x] **任务**: 在 `docker-compose.yml`中添加此服务,并将其地址加入到`api-gateway`的`PROVIDER_SERVICES`环境变量中。
|
||||
- [x] **验收标准**: 收到`market: "CN"`的`FetchCompanyDataCommand`命令时,该服务能被触发,并成功将与Python版本逻辑一致的A股数据(**包含所有派生指标**)写入数据库。
|
||||
|
||||
- [x] **2. (可选) 迁移其他数据提供商**
|
||||
- [x] **任务**: 基于各自的适配性分析,创建并实现`finnhub-provider-service`。
|
||||
- [x] **任务**: 基于各自的适配性分析,创建并实现`yfinance-provider-service`。
|
||||
- [ ] **任务**: 基于各自的适配性分析,创建并实现`ifind-provider-service`。
|
||||
|
||||
---
|
||||
|
||||
### **阶段五:业务逻辑迁移与最终替换 (Business Logic Migration & Final Replacement)**
|
||||
|
||||
**目标**: 将Python `backend`中剩余的AI分析和配置管理逻辑迁移到Rust生态,并最终彻底下线Python服务。
|
||||
|
||||
- [x] **1. 创建 `report-generator-service`**
|
||||
- [x] **任务**: 创建一个新的Rust微服务`report-generator-service`。
|
||||
- [x] **任务**: 实现对Message Bus事件(如`FinancialsPersistedEvent`)的订阅。
|
||||
- [x] **任务**: 将原Python `backend`中的`analysis_client.py`和`company_profile_client.py`的逻辑迁移至此服务。
|
||||
- [x] **验收标准**: 当所有数据提供商完成数据写入后,此服务能被自动触发,并成功生成AI分析报告。
|
||||
|
||||
- [x] **2. (可选) 创建 `config-service-rs`**
|
||||
- [x] **任务**: 用Rust重写现有的Python `config-service`。
|
||||
- [x] **验收标准**: 所有Rust微服务都能从新的配置服务中获取配置并正常启动。
|
||||
|
||||
- [x] **3. 光荣退役:下线所有Python服务**
|
||||
- [x] **前提条件**: 所有数据获取和AI分析功能均已由新的Rust微服务完全承载。
|
||||
- [x] **任务**: 在 `docker-compose.yml` 中,删除 `backend` 和 `config-service` 的服务定义。
|
||||
- [x] **任务**: 将`backend/`和`services/config-service/`目录移动至`archive/python/`进行归档保留。
|
||||
- [x] **验收标准**: 整个系统在没有任何Python组件的情况下,依然能够完整、正常地运行所有核心功能。架构升级正式完成。
|
||||
|
||||
---
|
||||
|
||||
## 附录A: 数据提供商适配性分析
|
||||
|
||||
本附录用于详细记录每个待迁移的数据提供商API与我们`common-contracts`标准模型之间的适配性。
|
||||
|
||||
### A.1 Tushare 适配性分析
|
||||
|
||||
**核心结论**: 适配**完全可行**,但**计算逻辑复杂**。`common-contracts`无需调整。迁移工作的核心是精确复刻Python版本中近400行的财务数据聚合与派生指标计算逻辑。
|
||||
|
||||
**1. 数据模型适配概览**
|
||||
|
||||
| `common-contracts` 模型 | 适配可行性 | 关键实现要点 |
|
||||
| :--- | :--- | :--- |
|
||||
| **`CompanyProfile`** | ✅ **高** | 使用 `stock_basic` 和 `stock_company` 接口。 |
|
||||
| **`DailyMarketData`** | ✅ **高** | 关联 `daily` 和 `daily_basic` 接口。 |
|
||||
| **`RealtimeQuote`** | ⚠️ **中** | Tushare无直接对应接口,可使用最新日线数据作为“准实时”替代。 |
|
||||
| **`FinancialStatement`** | ✅ **高,但复杂** | **(核心难点)** 需聚合 `balancesheet`, `income`, `cashflow`, `fina_indicator` 等8个API,并复刻近20个派生指标的计算。 |
|
||||
|
||||
**2. 关键迁移逻辑**
|
||||
|
||||
- **多表聚合**: Rust版本需实现并行调用多个Tushare API,并以`end_date`为主键将结果聚合。
|
||||
- **报告期筛选**: 需复刻“今年的最新报告 + 往年所有年报”的筛选逻辑。
|
||||
- **派生指标计算**: 必须用Rust精确实现`_calculate_derived_metrics`方法中的所有计算公式。
|
||||
|
||||
---
|
||||
|
||||
### A.2 Finnhub 适配性分析
|
||||
|
||||
**核心结论**: 适配**可行**。Finnhub作为美股和全球市场的主要数据源,数据较为规范,但同样涉及**多API聚合**和**少量派生计算**。`common-contracts`无需调整。
|
||||
|
||||
**1. 数据模型适配概览**
|
||||
|
||||
| `common-contracts` 模型 | 适配可行性 | 关键实现要点 |
|
||||
| :--- | :--- | :--- |
|
||||
| **`CompanyProfile`** | ✅ **高** | 使用 `/stock/profile2` 接口。 |
|
||||
| **`DailyMarketData`** | ✅ **高** | 使用 `/stock/candle` 接口获取OHLCV,使用 `/stock/metric` 获取PE/PB等指标。 |
|
||||
| **`RealtimeQuote`** | ✅ **高** | 使用 `/quote` 接口。 |
|
||||
| **`FinancialStatement`** | ✅ **高,但需聚合** | 需聚合 `/stock/financials-reported` (按`ic`, `bs`, `cf`查询)返回的三张报表,并进行少量派生计算。 |
|
||||
|
||||
**2. 关键迁移逻辑**
|
||||
|
||||
- **多API聚合**: `FinancialStatement`的构建需要组合`/stock/financials-reported`接口的三次调用结果。`DailyMarketData`的构建也需要组合`/stock/candle`和`/stock/metric`。
|
||||
- **派生指标计算**: Python代码 (`finnhub.py`) 中包含了自由现金流 (`__free_cash_flow`) 和其他一些比率的计算,这些需要在Rust中复刻。
|
||||
- **字段名映射**: Finnhub返回的字段名(如`netIncome`)需要被映射到我们标准模型的字段名(如`net_income`)。
|
||||
|
||||
---
|
||||
|
||||
### A.3 YFinance 适配性分析
|
||||
|
||||
**核心结论**: 适配**可行**,主要作为**行情数据**的补充或备用源。`yfinance`库的封装使得数据获取相对简单。
|
||||
|
||||
**1. 数据模型适配概览**
|
||||
|
||||
| `common-contracts` 模型 | 适配可行性 | 关键实现要点 |
|
||||
| :--- | :--- | :--- |
|
||||
| **`CompanyProfile`** | ✅ **中** | `ticker.info` 字典提供了大部分信息,但字段不如Finnhub或Tushare规范。 |
|
||||
| **`DailyMarketData`** | ✅ **高** | `ticker.history()` 方法是主要数据来源,可直接提供OHLCV。 |
|
||||
| **`RealtimeQuote`** | ⚠️ **低** | `yfinance`本身不是为实时流式数据设计的,获取的数据有延迟。 |
|
||||
| **`FinancialStatement`** | ✅ **中** | `ticker.financials`, `ticker.balance_sheet`, `ticker.cashflow` 提供了数据,但需要手动将多年度的数据列转换为按年份的记录行。 |
|
||||
|
||||
**2. 关键迁移逻辑**
|
||||
|
||||
- **数据结构转换**: `yfinance`返回的DataFrame需要被转换为我们期望的`Vec<Record>`结构。特别是财务报表,需要将列式(多年份)数据转换为行式(单年份)记录。
|
||||
- **库的替代**: Rust中没有`yfinance`库。我们需要找到一个替代的Rust库 (如 `yahoo_finance_api`),或者直接模拟其HTTP请求来获取数据。这将是迁移此模块的主要工作。
|
||||
|
||||
---
|
||||
|
||||
- [ ] **2. (可选) 迁移其他数据提供商**
|
||||
- [x] **任务**: 基于各自的适配性分析,创建并实现`finnhub-provider-service`。
|
||||
- [x] **任务**: 基于各自的适配性分析,创建并实现`yfinance-provider-service`。
|
||||
- [ ] **任务**: **(已暂停/待独立规划)** 实现`ifind-provider-service`。
|
||||
|
||||
---
|
||||
|
||||
### A.4 iFind 适配性分析 - **更新于 2025-11-16**
|
||||
|
||||
**核心结论**: **当前阶段纯Rust迁移复杂度极高,任务已暂停**。iFind的Python接口 (`iFinDPy.py`) 是一个基于 `ctypes` 的薄封装,它直接调用了底层的C/C++动态链接库 (`.so` 文件)。这意味着没有任何可见的网络协议或HTTP请求可供我们在Rust中直接模拟。
|
||||
|
||||
**1. 迁移路径评估**
|
||||
|
||||
基于对 `ref/ifind` 库文件的调研,我们确认了迁移此模块面临两个选择:
|
||||
|
||||
1. **HTTP API方案 (首选,待调研)**:
|
||||
- **描述**: 您提到iFind存在一个HTTP API版本。这是最符合我们纯Rust、去中心化架构的理想路径。
|
||||
- **工作量评估**: **中等**。如果该HTTP API文档齐全且功能满足需求,那么开发此服务的工作量将与 `finnhub-provider-service` 类似。
|
||||
- **规划**: 此路径应作为一个**独立的、后续的调研与开发任务**。当前置于暂停状态。
|
||||
|
||||
2. **FFI方案 (备选,不推荐)**:
|
||||
- **描述**: 在Rust服务中通过FFI(如 `pyo3` 或 `rust-cpython` crate)嵌入Python解释器,直接调用 `iFinDPy` 库。
|
||||
- **工作量评估**: **高**。虽然可以复用逻辑,但这会引入技术栈污染,破坏我们纯Rust的目标,并显著增加部署和维护的复杂度(需要在容器中管理Python环境和iFind的二进制依赖)。这与我们“rustic”的确定性原则相悖。
|
||||
|
||||
**2. 最终决定**
|
||||
|
||||
- **暂停实现**: `ifind-provider-service` 的开发工作已**正式暂停**。
|
||||
- **更新路线图**: 在主路线图中,此任务已被标记为“已暂停/待独立规划”。
|
||||
- **未来方向**: 当项目进入下一阶段时,我们将启动一个独立的任务来**专门调研其HTTP API**,并基于调研结果决定最终的实现策略。
|
||||
|
||||
---
|
||||
|
||||
## 附录B: 业务逻辑模块迁移分析
|
||||
|
||||
本附录用于分析`backend/app/services/`中包含的核心业务逻辑,并为将其迁移至Rust服务制定策略。
|
||||
|
||||
### B.1 `analysis_client.py` & `company_profile_client.py`
|
||||
|
||||
- **核心功能**: 这两个模块是AI分析的核心,负责与大语言模型(如Gemini)API进行交互。
|
||||
- `analysis_client.py`: 提供一个**通用**的分析框架,可以根据不同的`prompt_template`执行任意类型的分析。它还包含一个`SafeFormatter`来安全地填充模板。
|
||||
- `company_profile_client.py`: 是一个**特化**的版本,包含了用于生成公司简介的、具体的、硬编码的长篇Prompt。
|
||||
|
||||
- **迁移策略**:
|
||||
1. **统一并重写为 `report-generator-service`**: 这两个模块的功能应被合并并迁移到一个全新的Rust微服务——`report-generator-service`中。
|
||||
2. **订阅事件**: 该服务将订阅Message Bus上的数据就绪事件(如`FinancialsPersistedEvent`),而不是被HTTP直接调用。
|
||||
3. **Prompt管理**: 硬编码在`company_profile_client.py`中的Prompt,以及`analysis_client.py`所依赖的、从`analysis-config.json`加载的模板,都应该由`report-generator-service`统一管理。在初期,可以从配置文件加载;未来,可以由Rust版的`config-service-rs`提供。
|
||||
4. **复刻`SafeFormatter`**: Python版本中用于安全填充模板的`SafeFormatter`逻辑需要在Rust中被等价复刻,以确保在上下文不完整时系统的健壮性。
|
||||
5. **AI客户端**: 使用`reqwest`或其他HTTP客户端库在Rust中重新实现与大模型API的交互逻辑。
|
||||
|
||||
- **结论**: 迁移**完全可行**。核心工作是将Python中的Prompt管理和API调用逻辑,用Rust的异步方式重写。这将使AI分析任务成为一个独立的、可扩展的、事件驱动的后台服务。
|
||||
|
||||
---
|
||||
|
||||
### B.2 `config_manager.py`
|
||||
|
||||
- **核心功能**: 作为Python `backend`内部的一个组件,它负责从`config-service`拉取配置,并与本地`config.json`文件进行合并。它还包含了测试各种配置有效性的逻辑(如测试数据库连接、Tushare Token等)。
|
||||
|
||||
- **迁移策略**:
|
||||
- **功能分散化**: `ConfigManager`本身不会作为一个独立的Rust服务存在,它的功能将被**分散**到每个需要它的微服务中。
|
||||
- **配置拉取**: 每个Rust微服务(`api-gateway`, `tushare-provider`等)在启动时,都将负责**独立地**从环境变量或未来的`config-service-rs`中获取自己的配置。我们为每个服务编写的`config.rs`模块已经实现了这一点。
|
||||
- **配置测试逻辑**: 测试配置的逻辑(如`_test_database`, `_test_tushare`)非常有用,但不属于运行时功能。这些逻辑可以被迁移到:
|
||||
1. **独立的CLI工具**: 创建一个Rust CLI工具,专门用于测试和验证整个系统的配置。
|
||||
2. **服务的`/health`端点**: 在每个服务的`/health`检查中,可以包含对其依赖服务(数据库、外部API)连通性的检查,从而在运行时提供健康状况反馈。
|
||||
|
||||
- **结论**: `ConfigManager`的功能将被**“肢解”并吸收**到新的Rust微服务生态中,而不是直接迁移。
|
||||
|
||||
---
|
||||
|
||||
### B.3 `data_persistence_client.py`
|
||||
|
||||
- **核心功能**: 这是一个HTTP客户端,用于让Python `backend`与其他微服务(`data-persistence-service`)通信。
|
||||
|
||||
- **迁移策略**:
|
||||
- **模式复用**: 这个模块本身就是我们新架构模式的一个成功范例。
|
||||
- **Rust等价实现**: 我们在`alphavantage-provider-service`中创建的`persistence.rs`客户端,以及即将在`api-gateway`和`report-generator-service`中创建的类似客户端,正是`data_persistence_client.py`的Rust等价物。
|
||||
- **最终废弃**: 当Python `backend`最终被下线时,这个客户端模块也将随之被废弃。
|
||||
|
||||
- **结论**: 该模块**无需迁移**,其设计思想已被我们的Rust服务所采纳和实现。
|
||||
Loading…
Reference in New Issue
Block a user