feat: Implement Context Inspector and fix workflow merge base

- **Observability**: Added `ContextExplorer` component for viewing task input/output file trees and diffs.
- **Workflow Engine**:
  - Implemented Initial Commit logic in `StartWorkflow` to ensure all tasks share a common merge base.
  - Added `input_commit` tracking to `DagScheduler` and events.
  - Exposed `vgcs` (Git) operations via Orchestrator API (tree, blob, diff).
- **API Gateway**: Added proxy routes for Context Inspector (`/api/context/...`).
- **UI**:
  - Refactored `ReportPage` to remove legacy "Fundamental Data" tab.
  - Added "Context Inspector" button in Task Detail view with Dialog support.
  - Visualized Added/Modified/Deleted files with color coding in Context Explorer.
- **Cleanup**: Removed unused `FinancialTable` component.
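The merge-base fix above can be illustrated with a toy commit graph (hypothetical types; the real `vgcs` API differs): creating one initial commit in `StartWorkflow` makes every task branch descend from it, so any two task commits have a well-defined common ancestor to diff against.

```rust
use std::collections::{HashMap, HashSet};

// A toy append-only commit store: each commit records its single parent.
struct CommitStore {
    parents: HashMap<u32, Option<u32>>, // commit id -> parent id (None = root)
    next_id: u32,
}

impl CommitStore {
    fn new() -> Self {
        Self { parents: HashMap::new(), next_id: 0 }
    }

    // Create a commit on top of `parent` (None makes a new root).
    fn commit(&mut self, parent: Option<u32>) -> u32 {
        let id = self.next_id;
        self.next_id += 1;
        self.parents.insert(id, parent);
        id
    }

    // Collect the ancestry of `a`, then walk `b` upward until a shared ancestor appears.
    fn merge_base(&self, a: u32, b: u32) -> Option<u32> {
        let mut seen = HashSet::new();
        let mut cur = Some(a);
        while let Some(c) = cur {
            seen.insert(c);
            cur = self.parents.get(&c).copied().flatten();
        }
        let mut cur = Some(b);
        while let Some(c) = cur {
            if seen.contains(&c) {
                return Some(c);
            }
            cur = self.parents.get(&c).copied().flatten();
        }
        None
    }
}
```

With a shared initial commit, two task branches always have it as their merge base; two tasks started from unrelated roots have no base at all, which is the failure mode the Initial Commit logic prevents.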
Lv, Qi 2025-11-28 02:53:25 +08:00
parent b90388b76e
commit 91a6dfc4c1
14 changed files with 673 additions and 371 deletions

View File

@ -1,19 +1,19 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntryKind {
File,
Dir,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirEntry {
pub name: String,
pub kind: EntryKind,
pub object_id: String,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FileChange {
Added(String),
Modified(String),
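The frontend diff parser later in this commit matches on keys like `Added` and `Modified`; that works because serde's default representation for a Rust enum is externally tagged, so a newtype variant such as `FileChange::Added("a.md".into())` serializes as `{"Added":"a.md"}`. A hand-rolled sketch of that wire shape (illustrative only; the real code uses `serde_json`, and the `Deleted` variant is inferred from the frontend's parser since the hunk above is truncated):

```rust
// Mirrors the enum in the hunk above, re-declared locally for the example.
#[derive(Debug, Clone, PartialEq)]
enum FileChange {
    Added(String),
    Modified(String),
    Deleted(String),
}

// Emits the externally tagged JSON shape that serde_json produces by default.
// (String escaping is ignored for brevity.)
fn to_wire(change: &FileChange) -> String {
    let (tag, path) = match change {
        FileChange::Added(p) => ("Added", p),
        FileChange::Modified(p) => ("Modified", p),
        FileChange::Deleted(p) => ("Deleted", p),
    };
    format!("{{\"{}\":\"{}\"}}", tag, path)
}
```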

View File

@ -1,89 +0,0 @@
# Refactoring Task: Unify the Data Provider Workflow Abstraction
## 1. Background
The current Data Provider services (Tushare, YFinance, AlphaVantage, etc.) suffer from serious code duplication and inconsistent implementations. Each service independently implements the full workflow, including:
- NATS message receipt and deserialization
- Cache check and write (Cache-Aside pattern)
- Task state management (Observability / Task Progress)
- Session Data persistence
- NATS event publishing (Success/Failure events)
This "shotgun" architecture causes the following problems:
1. **Bugs are easy to introduce and hard to fix uniformly**: e.g. the Tushare service lost events because it never performed a NATS flush, while YFinance avoided the issue only because its implementation happened to differ. Fixing one bug means repeating the fix in every service.
2. **Inconsistent logic**: providers may diverge subtly in caching strategy, error handling, and retry behavior, undermining system-wide uniformity.
3. **High maintenance cost**: adding a new provider means copy-pasting large amounts of infrastructure boilerplate, which is error-prone.
## 2. Objectives
Follow the "Rustic" design philosophy (strong typing, single source of truth, fail fast): use inversion of control (IoC) and the template method pattern to fully separate **business logic** from **infrastructure logic**.
- **Single Source of Truth**: the core workflow logic (caching, persistence, notification) is defined and maintained in exactly one place.
- **Lower coupling**: a concrete provider only cares about "how to fetch data from the API", never about "how to interact with the system".
- **Better stability**: fixing an infrastructure-level issue (such as the NATS flush) benefits all providers automatically.
## 3. Technical Design
### 3.1 The Core Abstraction (The Trait)
Define a pure business-logic interface in `common-contracts`:
```rust
#[async_trait]
pub trait DataProviderLogic: Send + Sync {
/// Unique identifier of the provider (e.g., "tushare", "yfinance")
fn provider_id(&self) -> &str;
/// Check whether the market is supported (pre-flight check)
fn supports_market(&self, market: &str) -> bool {
true
}
/// Core business logic: fetch raw data from the external source and convert it to standard DTOs.
/// No DB or NATS operations are involved.
async fn fetch_data(&self, symbol: &str) -> Result<(CompanyProfileDto, Vec<TimeSeriesFinancialDto>), anyhow::Error>;
}
```
### 3.2 The Generic Workflow Engine
Implement a generic struct or function, `StandardFetchWorkflow`, that encapsulates all infrastructure logic:
1. **Receive the command**: parse `FetchCompanyDataCommand`.
2. **Pre-flight check**: call `supports_market`.
3. **Status update**: write "InProgress" to `AppState`.
4. **Cache layer**:
   * Check the `persistence_client` cache.
   * HIT -> return immediately.
   * MISS -> call `fetch_data`, then write the result to the cache.
5. **Persistence layer**: write the result to `SessionData`.
6. **Event notification**:
   * Build the `FinancialsPersistedEvent`.
   * Publish the NATS message.
   * **Crucial: call `flush().await`.**
7. **Error handling**: catch errors uniformly, publish `DataFetchFailedEvent`, and set the task status to Failed.
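A simplified, synchronous, std-only sketch of this template-method shape (the real engine is async and talks to NATS and the database; `MockProvider` and the in-memory cache here are illustrative stand-ins, not project code):

```rust
use std::collections::HashMap;

// Business-logic trait: what a concrete provider must implement.
trait ProviderLogic {
    fn provider_id(&self) -> &str;
    fn supports_market(&self, _market: &str) -> bool { true }
    fn fetch_data(&self, symbol: &str) -> Result<String, String>;
}

// Infrastructure engine: cache-aside + uniform error handling in one place.
struct FetchEngine {
    cache: HashMap<String, String>,
    fetch_calls: u32, // instrumentation for the example
}

impl FetchEngine {
    fn new() -> Self {
        Self { cache: HashMap::new(), fetch_calls: 0 }
    }

    fn run(&mut self, p: &dyn ProviderLogic, market: &str, symbol: &str) -> Result<String, String> {
        if !p.supports_market(market) {
            return Err(format!("{}: market {} unsupported", p.provider_id(), market));
        }
        let key = format!("{}:{}", p.provider_id(), symbol);
        if let Some(hit) = self.cache.get(&key) {
            return Ok(hit.clone()); // cache HIT: skip the provider entirely
        }
        self.fetch_calls += 1;
        let data = p.fetch_data(symbol)?; // cache MISS: delegate to business logic
        self.cache.insert(key, data.clone());
        // (real engine: persist to SessionData, publish event, flush NATS)
        Ok(data)
    }
}

struct MockProvider;
impl ProviderLogic for MockProvider {
    fn provider_id(&self) -> &str { "mock" }
    fn fetch_data(&self, symbol: &str) -> Result<String, String> {
        Ok(format!("profile:{}", symbol))
    }
}
```

Because caching and error handling live only in the engine, a fix such as "always flush before returning" is made once and every provider inherits it.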
## 4. Execution Plan
1. **Infrastructure groundwork**
   * Add the `DataProviderLogic` trait to `services/common-contracts`.
   * Implement `StandardFetchWorkflow` in `services/common-contracts` (or a new `service-kit` module).
2. **Refactor the Tushare service**
   * Create a `TushareFetcher` implementing `DataProviderLogic`.
   * Delete the redundant code in `worker.rs` and replace it with a call into `StandardFetchWorkflow`.
   * Verify that the NATS flush problem is resolved as a natural consequence.
3. **Refactor the YFinance service**
   * Refactor the same way, validating the abstraction's generality.
4. **Verification**
   * Run the E2E tests to confirm the data-fetch flow still works end to end.
## 5. Acceptance Criteria
- `common-contracts` contains a clear trait definition.
- The `worker.rs` code for Tushare and YFinance shrinks significantly (expected: 60%+ reduction).
- All providers behave identically (log format, status-update frequency, caching behavior).
- Refactored providers deliver NATS messages reliably even without a hand-written `flush`.

View File

@ -1,117 +0,0 @@
import { Card, CardHeader, CardTitle, CardContent } from "@/components/ui/card"
import { Table, TableBody, TableCaption, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table"
import { ScrollArea } from "@/components/ui/scroll-area"
import { Badge } from "@/components/ui/badge"
import { CheckCircle2, XCircle, Loader2 } from "lucide-react"
import { useWorkflowStore } from "@/stores/useWorkflowStore"
import { TaskStatus } from "@/types/workflow"
import { schemas } from "@/api/schema.gen"
export function FinancialTable() {
const { tasks, dag } = useWorkflowStore();
// Identify DataFetch tasks dynamically from DAG
const fetchTasks = dag?.nodes.filter(n => n.type === schemas.TaskType.enum.DataFetch) || [];
return (
<div className="space-y-6">
{/* 1. Data Provider Status Cards */}
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
{fetchTasks.map(node => {
const taskState = tasks[node.id];
const status = taskState?.status || schemas.TaskStatus.enum.Pending;
return (
<Card key={node.id} className="flex flex-col">
<CardHeader className="pb-2">
<div className="flex justify-between items-start">
<CardTitle className="text-sm font-medium">{node.name}</CardTitle>
<StatusBadge status={status} />
</div>
</CardHeader>
<CardContent className="flex-1 flex flex-col text-xs">
<div className="text-muted-foreground mb-2 flex-1">
{taskState?.message || "Waiting to start..."}
</div>
{/* Mock Raw Data Preview if Completed */}
{status === schemas.TaskStatus.enum.Completed && (
<div className="mt-auto pt-2 border-t">
<div className="text-[10px] uppercase font-mono text-muted-foreground mb-1">Raw Response Preview</div>
<ScrollArea className="h-[80px] w-full rounded border bg-muted/50 p-2 font-mono text-[10px]">
{`{
"symbol": "AAPL",
"currency": "USD",
"data": [
{ "period": "2023", "rev": 383285000000 },
{ "period": "2022", "rev": 394328000000 }
]
}`}
</ScrollArea>
</div>
)}
</CardContent>
</Card>
);
})}
</div>
{/* 2. Consolidated Financial Table (Mock) */}
<Card>
<CardHeader>
<CardTitle>Consolidated Income Statement</CardTitle>
</CardHeader>
<CardContent>
<Table>
<TableCaption>Aggregated from all successful providers.</TableCaption>
<TableHeader>
<TableRow>
<TableHead className="w-[200px]">Metric</TableHead>
<TableHead>TTM</TableHead>
<TableHead>2023</TableHead>
<TableHead>2022</TableHead>
<TableHead>2021</TableHead>
</TableRow>
</TableHeader>
<TableBody>
<TableRow>
<TableCell className="font-medium">Revenue</TableCell>
<TableCell>10.5B</TableCell>
<TableCell>10.0B</TableCell>
<TableCell>9.2B</TableCell>
<TableCell>8.5B</TableCell>
</TableRow>
<TableRow>
<TableCell className="font-medium">Gross Profit</TableCell>
<TableCell>4.2B</TableCell>
<TableCell>4.0B</TableCell>
<TableCell>3.6B</TableCell>
<TableCell>3.2B</TableCell>
</TableRow>
<TableRow>
<TableCell className="font-medium">Net Income</TableCell>
<TableCell>2.1B</TableCell>
<TableCell>2.0B</TableCell>
<TableCell>1.8B</TableCell>
<TableCell>1.5B</TableCell>
</TableRow>
</TableBody>
</Table>
</CardContent>
</Card>
</div>
)
}
function StatusBadge({ status }: { status: TaskStatus }) {
switch (status) {
case schemas.TaskStatus.enum.Running:
return <Badge variant="outline" className="border-blue-500 text-blue-500"><Loader2 className="h-3 w-3 mr-1 animate-spin" /> Fetching</Badge>;
case schemas.TaskStatus.enum.Completed:
return <Badge variant="outline" className="border-green-500 text-green-500"><CheckCircle2 className="h-3 w-3 mr-1" /> Success</Badge>;
case schemas.TaskStatus.enum.Failed:
return <Badge variant="destructive"><XCircle className="h-3 w-3 mr-1" /> Failed</Badge>;
default:
return <Badge variant="secondary" className="text-muted-foreground">Pending</Badge>;
}
}

View File

@ -0,0 +1,291 @@
import React, { useState, useEffect } from 'react';
import { ScrollArea } from "@/components/ui/scroll-area";
import { Badge } from "@/components/ui/badge";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Button } from "@/components/ui/button";
import { ChevronRight, ChevronDown, FileText, Folder, FileJson, RefreshCw, GitBranch, GitCommit } from 'lucide-react';
import { cn } from "@/lib/utils";
// Types mirroring the Rust backend
type EntryKind = 'File' | 'Dir';
interface DirEntry {
name: string;
kind: EntryKind;
object_id: string;
}
interface ContextExplorerProps {
reqId: string;
commitHash: string;
diffTargetHash?: string; // If present, show diff against this commit
className?: string;
highlightPaths?: string[]; // Paths to auto-expand
}
type DiffStatus = 'added' | 'modified' | 'deleted' | 'none';
interface TreeNodeProps {
reqId: string;
commitHash: string;
path: string;
name: string;
kind: EntryKind;
depth: number;
getDiffStatus: (path: string) => DiffStatus;
onPreview: (path: string, kind: EntryKind) => void;
}
const TreeNode: React.FC<TreeNodeProps> = ({
reqId, commitHash, path, name, kind, depth, getDiffStatus, onPreview
}) => {
const [expanded, setExpanded] = useState(false);
const [children, setChildren] = useState<DirEntry[] | null>(null);
const [loading, setLoading] = useState(false);
const diffStatus = getDiffStatus(path);
const toggleExpand = async (e: React.MouseEvent) => {
e.stopPropagation();
if (kind === 'File') {
onPreview(path, kind);
return;
}
const nextExpanded = !expanded;
setExpanded(nextExpanded);
if (nextExpanded && !children) {
setLoading(true);
try {
// The path logic: root is empty string. Subdirs are "dir/subdir".
const queryPath = path;
const res = await fetch(`/api/context/${reqId}/tree/${commitHash}?path=${encodeURIComponent(queryPath)}`);
if (res.ok) {
const data = await res.json();
// Sort: Dirs first, then Files
data.sort((a: DirEntry, b: DirEntry) => {
if (a.kind === b.kind) return a.name.localeCompare(b.name);
return a.kind === 'Dir' ? -1 : 1;
});
setChildren(data);
}
} catch (err) {
console.error("Failed to load tree", err);
} finally {
setLoading(false);
}
}
};
// Icons
const Icon = kind === 'Dir' ? Folder : (name.endsWith('.json') ? FileJson : FileText);
const getDiffColor = () => {
switch (diffStatus) {
case 'added': return 'text-green-600 font-medium dark:text-green-400';
case 'modified': return 'text-yellow-600 font-medium dark:text-yellow-400';
case 'deleted': return 'text-red-600 line-through dark:text-red-400';
default: return 'text-foreground';
}
};
return (
<div className="select-none">
<div
className={cn(
"flex items-center py-1 px-2 hover:bg-accent/50 rounded-sm cursor-pointer transition-colors",
getDiffColor()
)}
style={{ paddingLeft: `${depth * 12 + 4}px` }}
onClick={toggleExpand}
>
<div className="w-4 h-4 mr-1 flex items-center justify-center opacity-70">
{kind === 'Dir' && (
loading ? <RefreshCw className="w-3 h-3 animate-spin" /> :
expanded ? <ChevronDown className="w-3 h-3" /> : <ChevronRight className="w-3 h-3" />
)}
</div>
<Icon className={cn("w-4 h-4 mr-2", kind === 'Dir' ? "text-blue-500" : "text-gray-500")} />
<span className="text-sm truncate">{name}</span>
{diffStatus && diffStatus !== 'none' && (
<Badge variant="outline" className={cn("ml-2 text-[10px] h-4 px-1",
diffStatus === 'added' ? "border-green-500 text-green-600" :
diffStatus === 'modified' ? "border-yellow-500 text-yellow-600" : "")}>
{diffStatus}
</Badge>
)}
</div>
{expanded && children && (
<div>
{children.map((child) => (
<TreeNode
key={child.object_id + child.name}
reqId={reqId}
commitHash={commitHash}
path={path ? `${path}/${child.name}` : child.name}
name={child.name}
kind={child.kind}
depth={depth + 1}
getDiffStatus={getDiffStatus}
onPreview={onPreview}
/>
))}
</div>
)}
</div>
);
};
export const ContextExplorer: React.FC<ContextExplorerProps> = ({
reqId, commitHash, diffTargetHash, className
}) => {
const [rootEntries, setRootEntries] = useState<DirEntry[]>([]);
const [previewContent, setPreviewContent] = useState<string | null>(null);
const [previewPath, setPreviewPath] = useState<string | null>(null);
const [diffMap, setDiffMap] = useState<Map<string, DiffStatus>>(new Map());
useEffect(() => {
if (!reqId || !commitHash) return;
const fetchRoot = async () => {
try {
const res = await fetch(`/api/context/${reqId}/tree/${commitHash}?path=`);
if (res.ok) {
const data = await res.json();
data.sort((a: DirEntry, b: DirEntry) => {
if (a.kind === b.kind) return a.name.localeCompare(b.name);
return a.kind === 'Dir' ? -1 : 1;
});
setRootEntries(data);
}
} catch (e) {
console.error(e);
}
};
const fetchDiff = async () => {
if (!diffTargetHash || diffTargetHash === commitHash) {
setDiffMap(new Map());
return;
}
try {
const res = await fetch(`/api/context/${reqId}/diff/${diffTargetHash}/${commitHash}`);
if (res.ok) {
const changes = await res.json();
const map = new Map<string, DiffStatus>();
// Rust enum serialization: { "Added": "path" }
changes.forEach((change: any) => {
if (change.Added) map.set(change.Added, 'added');
else if (change.Modified) map.set(change.Modified, 'modified');
else if (change.Deleted) map.set(change.Deleted, 'deleted');
});
setDiffMap(map);
}
} catch (e) {
console.error("Failed to fetch diff", e);
}
};
fetchRoot();
fetchDiff();
setPreviewContent(null);
setPreviewPath(null);
}, [reqId, commitHash, diffTargetHash]);
const getDiffStatus = (path: string) => diffMap.get(path) || 'none';
const handlePreview = async (path: string, kind: EntryKind) => {
if (kind !== 'File') return;
setPreviewPath(path);
setPreviewContent("Loading...");
try {
const res = await fetch(`/api/context/${reqId}/blob/${commitHash}/${encodeURIComponent(path)}`);
if (res.ok) {
const text = await res.text();
// Try to format JSON
try {
if (path.endsWith('.json')) {
const obj = JSON.parse(text);
setPreviewContent(JSON.stringify(obj, null, 2));
} else {
setPreviewContent(text);
}
} catch {
setPreviewContent(text);
}
} else {
setPreviewContent(`Error loading content: ${res.statusText}`);
}
} catch (e) {
setPreviewContent(`Error: ${e}`);
}
};
if (!reqId || !commitHash) {
return (
<Card className={cn("h-full flex items-center justify-center text-muted-foreground", className)}>
Select a task to view context
</Card>
);
}
return (
<div className={cn("grid grid-cols-3 gap-4 h-full max-h-full", className)}>
{/* Tree View */}
<Card className="col-span-1 flex flex-col overflow-hidden min-h-0 h-full">
<CardHeader className="py-3 px-4 bg-muted/30 border-b shrink-0">
<CardTitle className="text-sm font-medium flex items-center gap-2">
<GitCommit className="w-4 h-4" />
Context: {commitHash.substring(0, 7)}
{diffTargetHash && diffTargetHash !== commitHash && (
<Badge variant="secondary" className="ml-auto text-[10px] h-5">Diff Mode</Badge>
)}
</CardTitle>
</CardHeader>
<ScrollArea className="flex-1 p-2 h-full">
{rootEntries.map(entry => (
<TreeNode
key={entry.object_id}
reqId={reqId}
commitHash={commitHash}
path={entry.name}
name={entry.name}
kind={entry.kind}
depth={0}
getDiffStatus={getDiffStatus}
onPreview={handlePreview}
/>
))}
</ScrollArea>
</Card>
{/* Preview Pane */}
<Card className="col-span-2 flex flex-col overflow-hidden min-h-0 h-full">
<CardHeader className="py-3 px-4 bg-muted/30 border-b shrink-0">
<CardTitle className="text-sm font-medium flex items-center gap-2">
<FileText className="w-4 h-4" />
{previewPath || "Preview"}
</CardTitle>
</CardHeader>
<ScrollArea className="flex-1 h-full bg-background w-full rounded-b-lg">
<div className="p-4">
{previewContent ? (
<pre className="text-xs font-mono whitespace-pre-wrap break-words text-foreground/90">
{previewContent}
</pre>
) : (
<div className="flex items-center justify-center h-full min-h-[200px] text-muted-foreground text-sm">
Select a file to view content
</div>
)}
</div>
</ScrollArea>
</Card>
</div>
);
};
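The dirs-first ordering the component applies after each `tree` fetch has a direct counterpart over the backend's `DirEntry`/`EntryKind` types; a small sketch (types re-declared locally for the example, and byte-wise `cmp` standing in for the UI's `localeCompare`):

```rust
use std::cmp::Ordering;

#[derive(Debug, Clone, PartialEq, Eq)]
enum EntryKind { File, Dir }

#[derive(Debug, Clone, PartialEq, Eq)]
struct DirEntry { name: String, kind: EntryKind }

// Directories before files; ties broken by name, matching the UI's comparator.
fn sort_entries(entries: &mut Vec<DirEntry>) {
    entries.sort_by(|a, b| match (&a.kind, &b.kind) {
        (x, y) if x == y => a.name.cmp(&b.name),
        (EntryKind::Dir, _) => Ordering::Less,
        _ => Ordering::Greater,
    });
}
```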

View File

@ -188,11 +188,7 @@ export function WorkflowVisualizer() {
const { getLayoutedElements } = useGridLayout(); // Use custom layout
const onNodeClick: NodeMouseHandler = useCallback((_event, node) => {
if (node.data.type === schemas.TaskType.enum.DataFetch) {
setActiveTab('data');
} else {
setActiveTab(node.id);
}
setActiveTab(node.id);
}, [setActiveTab]);
// Transform DAG to ReactFlow nodes/edges when DAG or Task Status changes
@ -230,15 +226,6 @@ export function WorkflowVisualizer() {
// In overview, show all edges but subtly
isHighlighted = false;
isDimmed = false;
} else if (activeTab === 'data') {
// If 'data' tab is active, highlight edges connected to DataFetch nodes
const fromNode = dag.nodes.find(n => n.id === edge.from);
const toNode = dag.nodes.find(n => n.id === edge.to);
if (fromNode?.type === schemas.TaskType.enum.DataFetch || toNode?.type === schemas.TaskType.enum.DataFetch) {
isHighlighted = true;
} else {
isDimmed = true;
}
} else {
// Specific node selected
if (edge.from === activeTab || edge.to === activeTab) {

View File

@ -4,21 +4,23 @@ import { Badge } from '@/components/ui/badge';
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"
import { WorkflowVisualizer } from '@/components/workflow/WorkflowVisualizer';
import { ContextExplorer } from '@/components/workflow/ContextExplorer';
import { useWorkflowStore } from '@/stores/useWorkflowStore';
import { TaskStatus, schemas } from '@/api/schema.gen';
import { Loader2, CheckCircle2, AlertCircle, Clock, PanelLeftClose, PanelLeftOpen } from 'lucide-react';
import { Loader2, CheckCircle2, AlertCircle, Clock, PanelLeftClose, PanelLeftOpen, FileText, GitBranch, ArrowRight } from 'lucide-react';
import { Button } from '@/components/ui/button';
import ReactMarkdown from 'react-markdown';
import remarkGfm from 'remark-gfm';
import { FinancialTable } from '@/components/report/FinancialTable';
import { useAnalysisTemplates } from "@/hooks/useConfig"
import { RealtimeLogs } from '@/components/RealtimeLogs';
import { WorkflowStatus, ConnectionStatus, TaskState } from '@/types/workflow';
import { Progress } from "@/components/ui/progress"
import { cn, formatNodeName } from '@/lib/utils';
import { Dialog, DialogContent, DialogHeader, DialogTitle, DialogTrigger } from "@/components/ui/dialog";
export function ReportPage() {
const { id } = useParams();
// ... (rest of the imports)
const [searchParams] = useSearchParams();
const symbol = searchParams.get('symbol');
const market = searchParams.get('market');
@ -83,7 +85,8 @@ export function ReportPage() {
state.logs.map(log => ({ taskId, log }))
), [tasks]);
const tabNodes = dag?.nodes.filter(n => n.type === schemas.TaskType.enum.Analysis) || [];
// Include ALL nodes in tabs to allow debugging context for DataFetch tasks
const tabNodes = dag?.nodes || [];
return (
<div className="container py-4 space-y-4 min-h-[calc(100vh-4rem)] flex flex-col">
@ -175,26 +178,6 @@ export function ReportPage() {
>
Overview
</TabsTrigger>
<TabsTrigger
value="data"
className="
rounded-t-md rounded-b-none
border border-b-0 border-border/50
bg-muted/60
data-[state=active]:bg-background
data-[state=active]:border-border
data-[state=active]:border-b-background
data-[state=active]:mb-[-1px]
data-[state=active]:shadow-sm
data-[state=active]:z-10
px-4 py-2.5
text-muted-foreground
data-[state=active]:text-foreground
relative
"
>
Fundamental Data
</TabsTrigger>
{tabNodes.map(node => (
<TabsTrigger
key={node.id}
@ -233,13 +216,9 @@ export function ReportPage() {
/>
</TabsContent>
<TabsContent value="data" className="m-0 p-6">
<FinancialTable />
</TabsContent>
{tabNodes.map(node => (
<TabsContent key={node.id} value={node.id} className="m-0 p-0">
<TaskDetailView task={tasks[node.id]} />
<TaskDetailView task={tasks[node.id]} requestId={id} />
</TabsContent>
))}
</div>
@ -259,6 +238,7 @@ function OverviewTabContent({ status, tasks, totalTasks, completedTasks }: {
totalTasks: number,
completedTasks: number
}) {
// ... (implementation remains same)
const progress = totalTasks > 0 ? (completedTasks / totalTasks) * 100 : 0;
// Find errors
@ -346,44 +326,100 @@ function OverviewTabContent({ status, tasks, totalTasks, completedTasks }: {
)
}
function TaskDetailView({ task }: { task?: TaskState }) {
// Auto-scroll removed for global scrolling layout
// const contentScrollRef = useAutoScroll(task?.content?.length || 0);
function TaskDetailView({ task, requestId }: { task?: TaskState, requestId?: string }) {
// Only show context tab if we have commits
const hasContext = task?.inputCommit || task?.outputCommit;
if (task?.status === schemas.TaskStatus.enum.Failed && !task.content) {
return (
<div className="flex flex-col items-center justify-center h-full min-h-[400px] p-8 text-muted-foreground space-y-4">
<AlertCircle className="h-12 w-12 text-destructive/80 mb-2" />
<h3 className="text-lg font-medium text-foreground">Analysis Failed</h3>
<div className="max-w-xl text-center space-y-2">
<p className="text-sm text-muted-foreground">The task encountered an error and could not complete.</p>
<p className="text-destructive bg-destructive/10 p-4 rounded-md border border-destructive/20 font-mono text-sm whitespace-pre-wrap">
{task.message || "Unknown error occurred."}
</p>
</div>
<div className="flex flex-col h-full">
<Tabs defaultValue="report" className="flex-1 flex flex-col">
{hasContext && (
<div className="border-b px-4 bg-muted/40">
<TabsList className="bg-transparent p-0 h-10 w-full justify-start gap-6">
<TabsTrigger value="report" className="data-[state=active]:bg-transparent data-[state=active]:shadow-none data-[state=active]:border-b-2 data-[state=active]:border-primary rounded-none px-0 py-2 h-full">
<span className="flex items-center gap-2"><FileText className="w-4 h-4" /> Report Content</span>
</TabsTrigger>
<TabsTrigger value="context" className="data-[state=active]:bg-transparent data-[state=active]:shadow-none data-[state=active]:border-b-2 data-[state=active]:border-primary rounded-none px-0 py-2 h-full">
<span className="flex items-center gap-2"><GitBranch className="w-4 h-4" /> Context Inspector</span>
</TabsTrigger>
</TabsList>
</div>
)}
<TabsContent value="report" className="flex-1 m-0">
<div className="flex flex-col items-center justify-center h-full min-h-[400px] p-8 text-muted-foreground space-y-4">
<AlertCircle className="h-12 w-12 text-destructive/80 mb-2" />
<h3 className="text-lg font-medium text-foreground">Analysis Failed</h3>
<div className="max-w-xl text-center space-y-2">
<p className="text-sm text-muted-foreground">The task encountered an error and could not complete.</p>
<p className="text-destructive bg-destructive/10 p-4 rounded-md border border-destructive/20 font-mono text-sm whitespace-pre-wrap">
{task.message || "Unknown error occurred."}
</p>
</div>
</div>
</TabsContent>
<TabsContent value="context" className="flex-1 m-0 h-[600px]">
{requestId && (task.inputCommit || task.outputCommit) && (
<ContextExplorer
reqId={requestId}
commitHash={task.outputCommit || task.inputCommit!}
diffTargetHash={task.outputCommit ? task.inputCommit : undefined}
className="h-full p-4"
/>
)}
</TabsContent>
</Tabs>
</div>
);
}
return (
<div className="flex-1">
<div className="p-8 max-w-4xl mx-auto">
<div className="prose dark:prose-invert max-w-none prose-p:text-foreground prose-headings:text-foreground prose-li:text-foreground prose-strong:text-foreground prose-span:text-foreground">
{task?.content ? (
<ReactMarkdown remarkPlugins={[remarkGfm]}>
{task.content || ''}
</ReactMarkdown>
) : (
<div className="flex flex-col items-center justify-center h-[300px] text-muted-foreground space-y-4">
{task?.status === schemas.TaskStatus.enum.Pending && <p>Waiting to start...</p>}
{task?.status === schemas.TaskStatus.enum.Running && !task?.content && <Loader2 className="h-8 w-8 animate-spin" />}
<div className="flex flex-col h-full">
<Tabs defaultValue="report" className="flex-1 flex flex-col">
{hasContext && (
<div className="border-b px-4 bg-muted/40">
<TabsList className="bg-transparent p-0 h-10 w-full justify-start gap-6">
<TabsTrigger value="report" className="data-[state=active]:bg-transparent data-[state=active]:shadow-none data-[state=active]:border-b-2 data-[state=active]:border-primary rounded-none px-0 py-2 h-full">
<span className="flex items-center gap-2"><FileText className="w-4 h-4" /> Report Content</span>
</TabsTrigger>
<TabsTrigger value="context" className="data-[state=active]:bg-transparent data-[state=active]:shadow-none data-[state=active]:border-b-2 data-[state=active]:border-primary rounded-none px-0 py-2 h-full">
<span className="flex items-center gap-2"><GitBranch className="w-4 h-4" /> Context Inspector</span>
</TabsTrigger>
</TabsList>
</div>
)}
<TabsContent value="report" className="flex-1 m-0 min-h-0 overflow-auto">
<div className="p-8 max-w-4xl mx-auto">
<div className="prose dark:prose-invert max-w-none prose-p:text-foreground prose-headings:text-foreground prose-li:text-foreground prose-strong:text-foreground prose-span:text-foreground">
{task?.content ? (
<ReactMarkdown remarkPlugins={[remarkGfm]}>
{task.content || ''}
</ReactMarkdown>
) : (
<div className="flex flex-col items-center justify-center h-[300px] text-muted-foreground space-y-4">
{task?.status === schemas.TaskStatus.enum.Pending && <p>Waiting to start...</p>}
{task?.status === schemas.TaskStatus.enum.Running && !task?.content && <Loader2 className="h-8 w-8 animate-spin" />}
</div>
)}
{task?.status === schemas.TaskStatus.enum.Running && (
<span className="inline-block w-2 h-4 ml-1 bg-primary animate-pulse"/>
)}
</div>
</div>
</TabsContent>
<TabsContent value="context" className="flex-1 m-0 h-[600px]">
{requestId && (task?.inputCommit || task?.outputCommit) && (
<ContextExplorer
reqId={requestId}
commitHash={task.outputCommit || task.inputCommit!}
diffTargetHash={task.outputCommit ? task.inputCommit : undefined}
className="h-full p-4"
/>
)}
{task?.status === schemas.TaskStatus.enum.Running && (
<span className="inline-block w-2 h-4 ml-1 bg-primary animate-pulse"/>
)}
</div>
</div>
</TabsContent>
</Tabs>
</div>
);
}

View File

@ -13,7 +13,7 @@ interface WorkflowStoreState {
// Actions
initialize: (requestId: string) => void;
setDag: (dag: WorkflowDag) => void;
updateTaskStatus: (taskId: string, status: TaskStatus, message?: string, progress?: number) => void;
updateTaskStatus: (taskId: string, status: TaskStatus, message?: string, progress?: number, inputCommit?: string, outputCommit?: string) => void;
updateTaskContent: (taskId: string, delta: string) => void; // Stream content (append)
setTaskContent: (taskId: string, content: string) => void; // Set full content
appendTaskLog: (taskId: string, log: string) => void;
@ -54,7 +54,7 @@ export const useWorkflowStore = create<WorkflowStoreState>((set, get) => ({
set({ dag, tasks: initialTasks, status: schemas.TaskStatus.enum.Running });
},
updateTaskStatus: (taskId, status, message, progress) => {
updateTaskStatus: (taskId, status, message, progress, inputCommit, outputCommit) => {
set(state => {
let task = state.tasks[taskId];
@ -81,7 +81,9 @@ export const useWorkflowStore = create<WorkflowStoreState>((set, get) => ({
status,
message: message || task.message,
progress: progress !== undefined ? progress : task.progress,
logs: newLogs
logs: newLogs,
inputCommit: inputCommit || task.inputCommit,
outputCommit: outputCommit || task.outputCommit
}
}
};
@ -162,11 +164,14 @@ export const useWorkflowStore = create<WorkflowStoreState>((set, get) => ({
case 'TaskStateChanged': {
// Explicit typing to help TS
const p = event.payload;
// @ts-ignore - input_commit/output_commit added
state.updateTaskStatus(
p.task_id,
p.status,
p.message || undefined,
p.progress || undefined
p.progress || undefined,
p.input_commit,
p.output_commit
);
break;
}
@ -209,9 +214,10 @@ export const useWorkflowStore = create<WorkflowStoreState>((set, get) => ({
}
if (event.payload.tasks_output) {
Object.entries(event.payload.tasks_output).forEach(([taskId, content]) => {
if (newTasks[taskId] && content) {
newTasks[taskId] = { ...newTasks[taskId], content: content || undefined };
Object.entries(event.payload.tasks_output).forEach(([taskId, outputCommit]) => {
if (newTasks[taskId] && outputCommit) {
// Correctly mapping outputCommit, not content
newTasks[taskId] = { ...newTasks[taskId], outputCommit: outputCommit };
}
});
}

View File

@ -41,4 +41,7 @@ export interface TaskState {
logs: string[]; // Full log history
content?: string; // Streaming content (Markdown)
result?: unknown; // Structured result
// Context Inspector
inputCommit?: string;
outputCommit?: string;
}

View File

@ -78,6 +78,10 @@ pub fn create_router(app_state: AppState) -> Router {
let mut router = Router::new()
.route("/health", get(health_check))
.route("/tasks/{request_id}", get(get_task_progress))
// Context Inspector Proxies
.route("/api/context/{req_id}/tree/{commit_hash}", get(proxy_context_tree))
.route("/api/context/{req_id}/blob/{commit_hash}/{*path}", get(proxy_context_blob))
.route("/api/context/{req_id}/diff/{from_commit}/{to_commit}", get(proxy_context_diff))
.nest("/api/v1", create_v1_router())
.with_state(app_state);
@ -1048,3 +1052,82 @@ async fn get_workflow_graph_proxy(
axum::body::Body::from(body),
))
}
// --- Context Inspector Proxies ---
async fn proxy_context_tree(
State(state): State<AppState>,
Path((req_id, commit_hash)): Path<(String, String)>,
Query(params): Query<HashMap<String, String>>,
) -> Result<impl IntoResponse> {
let url = format!(
"{}/context/{}/tree/{}",
state.config.workflow_orchestrator_service_url.trim_end_matches('/'),
req_id,
commit_hash
);
let client = reqwest::Client::new();
let resp = client.get(&url).query(&params).send().await?;
let status = resp.status();
let body = resp.bytes().await?;
Ok((
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
axum::body::Body::from(body),
))
}
async fn proxy_context_blob(
State(state): State<AppState>,
Path((req_id, commit_hash, path)): Path<(String, String, String)>,
) -> Result<impl IntoResponse> {
let url = format!(
"{}/context/{}/blob/{}/{}",
state.config.workflow_orchestrator_service_url.trim_end_matches('/'),
req_id,
commit_hash,
path
);
let client = reqwest::Client::new();
let resp = client.get(&url).send().await?;
let status = resp.status();
let headers = resp.headers().clone();
let body = resp.bytes().await?;
let mut response_builder = axum::http::Response::builder()
.status(StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR));
if let Some(ct) = headers.get(axum::http::header::CONTENT_TYPE) {
response_builder = response_builder.header(axum::http::header::CONTENT_TYPE, ct);
}
Ok(response_builder.body(axum::body::Body::from(body)).unwrap())
}
async fn proxy_context_diff(
State(state): State<AppState>,
Path((req_id, from_commit, to_commit)): Path<(String, String, String)>,
) -> Result<impl IntoResponse> {
let url = format!(
"{}/context/{}/diff/{}/{}",
state.config.workflow_orchestrator_service_url.trim_end_matches('/'),
req_id,
from_commit,
to_commit
);
let client = reqwest::Client::new();
let resp = client.get(&url).send().await?;
let status = resp.status();
let body = resp.bytes().await?;
Ok((
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
axum::body::Body::from(body),
))
}

View File

@ -98,6 +98,9 @@ pub enum WorkflowEvent {
message: Option<String>,
timestamp: i64,
progress: Option<u8>, // 0-100
// New fields for Context Inspector
input_commit: Option<String>,
output_commit: Option<String>,
},
// 3. Streaming task output (used for the LLM typewriter effect)

View File

@ -97,21 +97,6 @@ pub async fn run_report_generation_workflow(
info!(module_id = %module_id, "All dependencies met. Generating report for module.");
// Publish TaskLog
if let Some(task_id) = &command.task_id {
let log_evt = WorkflowEvent::TaskLog {
task_id: task_id.clone(),
level: "INFO".to_string(),
message: format!("Starting module: {}", module_id),
timestamp: chrono::Utc::now().timestamp_millis(),
};
if let Ok(payload) = serde_json::to_vec(&log_evt) {
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
let nats = state.nats.clone();
tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
}
}
// Broadcast Module Start
let _ = stream_tx.send(serde_json::json!({
"type": "module_start",
@ -163,20 +148,6 @@ pub async fn run_report_generation_workflow(
let formatted_financials = format_financials_to_markdown(&financials);
context.insert("financial_data", &formatted_financials);
if let Some(task_id) = &command.task_id {
let log_evt = WorkflowEvent::TaskLog {
task_id: task_id.clone(),
level: "INFO".to_string(),
message: format!("Rendering prompt template for module: {}", module_id),
timestamp: chrono::Utc::now().timestamp_millis(),
};
if let Ok(payload) = serde_json::to_vec(&log_evt) {
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
let nats = state.nats.clone();
tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
}
}
info!(module_id = %module_id, "Rendering prompt template...");
let prompt = match Tera::one_off(&module_config.prompt_template, &context, true) {
Ok(p) => {
@ -229,19 +200,6 @@ pub async fn run_report_generation_workflow(
// Streaming Generation
info!(module_id = %module_id, "Initiating LLM stream...");
if let Some(task_id) = &command.task_id {
let log_evt = WorkflowEvent::TaskLog {
task_id: task_id.clone(),
level: "INFO".to_string(),
message: format!("Initiating LLM stream with model: {}", module_config.model_id),
timestamp: chrono::Utc::now().timestamp_millis(),
};
if let Ok(payload) = serde_json::to_vec(&log_evt) {
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
let nats = state.nats.clone();
tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
}
}
let mut stream = match llm_client.stream_text(prompt).await {
Ok(s) => s,
Err(e) => {
@ -323,20 +281,6 @@ pub async fn run_report_generation_workflow(
info!(module_id = %module_id, "Successfully generated content (Length: {}).", full_content.len());
if let Some(task_id) = &command.task_id {
let log_evt = WorkflowEvent::TaskLog {
task_id: task_id.clone(),
level: "INFO".to_string(),
message: format!("Module completed: {}. Content length: {}", module_id, full_content.len()),
timestamp: chrono::Utc::now().timestamp_millis(),
};
if let Ok(payload) = serde_json::to_vec(&log_evt) {
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(command.request_id).to_string();
let nats = state.nats.clone();
tokio::spawn(async move { let _ = nats.publish(subject, payload.into()).await; });
}
}
// Broadcast Module Done
let _ = stream_tx.send(serde_json::json!({
"type": "module_done",

View File

@ -2,17 +2,25 @@ use axum::{
routing::get,
Router,
Json,
extract::{State, Path},
extract::{State, Path, Query},
response::{IntoResponse, Response},
http::StatusCode,
};
use std::sync::Arc;
use crate::state::AppState;
use serde::Deserialize;
use serde_json::json;
use uuid::Uuid;
use workflow_context::traits::ContextStore;
pub fn create_router(state: Arc<AppState>) -> Router {
Router::new()
.route("/health", get(health_check))
.route("/workflows/{id}/graph", get(get_workflow_graph))
// Context Inspector APIs
.route("/context/{req_id}/tree/{commit_hash}", get(get_context_tree))
.route("/context/{req_id}/blob/{commit_hash}/{*path}", get(get_context_blob))
.route("/context/{req_id}/diff/{from_commit}/{to_commit}", get(get_context_diff))
.with_state(state)
}
@ -40,3 +48,54 @@ async fn get_workflow_graph(
}
}
#[derive(Deserialize)]
struct TreeQuery {
path: Option<String>,
}
async fn get_context_tree(
State(state): State<Arc<AppState>>,
Path((req_id, commit_hash)): Path<(String, String)>,
Query(query): Query<TreeQuery>,
) -> Json<serde_json::Value> {
let path = query.path.unwrap_or_default();
match state.vgcs.list_dir(&req_id, &commit_hash, &path) {
Ok(entries) => Json(json!(entries)),
Err(e) => Json(json!({ "error": e.to_string() })),
}
}
async fn get_context_blob(
State(state): State<Arc<AppState>>,
Path((req_id, commit_hash, path)): Path<(String, String, String)>,
) -> Response {
match state.vgcs.read_file(&req_id, &commit_hash, &path) {
Ok(mut reader) => {
let mut buffer = Vec::new();
if let Err(e) = std::io::Read::read_to_end(&mut reader, &mut buffer) {
return (StatusCode::INTERNAL_SERVER_ERROR, format!("Read error: {}", e)).into_response();
}
// Minimal content-type detection: JSON by extension, plain text otherwise
let content_type = if path.ends_with(".json") {
"application/json"
} else {
"text/plain"
};
([(axum::http::header::CONTENT_TYPE, content_type)], buffer).into_response()
},
Err(e) => (StatusCode::NOT_FOUND, format!("Error: {}", e)).into_response(),
}
}
async fn get_context_diff(
State(state): State<Arc<AppState>>,
Path((req_id, from_commit, to_commit)): Path<(String, String, String)>,
) -> Json<serde_json::Value> {
match state.vgcs.diff(&req_id, &from_commit, &to_commit) {
Ok(changes) => Json(json!(changes)),
Err(e) => Json(json!({ "error": e.to_string() })),
}
}
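The blob handler above hardcodes a two-way content-type guess. A slightly broader, std-only sketch of that extension lookup is shown below; the `.md` and `.csv` branches are assumptions for illustration, not part of the actual handler:

```rust
// Guess a Content-Type from the file extension of a repo path.
// Matches the handler's behavior for .json and falls back to plain text;
// the markdown/csv cases are hypothetical extensions of the same idea.
fn guess_content_type(path: &str) -> &'static str {
    match path.rsplit('.').next() {
        Some("json") => "application/json",
        Some("md") => "text/markdown",
        Some("csv") => "text/csv",
        _ => "text/plain", // no extension, or an unknown one
    }
}
```

Because `rsplit('.')` yields the whole string when no dot is present, extension-less paths fall through to `text/plain` without a special case.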

View File

@ -84,6 +84,8 @@ pub struct DagNode {
pub status: TaskStatus,
pub config: serde_json::Value,
pub routing_key: String,
/// The commit hash used as input for this task
pub input_commit: Option<String>,
}
impl DagScheduler {
@ -105,9 +107,16 @@ impl DagScheduler {
status: TaskStatus::Pending,
config,
routing_key,
input_commit: None,
});
}
pub fn set_input_commit(&mut self, task_id: &str, commit: String) {
if let Some(node) = self.nodes.get_mut(task_id) {
node.input_commit = Some(commit);
}
}
pub fn add_dependency(&mut self, from: &str, to: &str) {
self.forward_deps.entry(from.to_string()).or_default().push(to.to_string());
self.reverse_deps.entry(to.to_string()).or_default().push(from.to_string());

View File

@ -14,7 +14,7 @@ use tokio::sync::Mutex;
use crate::dag_scheduler::DagScheduler;
use crate::state::AppState;
use workflow_context::{Vgcs, ContextStore}; // Added ContextStore
use workflow_context::{Vgcs, ContextStore, traits::Transaction}; // Added Transaction
pub struct WorkflowEngine {
state: Arc<AppState>,
@ -30,21 +30,38 @@ impl WorkflowEngine {
let req_id = cmd.request_id;
info!("Starting workflow {}", req_id);
self.publish_log(req_id, "workflow", "INFO", "Workflow started. Initializing...").await;
// 1. Init VGCS Repo
self.state.vgcs.init_repo(&req_id.to_string())?;
// 2. Create Scheduler
// Initial commit is empty for a fresh workflow
let mut dag = DagScheduler::new(req_id, String::new());
// 2. Create Initial Commit (Context Baseline)
let mut tx = self.state.vgcs.begin_transaction(&req_id.to_string(), "")?;
let request_info = json!({
"request_id": req_id,
"symbol": cmd.symbol.as_str(),
"market": cmd.market,
"template_id": cmd.template_id,
"timestamp": chrono::Utc::now().to_rfc3339()
});
tx.write("request.json", serde_json::to_vec_pretty(&request_info)?.as_slice())?;
let initial_commit = Box::new(tx).commit("Initial Workflow Setup", "System")?;
// 3. Fetch Template Config
// 3. Create Scheduler with Initial Commit
let mut dag = DagScheduler::new(req_id, initial_commit);
// 4. Fetch Template Config
self.publish_log(req_id, "workflow", "INFO", "Fetching template configuration...").await;
let template_sets = self.state.persistence_client.get_analysis_template_sets().await?;
let template = template_sets.get(&cmd.template_id).ok_or_else(|| {
anyhow::anyhow!("Template {} not found", cmd.template_id)
})?;
// 3.1 Fetch Data Sources Config
let data_sources = self.state.persistence_client.get_data_sources_config().await?;
// 4. Build DAG
self.build_dag(&mut dag, template, &cmd.template_id, &cmd.market, &cmd.symbol);
self.build_dag(&mut dag, template, &data_sources, &cmd.template_id, &cmd.market, &cmd.symbol);
// 5. Save State
self.state.workflows.insert(req_id, Arc::new(Mutex::new(dag.clone())));
@ -61,6 +78,8 @@ impl WorkflowEngine {
}
}
self.publish_log(req_id, "workflow", "INFO", "DAG built and workflow initialized.").await;
// 6. Trigger Initial Tasks
let initial_tasks = dag.get_initial_tasks();
@ -109,7 +128,7 @@ impl WorkflowEngine {
timestamp: chrono::Utc::now().timestamp_millis(),
task_graph: dag.to_dto(),
tasks_status,
tasks_output: std::collections::HashMap::new(), // TODO: Populate output if needed
tasks_output: dag.commit_tracker.task_commits.clone().into_iter().map(|(k, v)| (k, Some(v))).collect(),
};
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
@ -136,6 +155,8 @@ impl WorkflowEngine {
// 1. Update Status & Record Commit
dag.update_status(&evt.task_id, evt.status);
self.publish_log(req_id, &evt.task_id, "INFO", &format!("Task status changed to {:?}", evt.status)).await;
// Lookup task_type
let task_type = dag.nodes.get(&evt.task_id).map(|n| n.task_type).unwrap_or(TaskType::DataFetch);
@ -158,6 +179,10 @@ impl WorkflowEngine {
None
};
// Resolve commits
let input_commit = dag.nodes.get(&evt.task_id).and_then(|n| n.input_commit.clone());
let output_commit = evt.result.as_ref().and_then(|r| r.new_commit.clone());
// Publish TaskStateChanged event
let progress_event = WorkflowEvent::TaskStateChanged {
task_id: evt.task_id.clone(),
@ -166,6 +191,8 @@ impl WorkflowEngine {
message: error_message,
timestamp: chrono::Utc::now().timestamp_millis(),
progress: None,
input_commit,
output_commit,
};
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
if let Ok(payload) = serde_json::to_vec(&progress_event) {
@ -190,6 +217,7 @@ impl WorkflowEngine {
for task_id in ready_tasks {
if let Err(e) = self.dispatch_task(&mut dag, &task_id, &self.state.vgcs).await {
error!("Failed to dispatch task {}: {}", task_id, e);
self.publish_log(req_id, &task_id, "ERROR", &format!("Failed to dispatch task: {}", e)).await;
}
}
}
@ -199,6 +227,7 @@ impl WorkflowEngine {
let timestamp = chrono::Utc::now().timestamp_millis();
let event = if dag.has_failures() {
info!("Workflow {} failed (some tasks failed)", req_id);
self.publish_log(req_id, "workflow", "ERROR", "Workflow finished with failures.").await;
WorkflowEvent::WorkflowFailed {
end_timestamp: timestamp,
reason: "Some tasks failed".to_string(),
@ -206,6 +235,7 @@ impl WorkflowEngine {
}
} else {
info!("Workflow {} completed successfully", req_id);
self.publish_log(req_id, "workflow", "INFO", "Workflow completed successfully.").await;
WorkflowEvent::WorkflowCompleted {
end_timestamp: timestamp,
result_summary: Some(json!({})),
@ -227,8 +257,14 @@ impl WorkflowEngine {
// 1. Resolve Context (Merge if needed)
let context = dag.resolve_context(task_id, vgcs)?;
// Store the input commit in the node for observability
if let Some(base_commit) = &context.base_commit {
dag.set_input_commit(task_id, base_commit.clone());
}
// 2. Update Status
dag.update_status(task_id, TaskStatus::Scheduled);
self.publish_log(dag.request_id, task_id, "INFO", "Task scheduled and dispatched.").await;
// 3. Construct Command
let node = dag.nodes.get(task_id).ok_or_else(|| anyhow::anyhow!("Node not found"))?;
@ -264,33 +300,65 @@ impl WorkflowEngine {
}
// Helper to build DAG
fn build_dag(&self, dag: &mut DagScheduler, template: &common_contracts::config_models::AnalysisTemplateSet, template_id: &str, market: &str, symbol: &CanonicalSymbol) {
let mut providers = Vec::new();
match market {
"CN" => {
providers.push("tushare");
},
"US" => providers.push("yfinance"),
"MOCK" => providers.push("mock"),
_ => providers.push("yfinance"),
}
fn build_dag(
&self,
dag: &mut DagScheduler,
template: &common_contracts::config_models::AnalysisTemplateSet,
data_sources: &common_contracts::config_models::DataSourcesConfig,
template_id: &str,
market: &str,
symbol: &CanonicalSymbol
) {
// 1. Data Fetch Nodes
let mut fetch_tasks = Vec::new();
for p in &providers {
let task_id = format!("fetch:{}", p);
fetch_tasks.push(task_id.clone());
let display_name = format!("Data Fetch ({})", p);
dag.add_node(
task_id.clone(),
Some(display_name),
TaskType::DataFetch,
format!("provider.{}", p),
// Use all enabled data sources regardless of market
// The provider itself will decide whether to skip or process based on market support.
// We sort keys to ensure deterministic DAG generation.
let mut source_keys: Vec<&String> = data_sources.keys().collect();
source_keys.sort();
for key in source_keys {
let config = &data_sources[key];
if config.enabled {
let provider_key = key.to_lowercase();
let task_id = format!("fetch:{}", provider_key);
fetch_tasks.push(task_id.clone());
let display_name = format!("Data Fetch ({:?})", config.provider);
let routing_key = format!("provider.{}", provider_key);
dag.add_node(
task_id.clone(),
Some(display_name),
TaskType::DataFetch,
routing_key,
json!({
"symbol": symbol.as_str(),
"market": market
})
);
}
}
// Fallback for MOCK if absent from config (mock is usually hardcoded for tests rather than listed in data_sources.json)
if market == "MOCK" && fetch_tasks.is_empty() {
let task_id = "fetch:mock".to_string();
fetch_tasks.push(task_id.clone());
dag.add_node(
task_id,
Some("Data Fetch (Mock)".to_string()),
TaskType::DataFetch,
"provider.mock".to_string(),
json!({
"symbol": symbol.as_str(),
"market": market
})
);
}
if fetch_tasks.is_empty() {
warn!("No enabled data providers found in configuration.");
}
// 2. Analysis Nodes (Dynamic from Template)
@ -329,4 +397,23 @@ impl WorkflowEngine {
}
}
}
async fn publish_log(&self, req_id: uuid::Uuid, task_id: &str, level: &str, message: &str) {
let event = WorkflowEvent::TaskLog {
task_id: task_id.to_string(),
level: level.to_string(),
message: message.to_string(),
timestamp: chrono::Utc::now().timestamp_millis(),
};
let subject = common_contracts::subjects::NatsSubject::WorkflowProgress(req_id).to_string();
if let Ok(payload) = serde_json::to_vec(&event) {
// Fire and forget
let nats = self.nats.clone();
tokio::spawn(async move {
if let Err(e) = nats.publish(subject, payload.into()).await {
error!("Failed to publish TaskLog: {}", e);
}
});
}
}
}
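The comment in `build_dag` notes that keys are sorted to keep DAG generation deterministic, since `HashMap` iteration order is unspecified. A minimal std-only sketch of that trick, assuming a simplified `HashMap<String, bool>` in place of the real `DataSourcesConfig`:

```rust
use std::collections::HashMap;

// Collect enabled provider keys in a reproducible order.
// Sorting the keys first makes the resulting task list stable
// across runs, even though HashMap iteration order is not.
fn ordered_providers(sources: &HashMap<String, bool>) -> Vec<String> {
    let mut keys: Vec<&String> = sources.keys().collect();
    keys.sort();
    keys.into_iter()
        .filter(|k| sources[k.as_str()]) // keep only enabled sources
        .map(|k| k.to_lowercase())       // normalize to routing-key casing
        .collect()
}
```

The same ordering then feeds `format!("fetch:{}", provider_key)` and the `provider.{}` routing keys, so task IDs come out identical on every scheduler run.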