diff --git a/crates/workflow-context/src/types.rs b/crates/workflow-context/src/types.rs
index f277924..9080b0c 100644
--- a/crates/workflow-context/src/types.rs
+++ b/crates/workflow-context/src/types.rs
@@ -1,19 +1,19 @@
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
 pub enum EntryKind {
     File,
     Dir,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DirEntry {
     pub name: String,
     pub kind: EntryKind,
     pub object_id: String,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub enum FileChange {
     Added(String),
     Modified(String),
diff --git a/docs/3_project_management/tasks/pending/20251125_refactor_data_provider_abstraction.md b/docs/3_project_management/tasks/pending/20251125_refactor_data_provider_abstraction.md
deleted file mode 100644
index da9f4c3..0000000
--- a/docs/3_project_management/tasks/pending/20251125_refactor_data_provider_abstraction.md
+++ /dev/null
@@ -1,89 +0,0 @@
-# Refactoring Task: Unify the Data Provider Workflow Abstraction
-
-## 1. Background
-
-The current Data Provider services (Tushare, YFinance, AlphaVantage, etc.) suffer from heavy code duplication and inconsistent implementations. Each service independently implements the full workflow, including:
-- NATS message reception and deserialization
-- Cache lookup and write-back (Cache-Aside pattern)
-- Task state management (observability / task progress)
-- Session Data persistence
-- NATS event publishing (success / failure events)
-
-This "shotgun" architecture causes the following problems:
-1. **Bugs are easy to introduce and hard to fix uniformly**: the Tushare service lost events because it never flushed NATS, while YFinance avoided the same bug only because its implementation happened to differ. Fixing one bug means repeating the fix in every service.
-2. **Inconsistent logic**: providers can differ subtly in caching strategy, error handling, and retry behavior, undermining system-wide uniformity.
-3. **High maintenance cost**: adding a new provider means copy-pasting a large amount of infrastructure boilerplate, which is error-prone.
-
-## 2. Objectives
-
-Apply the "Rustic" design philosophy (strong typing, single source of truth, fail fast): use inversion of control (IoC) and the template method pattern to fully separate **business logic** from **infrastructure logic**.
-
-- **Single Source of Truth**: the core workflow logic (caching, persistence, notification) is defined and maintained in exactly one place.
-- **Lower coupling**: a concrete provider only answers "how to fetch data from the API", never "how to interact with the system".
-- **Better stability**: infrastructure-level problems (such as the NATS flush) are fixed once, and every provider benefits automatically.
-
-## 3. Technical Design
-
-### 3.1 The Core Abstraction (The Trait)
-
-Define a pure business-logic interface in `common-contracts`:
-
-```rust
-#[async_trait]
-pub trait DataProviderLogic: Send + Sync {
-    /// Unique identifier of the provider (e.g., "tushare", "yfinance")
-    fn provider_id(&self) -> &str;
-
-    /// Check whether the market is supported (pre-check)
-    fn supports_market(&self, market: &str) -> bool {
-        true
-    }
-
-    /// Core business logic: fetch raw data from the external source and convert it to standard DTOs.
-    /// Performs no DB or NATS operations whatsoever.
-    async fn fetch_data(&self, symbol: &str) -> Result<(CompanyProfileDto, Vec), anyhow::Error>;
-}
-```
-
-### 3.2 The Generic Workflow Engine
-
-Implement a generic struct or function, `StandardFetchWorkflow`, that encapsulates all infrastructure logic (a minimal sketch follows the list below):
-
-1. **Receive the command**: parse `FetchCompanyDataCommand`.
-2. **Pre-check**: call `supports_market`.
-3. **Status update**: write "InProgress" to `AppState`.
-4. **Cache layer**:
-   * Check the `persistence_client` cache.
-   * HIT -> return directly.
-   * MISS -> call `fetch_data`, then write the result to the cache.
-5. **Persistence layer**: write the result into `SessionData`.
-6. **Event notification**:
-   * Build `FinancialsPersistedEvent`.
-   * Publish the NATS message.
-   * **Critical: call `flush().await`.**
-7. **Error handling**: catch errors in one place, publish `DataFetchFailedEvent`, and set the task status to Failed.
-
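A minimal sketch of the engine described in §3.2, assuming a hypothetical `FetchInfra` facade for the cache, session store, and NATS client. Only `DataProviderLogic` comes from §3.1 (payloads simplified to `String` here); command parsing and the "InProgress" status update (steps 1 and 3) are elided:

```rust
use async_trait::async_trait;

// §3.1 trait, payloads simplified to String for the sketch.
#[async_trait]
pub trait DataProviderLogic: Send + Sync {
    fn provider_id(&self) -> &str;
    fn supports_market(&self, _market: &str) -> bool { true }
    async fn fetch_data(&self, symbol: &str) -> anyhow::Result<String>;
}

// Hypothetical infrastructure facade: cache, session store, NATS.
#[async_trait]
pub trait FetchInfra: Send + Sync {
    async fn cache_get(&self, provider: &str, symbol: &str) -> anyhow::Result<Option<String>>;
    async fn cache_put(&self, provider: &str, symbol: &str, data: &str) -> anyhow::Result<()>;
    async fn persist_session(&self, symbol: &str, data: &str) -> anyhow::Result<()>;
    async fn publish_persisted(&self, provider: &str, symbol: &str) -> anyhow::Result<()>;
    async fn publish_failed(&self, provider: &str, symbol: &str, error: &str) -> anyhow::Result<()>;
    async fn flush(&self) -> anyhow::Result<()>;
}

pub struct StandardFetchWorkflow<L, I> {
    pub logic: L,
    pub infra: I,
}

impl<L: DataProviderLogic, I: FetchInfra> StandardFetchWorkflow<L, I> {
    pub async fn handle(&self, symbol: &str, market: &str) -> anyhow::Result<()> {
        // 2. Pre-check: unsupported markets are skipped early (fail fast).
        if !self.logic.supports_market(market) {
            return Ok(());
        }
        match self.run(symbol).await {
            Ok(()) => Ok(()),
            Err(e) => {
                // 7. Unified error path: publish the failure event, then flush.
                self.infra.publish_failed(self.logic.provider_id(), symbol, &e.to_string()).await?;
                self.infra.flush().await?;
                Err(e)
            }
        }
    }

    async fn run(&self, symbol: &str) -> anyhow::Result<()> {
        // 4. Cache-aside: hit -> reuse; miss -> fetch_data, then fill the cache.
        let data = match self.infra.cache_get(self.logic.provider_id(), symbol).await? {
            Some(hit) => hit,
            None => {
                let fetched = self.logic.fetch_data(symbol).await?;
                self.infra.cache_put(self.logic.provider_id(), symbol, &fetched).await?;
                fetched
            }
        };
        // 5. Persist into session data.
        self.infra.persist_session(symbol, &data).await?;
        // 6. Notify, then flush: the step the Tushare service was missing.
        self.infra.publish_persisted(self.logic.provider_id(), symbol).await?;
        self.infra.flush().await
    }
}
```

With this split, the `TushareFetcher` from §4 reduces to implementing the trait methods, and the flush in step 6 runs unconditionally for every provider.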
-## 4. Execution Plan
-
-1. **Infrastructure groundwork**:
-   * Add the `DataProviderLogic` trait in `services/common-contracts`.
-   * Implement `StandardFetchWorkflow` in `services/common-contracts` (or a new `service-kit` module).
-
-2. **Refactor the Tushare service**:
-   * Create a `TushareFetcher` that implements `DataProviderLogic`.
-   * Delete the redundant code in `worker.rs` and replace it with a call to `StandardFetchWorkflow`.
-   * Verify that the NATS flush problem disappears as a result.
-
-3. **Refactor the YFinance service**:
-   * Refactor it the same way, validating that the abstraction generalizes.
-
-4. **Verification**:
-   * Run the E2E tests to confirm the data-fetch flow still works end to end.
-
-## 5. Acceptance Criteria
-
-- `common-contracts` contains a clearly defined trait.
-- The `worker.rs` code in Tushare and YFinance shrinks significantly (an estimated 60%+ reduction).
-- Every provider behaves identically (log format, status-update cadence, caching behavior).
-- Refactored providers deliver NATS messages reliably even when no manual `flush` is written.
-
diff --git a/frontend/src/components/report/FinancialTable.tsx b/frontend/src/components/report/FinancialTable.tsx
deleted file mode 100644
index 3236860..0000000
--- a/frontend/src/components/report/FinancialTable.tsx
+++ /dev/null
@@ -1,117 +0,0 @@
-import { Card, CardHeader, CardTitle, CardContent } from "@/components/ui/card"
-import { Table, TableBody, TableCaption, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table"
-import { ScrollArea } from "@/components/ui/scroll-area"
-import { Badge } from "@/components/ui/badge"
-import { CheckCircle2, XCircle, Loader2 } from "lucide-react"
-import { useWorkflowStore } from "@/stores/useWorkflowStore"
-import { TaskStatus } from "@/types/workflow"
-import { schemas } from "@/api/schema.gen"
-
-export function FinancialTable() {
-  const { tasks, dag } = useWorkflowStore();
-
-  // Identify DataFetch tasks dynamically from DAG
-  const fetchTasks = dag?.nodes.filter(n => n.type === schemas.TaskType.enum.DataFetch) || [];
-
-  return (
-
- {/* 1. Data Provider Status Cards */} -
- {fetchTasks.map(node => { - const taskState = tasks[node.id]; - const status = taskState?.status || schemas.TaskStatus.enum.Pending; - - return ( - - -
- {node.name} - -
-
- -
- {taskState?.message || "Waiting to start..."} -
- - {/* Mock Raw Data Preview if Completed */} - {status === schemas.TaskStatus.enum.Completed && ( -
-
Raw Response Preview
- - {`{ - "symbol": "AAPL", - "currency": "USD", - "data": [ - { "period": "2023", "rev": 383285000000 }, - { "period": "2022", "rev": 394328000000 } - ] -}`} - -
- )} -
-
- ); - })} -
- - {/* 2. Consolidated Financial Table (Mock) */} - - - Consolidated Income Statement - - - - Aggregated from all successful providers. - - - Metric - TTM - 2023 - 2022 - 2021 - - - - - Revenue - 10.5B - 10.0B - 9.2B - 8.5B - - - Gross Profit - 4.2B - 4.0B - 3.6B - 3.2B - - - Net Income - 2.1B - 2.0B - 1.8B - 1.5B - - -
-
-
-
- ) -} - -function StatusBadge({ status }: { status: TaskStatus }) { - switch (status) { - case schemas.TaskStatus.enum.Running: - return Fetching; - case schemas.TaskStatus.enum.Completed: - return Success; - case schemas.TaskStatus.enum.Failed: - return Failed; - default: - return Pending; - } -} diff --git a/frontend/src/components/workflow/ContextExplorer.tsx b/frontend/src/components/workflow/ContextExplorer.tsx new file mode 100644 index 0000000..9377d8d --- /dev/null +++ b/frontend/src/components/workflow/ContextExplorer.tsx @@ -0,0 +1,291 @@ +import React, { useState, useEffect } from 'react'; +import { ScrollArea } from "@/components/ui/scroll-area"; +import { Badge } from "@/components/ui/badge"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Button } from "@/components/ui/button"; +import { ChevronRight, ChevronDown, FileText, Folder, FileJson, RefreshCw, GitBranch, GitCommit } from 'lucide-react'; +import { cn } from "@/lib/utils"; + +// Types mirroring the Rust backend +type EntryKind = 'File' | 'Dir'; + +interface DirEntry { + name: string; + kind: EntryKind; + object_id: string; +} + +interface ContextExplorerProps { + reqId: string; + commitHash: string; + diffTargetHash?: string; // If present, show diff against this commit + className?: string; + highlightPaths?: string[]; // Paths to auto-expand +} + +type DiffStatus = 'added' | 'modified' | 'deleted' | 'none'; + +interface TreeNodeProps { + reqId: string; + commitHash: string; + path: string; + name: string; + kind: EntryKind; + depth: number; + getDiffStatus: (path: string) => DiffStatus; + onPreview: (path: string, kind: EntryKind) => void; +} + +const TreeNode: React.FC = ({ + reqId, commitHash, path, name, kind, depth, getDiffStatus, onPreview +}) => { + const [expanded, setExpanded] = useState(false); + const [children, setChildren] = useState(null); + const [loading, setLoading] = useState(false); + + const diffStatus = getDiffStatus(path); + + const toggleExpand = async (e: React.MouseEvent) => { + e.stopPropagation(); + if (kind === 'File') { + onPreview(path, kind); + return; + } + + const nextExpanded = !expanded; + setExpanded(nextExpanded); + + if (nextExpanded && !children) { + setLoading(true); + try { + // The path logic: root is empty string. Subdirs are "dir/subdir". + const queryPath = path; + const res = await fetch(`/api/context/${reqId}/tree/${commitHash}?path=${encodeURIComponent(queryPath)}`); + if (res.ok) { + const data = await res.json(); + // Sort: Dirs first, then Files + data.sort((a: DirEntry, b: DirEntry) => { + if (a.kind === b.kind) return a.name.localeCompare(b.name); + return a.kind === 'Dir' ? -1 : 1; + }); + setChildren(data); + } + } catch (err) { + console.error("Failed to load tree", err); + } finally { + setLoading(false); + } + } + }; + + // Icons + const Icon = kind === 'Dir' ? Folder : (name.endsWith('.json') ? FileJson : FileText); + + const getDiffColor = () => { + switch (diffStatus) { + case 'added': return 'text-green-600 font-medium dark:text-green-400'; + case 'modified': return 'text-yellow-600 font-medium dark:text-yellow-400'; + case 'deleted': return 'text-red-600 line-through dark:text-red-400'; + default: return 'text-foreground'; + } + }; + + return ( +
+
+
+ {kind === 'Dir' && ( + loading ? : + expanded ? : + )} +
+ + {name} + {diffStatus && diffStatus !== 'none' && ( + + {diffStatus} + + )} +
+ + {expanded && children && ( +
+ {children.map((child) => ( + + ))} +
+ )} +
+ ); +}; + +export const ContextExplorer: React.FC = ({ + reqId, commitHash, diffTargetHash, className +}) => { + const [rootEntries, setRootEntries] = useState([]); + const [previewContent, setPreviewContent] = useState(null); + const [previewPath, setPreviewPath] = useState(null); + const [diffMap, setDiffMap] = useState>(new Map()); + + useEffect(() => { + if (!reqId || !commitHash) return; + + const fetchRoot = async () => { + try { + const res = await fetch(`/api/context/${reqId}/tree/${commitHash}?path=`); + if (res.ok) { + const data = await res.json(); + data.sort((a: DirEntry, b: DirEntry) => { + if (a.kind === b.kind) return a.name.localeCompare(b.name); + return a.kind === 'Dir' ? -1 : 1; + }); + setRootEntries(data); + } + } catch (e) { + console.error(e); + } + }; + + const fetchDiff = async () => { + if (!diffTargetHash || diffTargetHash === commitHash) { + setDiffMap(new Map()); + return; + } + try { + const res = await fetch(`/api/context/${reqId}/diff/${diffTargetHash}/${commitHash}`); + if (res.ok) { + const changes = await res.json(); + const map = new Map(); + // Rust enum serialization: { "Added": "path" } + changes.forEach((change: any) => { + if (change.Added) map.set(change.Added, 'added'); + else if (change.Modified) map.set(change.Modified, 'modified'); + else if (change.Deleted) map.set(change.Deleted, 'deleted'); + }); + setDiffMap(map); + } + } catch (e) { + console.error("Failed to fetch diff", e); + } + }; + + fetchRoot(); + fetchDiff(); + setPreviewContent(null); + setPreviewPath(null); + }, [reqId, commitHash, diffTargetHash]); + + const getDiffStatus = (path: string) => diffMap.get(path) || 'none'; + + const handlePreview = async (path: string, kind: EntryKind) => { + if (kind !== 'File') return; + + setPreviewPath(path); + setPreviewContent("Loading..."); + + try { + const res = await fetch(`/api/context/${reqId}/blob/${commitHash}/${encodeURIComponent(path)}`); + if (res.ok) { + const text = await res.text(); + // Try to format JSON + try { + if (path.endsWith('.json')) { + const obj = JSON.parse(text); + setPreviewContent(JSON.stringify(obj, null, 2)); + } else { + setPreviewContent(text); + } + } catch { + setPreviewContent(text); + } + } else { + setPreviewContent(`Error loading content: ${res.statusText}`); + } + } catch (e) { + setPreviewContent(`Error: ${e}`); + } + }; + + if (!reqId || !commitHash) { + return ( + + Select a task to view context + + ); + } + + return ( +
+ {/* Tree View */} + + + + + Context: {commitHash.substring(0, 7)} + {diffTargetHash && diffTargetHash !== commitHash && ( + Diff Mode + )} + + + + {rootEntries.map(entry => ( + + ))} + + + + {/* Preview Pane */} + + + + + {previewPath || "Preview"} + + + +
+ {previewContent ? ( +
+                {previewContent}
+              
+ ) : ( +
+ Select a file to view content +
+ )} +
+
+
+
+ ); +}; + diff --git a/frontend/src/components/workflow/WorkflowVisualizer.tsx b/frontend/src/components/workflow/WorkflowVisualizer.tsx index b97028a..445b9c2 100644 --- a/frontend/src/components/workflow/WorkflowVisualizer.tsx +++ b/frontend/src/components/workflow/WorkflowVisualizer.tsx @@ -188,11 +188,7 @@ export function WorkflowVisualizer() { const { getLayoutedElements } = useGridLayout(); // Use custom layout const onNodeClick: NodeMouseHandler = useCallback((_event, node) => { - if (node.data.type === schemas.TaskType.enum.DataFetch) { - setActiveTab('data'); - } else { - setActiveTab(node.id); - } + setActiveTab(node.id); }, [setActiveTab]); // Transform DAG to ReactFlow nodes/edges when DAG or Task Status changes @@ -230,15 +226,6 @@ export function WorkflowVisualizer() { // In overview, show all edges but subtly isHighlighted = false; isDimmed = false; - } else if (activeTab === 'data') { - // If 'data' tab is active, highlight edges connected to DataFetch nodes - const fromNode = dag.nodes.find(n => n.id === edge.from); - const toNode = dag.nodes.find(n => n.id === edge.to); - if (fromNode?.type === schemas.TaskType.enum.DataFetch || toNode?.type === schemas.TaskType.enum.DataFetch) { - isHighlighted = true; - } else { - isDimmed = true; - } } else { // Specific node selected if (edge.from === activeTab || edge.to === activeTab) { diff --git a/frontend/src/pages/ReportPage.tsx b/frontend/src/pages/ReportPage.tsx index e9ab873..a5638b9 100644 --- a/frontend/src/pages/ReportPage.tsx +++ b/frontend/src/pages/ReportPage.tsx @@ -4,21 +4,23 @@ import { Badge } from '@/components/ui/badge'; import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs" import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card" import { WorkflowVisualizer } from '@/components/workflow/WorkflowVisualizer'; +import { ContextExplorer } from '@/components/workflow/ContextExplorer'; import { useWorkflowStore } from '@/stores/useWorkflowStore'; import { TaskStatus, schemas } from '@/api/schema.gen'; -import { Loader2, CheckCircle2, AlertCircle, Clock, PanelLeftClose, PanelLeftOpen } from 'lucide-react'; +import { Loader2, CheckCircle2, AlertCircle, Clock, PanelLeftClose, PanelLeftOpen, FileText, GitBranch, ArrowRight } from 'lucide-react'; import { Button } from '@/components/ui/button'; import ReactMarkdown from 'react-markdown'; import remarkGfm from 'remark-gfm'; -import { FinancialTable } from '@/components/report/FinancialTable'; import { useAnalysisTemplates } from "@/hooks/useConfig" import { RealtimeLogs } from '@/components/RealtimeLogs'; import { WorkflowStatus, ConnectionStatus, TaskState } from '@/types/workflow'; import { Progress } from "@/components/ui/progress" import { cn, formatNodeName } from '@/lib/utils'; +import { Dialog, DialogContent, DialogHeader, DialogTitle, DialogTrigger } from "@/components/ui/dialog"; export function ReportPage() { const { id } = useParams(); + // ... (rest of the imports) const [searchParams] = useSearchParams(); const symbol = searchParams.get('symbol'); const market = searchParams.get('market'); @@ -83,7 +85,8 @@ export function ReportPage() { state.logs.map(log => ({ taskId, log })) ), [tasks]); - const tabNodes = dag?.nodes.filter(n => n.type === schemas.TaskType.enum.Analysis) || []; + // Include ALL nodes in tabs to allow debugging context for DataFetch tasks + const tabNodes = dag?.nodes || []; return (
@@ -175,26 +178,6 @@ export function ReportPage() { > Overview - - Fundamental Data - {tabNodes.map(node => ( - - - - {tabNodes.map(node => ( - + ))}
@@ -259,6 +238,7 @@ function OverviewTabContent({ status, tasks, totalTasks, completedTasks }: { totalTasks: number, completedTasks: number }) { + // ... (implementation remains same) const progress = totalTasks > 0 ? (completedTasks / totalTasks) * 100 : 0; // Find errors @@ -346,44 +326,100 @@ function OverviewTabContent({ status, tasks, totalTasks, completedTasks }: { ) } -function TaskDetailView({ task }: { task?: TaskState }) { - // Auto-scroll removed for global scrolling layout - // const contentScrollRef = useAutoScroll(task?.content?.length || 0); +function TaskDetailView({ task, requestId }: { task?: TaskState, requestId?: string }) { + // Only show context tab if we have commits + const hasContext = task?.inputCommit || task?.outputCommit; if (task?.status === schemas.TaskStatus.enum.Failed && !task.content) { return ( -
- -

Analysis Failed

-
-

The task encountered an error and could not complete.

-

- {task.message || "Unknown error occurred."} -

-
+
+ + {hasContext && ( +
+ + + Report Content + + + Context Inspector + + +
+ )} + +
+ +

Analysis Failed

+
+

The task encountered an error and could not complete.

+

+ {task.message || "Unknown error occurred."} +

+
+
+
+ + {requestId && (task.inputCommit || task.outputCommit) && ( + + )} + +
); } return ( -
-
-
- {task?.content ? ( - - {task.content || ''} - - ) : ( -
- {task?.status === schemas.TaskStatus.enum.Pending &&

Waiting to start...

} - {task?.status === schemas.TaskStatus.enum.Running && !task?.content && } +
+ + {hasContext && ( +
+ + + Report Content + + + Context Inspector + + +
+ )} + + +
+
+ {task?.content ? ( + + {task.content || ''} + + ) : ( +
+ {task?.status === schemas.TaskStatus.enum.Pending &&

Waiting to start...

} + {task?.status === schemas.TaskStatus.enum.Running && !task?.content && } +
+ )} + {task?.status === schemas.TaskStatus.enum.Running && ( + + )}
+
+
+ + + {requestId && (task?.inputCommit || task?.outputCommit) && ( + )} - {task?.status === schemas.TaskStatus.enum.Running && ( - - )} -
-
+ +
); } diff --git a/frontend/src/stores/useWorkflowStore.ts b/frontend/src/stores/useWorkflowStore.ts index 078f94d..ff6acd1 100644 --- a/frontend/src/stores/useWorkflowStore.ts +++ b/frontend/src/stores/useWorkflowStore.ts @@ -13,7 +13,7 @@ interface WorkflowStoreState { // Actions initialize: (requestId: string) => void; setDag: (dag: WorkflowDag) => void; - updateTaskStatus: (taskId: string, status: TaskStatus, message?: string, progress?: number) => void; + updateTaskStatus: (taskId: string, status: TaskStatus, message?: string, progress?: number, inputCommit?: string, outputCommit?: string) => void; updateTaskContent: (taskId: string, delta: string) => void; // Stream content (append) setTaskContent: (taskId: string, content: string) => void; // Set full content appendTaskLog: (taskId: string, log: string) => void; @@ -54,7 +54,7 @@ export const useWorkflowStore = create((set, get) => ({ set({ dag, tasks: initialTasks, status: schemas.TaskStatus.enum.Running }); }, - updateTaskStatus: (taskId, status, message, progress) => { + updateTaskStatus: (taskId, status, message, progress, inputCommit, outputCommit) => { set(state => { let task = state.tasks[taskId]; @@ -81,7 +81,9 @@ export const useWorkflowStore = create((set, get) => ({ status, message: message || task.message, progress: progress !== undefined ? progress : task.progress, - logs: newLogs + logs: newLogs, + inputCommit: inputCommit || task.inputCommit, + outputCommit: outputCommit || task.outputCommit } } }; @@ -162,11 +164,14 @@ export const useWorkflowStore = create((set, get) => ({ case 'TaskStateChanged': { // Explicit typing to help TS const p = event.payload; + // @ts-ignore - input_commit/output_commit added state.updateTaskStatus( p.task_id, p.status, p.message || undefined, - p.progress || undefined + p.progress || undefined, + p.input_commit, + p.output_commit ); break; } @@ -209,9 +214,10 @@ export const useWorkflowStore = create((set, get) => ({ } if (event.payload.tasks_output) { - Object.entries(event.payload.tasks_output).forEach(([taskId, content]) => { - if (newTasks[taskId] && content) { - newTasks[taskId] = { ...newTasks[taskId], content: content || undefined }; + Object.entries(event.payload.tasks_output).forEach(([taskId, outputCommit]) => { + if (newTasks[taskId] && outputCommit) { + // Correctly mapping outputCommit, not content + newTasks[taskId] = { ...newTasks[taskId], outputCommit: outputCommit }; } }); } diff --git a/frontend/src/types/workflow.ts b/frontend/src/types/workflow.ts index 480faf7..dcefe0c 100644 --- a/frontend/src/types/workflow.ts +++ b/frontend/src/types/workflow.ts @@ -41,4 +41,7 @@ export interface TaskState { logs: string[]; // Full log history content?: string; // Streaming content (Markdown) result?: unknown; // Structured result + // Context Inspector + inputCommit?: string; + outputCommit?: string; } diff --git a/services/api-gateway/src/api.rs b/services/api-gateway/src/api.rs index 349f8a9..0f51b67 100644 --- a/services/api-gateway/src/api.rs +++ b/services/api-gateway/src/api.rs @@ -78,6 +78,10 @@ pub fn create_router(app_state: AppState) -> Router { let mut router = Router::new() .route("/health", get(health_check)) .route("/tasks/{request_id}", get(get_task_progress)) + // Context Inspector Proxies + .route("/api/context/{req_id}/tree/{commit_hash}", get(proxy_context_tree)) + .route("/api/context/{req_id}/blob/{commit_hash}/{*path}", get(proxy_context_blob)) + .route("/api/context/{req_id}/diff/{from_commit}/{to_commit}", get(proxy_context_diff)) .nest("/api/v1", 
create_v1_router())
        .with_state(app_state);
@@ -1048,3 +1052,82 @@ async fn get_workflow_graph_proxy(
         axum::body::Body::from(body),
     ))
 }
+
+// --- Context Inspector Proxies ---
+
+async fn proxy_context_tree(
+    State(state): State<AppState>,
+    Path((req_id, commit_hash)): Path<(String, String)>,
+    Query(params): Query>,
+) -> Result {
+    let url = format!(
+        "{}/context/{}/tree/{}",
+        state.config.workflow_orchestrator_service_url.trim_end_matches('/'),
+        req_id,
+        commit_hash
+    );
+
+    let client = reqwest::Client::new();
+    let resp = client.get(&url).query(&params).send().await?;
+
+    let status = resp.status();
+    let body = resp.bytes().await?;
+
+    Ok((
+        StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
+        axum::body::Body::from(body),
+    ))
+}
+
+async fn proxy_context_blob(
+    State(state): State<AppState>,
+    Path((req_id, commit_hash, path)): Path<(String, String, String)>,
+) -> Result {
+    let url = format!(
+        "{}/context/{}/blob/{}/{}",
+        state.config.workflow_orchestrator_service_url.trim_end_matches('/'),
+        req_id,
+        commit_hash,
+        path
+    );
+
+    let client = reqwest::Client::new();
+    let resp = client.get(&url).send().await?;
+
+    let status = resp.status();
+    let headers = resp.headers().clone();
+    let body = resp.bytes().await?;
+
+    let mut response_builder = axum::http::Response::builder()
+        .status(StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR));
+
+    if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
+        response_builder = response_builder.header(axum::http::header::CONTENT_TYPE, ct);
+    }
+
+    Ok(response_builder.body(axum::body::Body::from(body)).unwrap())
+}
+
+async fn proxy_context_diff(
+    State(state): State<AppState>,
+    Path((req_id, from_commit, to_commit)): Path<(String, String, String)>,
+) -> Result {
+    let url = format!(
+        "{}/context/{}/diff/{}/{}",
+        state.config.workflow_orchestrator_service_url.trim_end_matches('/'),
+        req_id,
+        from_commit,
+        to_commit
+    );
+
+    let client = reqwest::Client::new();
+    let resp = client.get(&url).send().await?;
+
+    let status = resp.status();
+    let body = resp.bytes().await?;
+
+    Ok((
+        StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
+        axum::body::Body::from(body),
+    ))
+}
diff --git a/services/common-contracts/src/messages.rs b/services/common-contracts/src/messages.rs
index 6fd5938..e48835e 100644
--- a/services/common-contracts/src/messages.rs
+++ b/services/common-contracts/src/messages.rs
@@ -98,6 +98,9 @@ pub enum WorkflowEvent {
         message: Option<String>,
         timestamp: i64,
         progress: Option, // 0-100
+        // New fields for Context Inspector
+        input_commit: Option<String>,
+        output_commit: Option<String>,
     },
 
     // 3. Task streaming output (used for the LLM typewriter effect)
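For reference, a standalone mirror of how the extended `TaskStateChanged` variant could serialize. The snake_case field names match the hunk above and are what `useWorkflowStore` reads as `p.input_commit` / `p.output_commit`; the `tag`/`content` wrapping, the `String` status, and the `u8` progress type are assumptions made so the snippet compiles on its own:

```rust
use serde::Serialize;

// Hypothetical mirror of the variant; status simplified to String.
#[derive(Serialize)]
#[serde(tag = "type", content = "payload")]
enum WorkflowEvent {
    TaskStateChanged {
        task_id: String,
        status: String,
        message: Option<String>,
        timestamp: i64,
        progress: Option<u8>,
        input_commit: Option<String>,
        output_commit: Option<String>,
    },
}

fn main() {
    let evt = WorkflowEvent::TaskStateChanged {
        task_id: "fetch:tushare".into(),
        status: "Completed".into(),
        message: None,
        timestamp: 1_700_000_000_000,
        progress: None,
        input_commit: Some("3f2a9c1".into()),
        output_commit: Some("8d4b7e0".into()),
    };
    // Prints {"type":"TaskStateChanged","payload":{"task_id":"fetch:tushare",
    //   ...,"input_commit":"3f2a9c1","output_commit":"8d4b7e0"}}
    println!("{}", serde_json::to_string(&evt).unwrap());
}
```

Absent commits serialize as `null`, which the store coalesces with the previously known value (`inputCommit: inputCommit || task.inputCommit`).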
diff --git a/services/report-generator-service/src/worker.rs b/services/report-generator-service/src/worker.rs
index f19c222..1a7b3e2 100644
--- a/services/report-generator-service/src/worker.rs
+++ b/services/report-generator-service/src/worker.rs
@@ -97,21 +97,6 @@ pub async fn run_report_generation_workflow(
 
     info!(module_id = %module_id, "All dependencies met. Generating report for module.");
 
-    // Publish TaskLog
-    if let Some(task_id) = &command.task_id {
-        let log_evt = WorkflowEvent::TaskLog {
-            task_id: task_id.clone(),
-            level: "INFO".to_string(),
-            message: format!("Starting module: {}", module_id),
-            timestamp: chrono::Utc::now().timestamp_millis(),
-        };
-        if let Ok(payload) = serde_json::to_vec(&log_evt) {
-            let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
-            let nats = state.nats.clone();
-            tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
-        }
-    }
-
     // Broadcast Module Start
     let _ = stream_tx.send(serde_json::json!({
         "type": "module_start",
@@ -163,20 +148,6 @@
     let formatted_financials = format_financials_to_markdown(&financials);
     context.insert("financial_data", &formatted_financials);
 
-    if let Some(task_id) = &command.task_id {
-        let log_evt = WorkflowEvent::TaskLog {
-            task_id: task_id.clone(),
-            level: "INFO".to_string(),
-            message: format!("Rendering prompt template for module: {}", module_id),
-            timestamp: chrono::Utc::now().timestamp_millis(),
-        };
-        if let Ok(payload) = serde_json::to_vec(&log_evt) {
-            let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
-            let nats = state.nats.clone();
-            tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
-        }
-    }
-
     info!(module_id = %module_id, "Rendering prompt template...");
     let prompt = match Tera::one_off(&module_config.prompt_template, &context, true) {
         Ok(p) => {
@@ -229,19 +200,6 @@
 
     // Streaming Generation
     info!(module_id = %module_id, "Initiating LLM stream...");
-    if let Some(task_id) = &command.task_id {
-        let log_evt = WorkflowEvent::TaskLog {
-            task_id: task_id.clone(),
-            level: "INFO".to_string(),
-            message: format!("Initiating LLM stream with model: {}", module_config.model_id),
-            timestamp: chrono::Utc::now().timestamp_millis(),
-        };
-        if let Ok(payload) = serde_json::to_vec(&log_evt) {
-            let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
-            let nats = state.nats.clone();
-            tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
-        }
-    }
     let mut stream = match llm_client.stream_text(prompt).await {
         Ok(s) => s,
         Err(e) => {
@@ -323,20 +281,6 @@
 
     info!(module_id = %module_id, "Successfully generated content (Length: {}).", full_content.len());
 
-    if let Some(task_id) = &command.task_id {
-        let log_evt = WorkflowEvent::TaskLog {
-            task_id: task_id.clone(),
-            level: "INFO".to_string(),
-            message: format!("Module completed: {}. Content length: {}", module_id, full_content.len()),
-            timestamp: chrono::Utc::now().timestamp_millis(),
-        };
-        if let Ok(payload) = serde_json::to_vec(&log_evt) {
-            let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
-            let nats = state.nats.clone();
-            tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
-        }
-    }
-
     // Broadcast Module Done
     let _ = stream_tx.send(serde_json::json!({
         "type": "module_done",
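The four deleted blocks above repeat one serialize-and-spawn pattern inline; the orchestrator's `publish_log` helper at the end of this diff centralizes the same pattern. A toy reduction of it, with a tokio mpsc channel standing in for the NATS client (an assumption made so the snippet runs without a broker):

```rust
use tokio::sync::mpsc;

#[derive(serde::Serialize)]
struct TaskLog {
    task_id: String,
    level: String,
    message: String,
    timestamp: i64,
}

// Fire and forget: serialize, hand off to a background task, and never
// block the workflow on log delivery.
fn publish_log(bus: mpsc::UnboundedSender<Vec<u8>>, log: TaskLog) {
    if let Ok(payload) = serde_json::to_vec(&log) {
        tokio::spawn(async move {
            // A real NATS publish (`nats.publish(subject, payload.into()).await`)
            // would go here; errors are logged, never propagated.
            let _ = bus.send(payload);
        });
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();
    publish_log(tx, TaskLog {
        task_id: "report:summary".into(),
        level: "INFO".into(),
        message: "Starting module: summary".into(),
        timestamp: 0,
    });
    println!("{}", String::from_utf8(rx.recv().await.unwrap()).unwrap());
}
```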
diff --git a/services/workflow-orchestrator-service/src/api.rs b/services/workflow-orchestrator-service/src/api.rs
index 7bd4bd7..edaf0d5 100644
--- a/services/workflow-orchestrator-service/src/api.rs
+++ b/services/workflow-orchestrator-service/src/api.rs
@@ -2,17 +2,25 @@ use axum::{
     routing::get,
     Router,
     Json,
-    extract::{State, Path},
+    extract::{State, Path, Query},
+    response::{IntoResponse, Response},
+    http::StatusCode,
 };
 use std::sync::Arc;
 use crate::state::AppState;
+use serde::Deserialize;
 use serde_json::json;
 use uuid::Uuid;
+use workflow_context::traits::ContextStore;
 
 pub fn create_router(state: Arc<AppState>) -> Router {
     Router::new()
         .route("/health", get(health_check))
         .route("/workflows/{id}/graph", get(get_workflow_graph))
+        // Context Inspector APIs
+        .route("/context/{req_id}/tree/{commit_hash}", get(get_context_tree))
+        .route("/context/{req_id}/blob/{commit_hash}/{*path}", get(get_context_blob))
+        .route("/context/{req_id}/diff/{from_commit}/{to_commit}", get(get_context_diff))
         .with_state(state)
 }
 
@@ -40,3 +48,54 @@ async fn get_workflow_graph(
     }
 }
 
+#[derive(Deserialize)]
+struct TreeQuery {
+    path: Option<String>,
+}
+
+async fn get_context_tree(
+    State(state): State<Arc<AppState>>,
+    Path((req_id, commit_hash)): Path<(String, String)>,
+    Query(query): Query<TreeQuery>,
+) -> Json<serde_json::Value> {
+    let path = query.path.unwrap_or_default();
+    match state.vgcs.list_dir(&req_id, &commit_hash, &path) {
+        Ok(entries) => Json(json!(entries)),
+        Err(e) => Json(json!({ "error": e.to_string() })),
+    }
+}
+
+async fn get_context_blob(
+    State(state): State<Arc<AppState>>,
+    Path((req_id, commit_hash, path)): Path<(String, String, String)>,
+) -> Response {
+    match state.vgcs.read_file(&req_id, &commit_hash, &path) {
+        Ok(mut reader) => {
+            let mut buffer = Vec::new();
+            if let Err(e) = std::io::Read::read_to_end(&mut reader, &mut buffer) {
+                return (StatusCode::INTERNAL_SERVER_ERROR, format!("Read error: {}", e)).into_response();
+            }
+            // Detect mime type or just return plain text/json
+            // For now we assume text/json usually
+            let content_type = if path.ends_with(".json") {
+                "application/json"
+            } else {
+                "text/plain"
+            };
+
+            ([(axum::http::header::CONTENT_TYPE, content_type)], buffer).into_response()
+        },
+        Err(e) => (StatusCode::NOT_FOUND, format!("Error: {}", e)).into_response(),
+    }
+}
+
+async fn get_context_diff(
+    State(state): State<Arc<AppState>>,
+    Path((req_id, from_commit, to_commit)): Path<(String, String, String)>,
+) -> Json<serde_json::Value> {
+    match state.vgcs.diff(&req_id, &from_commit, &to_commit) {
+        Ok(changes) => Json(json!(changes)),
+        Err(e) => Json(json!({ "error": e.to_string() })),
+    }
+}
+
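The `/diff` endpoint above returns the `FileChange` values that `types.rs` now derives `Serialize` for. serde's default externally tagged representation turns each newtype variant into a single-key object, which is exactly the `{ "Added": "path" }` shape the `ContextExplorer` diff-map parser expects (the `Deleted` variant is cut off by the `types.rs` hunk but implied by the frontend's `change.Deleted` check):

```rust
use serde::Serialize;

// Mirror of the workflow-context FileChange enum, for illustration only.
#[derive(Serialize)]
enum FileChange {
    Added(String),
    Modified(String),
    Deleted(String),
}

fn main() {
    let changes = vec![
        FileChange::Added("analysis/summary.md".into()),
        FileChange::Modified("request.json".into()),
        FileChange::Deleted("tmp/scratch.txt".into()),
    ];
    // Prints:
    // [{"Added":"analysis/summary.md"},{"Modified":"request.json"},{"Deleted":"tmp/scratch.txt"}]
    println!("{}", serde_json::to_string(&changes).unwrap());
}
```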
diff --git a/services/workflow-orchestrator-service/src/dag_scheduler.rs b/services/workflow-orchestrator-service/src/dag_scheduler.rs
index c41be5a..3048baa 100644
--- a/services/workflow-orchestrator-service/src/dag_scheduler.rs
+++ b/services/workflow-orchestrator-service/src/dag_scheduler.rs
@@ -84,6 +84,8 @@ pub struct DagNode {
     pub status: TaskStatus,
     pub config: serde_json::Value,
     pub routing_key: String,
+    /// The commit hash used as input for this task
+    pub input_commit: Option<String>,
 }
 
 impl DagScheduler {
@@ -105,9 +107,16 @@
             status: TaskStatus::Pending,
             config,
             routing_key,
+            input_commit: None,
         });
     }
 
+    pub fn set_input_commit(&mut self, task_id: &str, commit: String) {
+        if let Some(node) = self.nodes.get_mut(task_id) {
+            node.input_commit = Some(commit);
+        }
+    }
+
     pub fn add_dependency(&mut self, from: &str, to: &str) {
         self.forward_deps.entry(from.to_string()).or_default().push(to.to_string());
         self.reverse_deps.entry(to.to_string()).or_default().push(from.to_string());
diff --git a/services/workflow-orchestrator-service/src/workflow.rs b/services/workflow-orchestrator-service/src/workflow.rs
index 84ff4a1..d10ddfe 100644
--- a/services/workflow-orchestrator-service/src/workflow.rs
+++ b/services/workflow-orchestrator-service/src/workflow.rs
@@ -14,7 +14,7 @@ use tokio::sync::Mutex;
 
 use crate::dag_scheduler::DagScheduler;
 use crate::state::AppState;
-use workflow_context::{Vgcs, ContextStore}; // Added ContextStore
+use workflow_context::{Vgcs, ContextStore, traits::Transaction}; // Added Transaction
 
 pub struct WorkflowEngine {
     state: Arc<AppState>,
@@ -30,21 +38,38 @@ impl WorkflowEngine {
         let req_id = cmd.request_id;
         info!("Starting workflow {}", req_id);
 
+        self.publish_log(req_id, "workflow", "INFO", "Workflow started. Initializing...").await;
+
         // 1. Init VGCS Repo
         self.state.vgcs.init_repo(&req_id.to_string())?;
 
-        // 2. Create Scheduler
-        // Initial commit is empty for a fresh workflow
-        let mut dag = DagScheduler::new(req_id, String::new());
+        // 2. Create Initial Commit (Context Baseline)
+        let mut tx = self.state.vgcs.begin_transaction(&req_id.to_string(), "")?;
+        let request_info = json!({
+            "request_id": req_id,
+            "symbol": cmd.symbol.as_str(),
+            "market": cmd.market,
+            "template_id": cmd.template_id,
+            "timestamp": chrono::Utc::now().to_rfc3339()
+        });
+        tx.write("request.json", serde_json::to_vec_pretty(&request_info)?.as_slice())?;
+        let initial_commit = Box::new(tx).commit("Initial Workflow Setup", "System")?;
 
-        // 3. Fetch Template Config
+        // 3. Create Scheduler with Initial Commit
+        let mut dag = DagScheduler::new(req_id, initial_commit);
+
+        // 4. Fetch Template Config
+        self.publish_log(req_id, "workflow", "INFO", "Fetching template configuration...").await;
         let template_sets = self.state.persistence_client.get_analysis_template_sets().await?;
         let template = template_sets.get(&cmd.template_id).ok_or_else(|| {
             anyhow::anyhow!("Template {} not found", cmd.template_id)
         })?;
 
+        // 3.1 Fetch Data Sources Config
+        let data_sources = self.state.persistence_client.get_data_sources_config().await?;
+
         // 4. Build DAG
-        self.build_dag(&mut dag, template, &cmd.template_id, &cmd.market, &cmd.symbol);
+        self.build_dag(&mut dag, template, &data_sources, &cmd.template_id, &cmd.market, &cmd.symbol);
 
         // 5. Save State
         self.state.workflows.insert(req_id, Arc::new(Mutex::new(dag.clone())));
@@ -61,6 +78,8 @@
             }
         }
 
+        self.publish_log(req_id, "workflow", "INFO", "DAG built and workflow initialized.").await;
+
         // 6. Trigger Initial Tasks
         let initial_tasks = dag.get_initial_tasks();
@@ -109,7 +128,7 @@
             timestamp: chrono::Utc::now().timestamp_millis(),
             task_graph: dag.to_dto(),
             tasks_status,
-            tasks_output: std::collections::HashMap::new(), // TODO: Populate output if needed
+            tasks_output: dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect(),
         };
 
         let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
@@ -136,6 +155,8 @@
 
         // 1. Update Status & Record Commit
         dag.update_status(&evt.task_id, evt.status);
+
+        self.publish_log(req_id, &evt.task_id, "INFO", &format!("Task status changed to {:?}", evt.status)).await;
 
         // Lookup task_type
         let task_type = dag.nodes.get(&evt.task_id).map(|n| n.task_type).unwrap_or(TaskType::DataFetch);
@@ -158,6 +179,10 @@
             None
         };
 
+        // Resolve commits
+        let input_commit = dag.nodes.get(&evt.task_id).and_then(|n| n.input_commit.clone());
+        let output_commit = evt.result.as_ref().and_then(|r| r.new_commit.clone());
+
         // Publish TaskStateChanged event
         let progress_event = WorkflowEvent::TaskStateChanged {
             task_id: evt.task_id.clone(),
@@ -166,6 +191,8 @@
             message: error_message,
             timestamp: chrono::Utc::now().timestamp_millis(),
             progress: None,
+            input_commit: input_commit,
+            output_commit: output_commit,
         };
 
         let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
         if let Ok(payload) = serde_json::to_vec(&progress_event) {
@@ -190,6 +217,7 @@
             for task_id in ready_tasks {
                 if let Err(e) = self.dispatch_task(&mut dag, &task_id, &self.state.vgcs).await {
                     error!("Failed to dispatch task {}: {}", task_id, e);
+                    self.publish_log(req_id, &task_id, "ERROR", &format!("Failed to dispatch task: {}", e)).await;
                 }
             }
         }
@@ -199,6 +227,7 @@
         let timestamp = chrono::Utc::now().timestamp_millis();
         let event = if dag.has_failures() {
             info!("Workflow {} failed (some tasks failed)", req_id);
+            self.publish_log(req_id, "workflow", "ERROR", "Workflow finished with failures.").await;
             WorkflowEvent::WorkflowFailed {
                 end_timestamp: timestamp,
                 reason: "Some tasks failed".to_string(),
             }
@@ -206,6 +235,7 @@
         } else {
             info!("Workflow {} completed successfully", req_id);
+            self.publish_log(req_id, "workflow", "INFO", "Workflow completed successfully.").await;
             WorkflowEvent::WorkflowCompleted {
                 end_timestamp: timestamp,
                 result_summary: Some(json!({})),
@@ -227,8 +257,14 @@
 
         // 1. Resolve Context (Merge if needed)
         let context = dag.resolve_context(task_id, vgcs)?;
 
+        // Store the input commit in the node for observability
+        if let Some(base_commit) = &context.base_commit {
+            dag.set_input_commit(task_id, base_commit.clone());
+        }
+
         // 2. Update Status
         dag.update_status(task_id, TaskStatus::Scheduled);
+        self.publish_log(dag.request_id, task_id, "INFO", "Task scheduled and dispatched.").await;
 
         // 3. Construct Command
         let node = dag.nodes.get(task_id).ok_or_else(|| anyhow::anyhow!("Node not found"))?;
@@ -264,33 +300,65 @@
     }
 
     // Helper to build DAG
-    fn build_dag(&self, dag: &mut DagScheduler, template: &common_contracts::config_models::AnalysisTemplateSet, template_id: &str, market: &str, symbol: &CanonicalSymbol) {
-        let mut providers = Vec::new();
-        match market {
-            "CN" => {
-                providers.push("tushare");
-            },
-            "US" => providers.push("yfinance"),
-            "MOCK" => providers.push("mock"),
-            _ => providers.push("yfinance"),
-        }
-
+    fn build_dag(
+        &self,
+        dag: &mut DagScheduler,
+        template: &common_contracts::config_models::AnalysisTemplateSet,
+        data_sources: &common_contracts::config_models::DataSourcesConfig,
+        template_id: &str,
+        market: &str,
+        symbol: &CanonicalSymbol
+    ) {
         // 1. Data Fetch Nodes
         let mut fetch_tasks = Vec::new();
-        for p in &providers {
-            let task_id = format!("fetch:{}", p);
-            fetch_tasks.push(task_id.clone());
-            let display_name = format!("Data Fetch ({})", p);
-            dag.add_node(
-                task_id.clone(),
-                Some(display_name),
-                TaskType::DataFetch,
-                format!("provider.{}", p),
+
+        // Use all enabled data sources regardless of market
+        // The provider itself will decide whether to skip or process based on market support.
+        // We sort keys to ensure deterministic DAG generation.
+        let mut source_keys: Vec<&String> = data_sources.keys().collect();
+        source_keys.sort();
+
+        for key in source_keys {
+            let config = &data_sources[key];
+            if config.enabled {
+                let provider_key = key.to_lowercase();
+                let task_id = format!("fetch:{}", provider_key);
+                fetch_tasks.push(task_id.clone());
+
+                let display_name = format!("Data Fetch ({:?})", config.provider);
+                let routing_key = format!("provider.{}", provider_key);
+
+                dag.add_node(
+                    task_id.clone(),
+                    Some(display_name),
+                    TaskType::DataFetch,
+                    routing_key,
+                    json!({
+                        "symbol": symbol.as_str(),
+                        "market": market
+                    })
+                );
+            }
+        }
+
+        // Fallback for MOCK if not in config (usually mock is not in data_sources.json but hardcoded for tests)
+        if market == "MOCK" && fetch_tasks.is_empty() {
+            let task_id = "fetch:mock".to_string();
+            fetch_tasks.push(task_id.clone());
+            dag.add_node(
+                task_id,
+                Some("Data Fetch (Mock)".to_string()),
+                TaskType::DataFetch,
+                "provider.mock".to_string(),
                 json!({
                     "symbol": symbol.as_str(),
                     "market": market
                 })
-            );
+            );
+        }
+
+        if fetch_tasks.is_empty() {
+            warn!("No enabled data providers found in configuration.");
         }
 
         // 2. Analysis Nodes (Dynamic from Template)
@@ -329,4 +397,23 @@
             }
         }
     }
+
+    async fn publish_log(&self, req_id: uuid::Uuid, task_id: &str, level: &str, message: &str) {
+        let event = WorkflowEvent::TaskLog {
+            task_id: task_id.to_string(),
+            level: level.to_string(),
+            message: message.to_string(),
+            timestamp: chrono::Utc::now().timestamp_millis(),
+        };
+        let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
+        if let Ok(payload) = serde_json::to_vec(&event) {
+            // Fire and forget
+            let nats = self.nats.clone();
+            tokio::spawn(async move {
+                if let Err(e) = nats.publish(subject, payload.into()).await {
+                    error!("Failed to publish TaskLog: {}", e);
+                }
+            });
+        }
+    }
 }
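A note on the `source_keys.sort()` in `build_dag`: `HashMap` iteration order is unspecified in Rust, so without the sort the fetch nodes could be added in a different order on every run. A self-contained illustration (the provider names are examples only):

```rust
use std::collections::HashMap;

fn main() {
    let mut data_sources: HashMap<String, bool> = HashMap::new(); // name -> enabled
    data_sources.insert("Tushare".into(), true);
    data_sources.insert("YFinance".into(), true);
    data_sources.insert("AlphaVantage".into(), false);

    // Same idiom as build_dag: collect the keys, sort, then iterate.
    let mut source_keys: Vec<&String> = data_sources.keys().collect();
    source_keys.sort();

    let task_ids: Vec<String> = source_keys
        .into_iter()
        .filter(|k| data_sources[*k]) // only enabled sources become DAG nodes
        .map(|k| format!("fetch:{}", k.to_lowercase()))
        .collect();

    // Always ["fetch:tushare", "fetch:yfinance"], regardless of hash seed.
    println!("{:?}", task_ids);
}
```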