diff --git a/docs/tasks/pending/20251127_add_task_display_names.md b/docs/tasks/completed/20251127_add_task_display_names.md similarity index 100% rename from docs/tasks/pending/20251127_add_task_display_names.md rename to docs/tasks/completed/20251127_add_task_display_names.md diff --git a/frontend/src/api/schema.gen.ts b/frontend/src/api/schema.gen.ts index d66a43e..0a9175e 100644 --- a/frontend/src/api/schema.gen.ts +++ b/frontend/src/api/schema.gen.ts @@ -2,7 +2,7 @@ import { makeApi, Zodios, type ZodiosOptions } from "@zodios/core"; import { z } from "zod"; export type AnalysisTemplateSet = { - modules: {}; + modules: Record; name: string; }; export type AnalysisModuleConfig = { @@ -12,7 +12,7 @@ export type AnalysisModuleConfig = { prompt_template: string; provider_id: string; }; -export type AnalysisTemplateSets = {}; +export type AnalysisTemplateSets = Record; export type ConfigFieldSchema = { default_value?: (string | null) | undefined; description?: (string | null) | undefined; @@ -41,9 +41,9 @@ export type DataSourceConfig = { provider: DataSourceProvider; }; export type DataSourceProvider = "Tushare" | "Finnhub" | "Alphavantage" | "Yfinance"; -export type DataSourcesConfig = {}; +export type DataSourcesConfig = Record; export type HealthStatus = { - details: {}; + details: Record; module_id: string; status: ServiceStatus; version: string; @@ -60,7 +60,7 @@ export type LlmModel = { model_id: string; name?: (string | null) | undefined; }; -export type LlmProvidersConfig = {}; +export type LlmProvidersConfig = Record; export type ProviderMetadata = { config_schema: Array; description: string; @@ -78,6 +78,7 @@ export type StartWorkflowCommand = { }; export type CanonicalSymbol = string; export type TaskNode = { + display_name?: (string | null) | undefined; id: string; initial_status: TaskStatus; name: string; @@ -147,7 +148,7 @@ export type WorkflowEvent = | { payload: { end_timestamp: number; - result_summary?: unknown | undefined; + result_summary?: unknown; }; type: "WorkflowCompleted"; } @@ -162,25 +163,25 @@ export type WorkflowEvent = | { payload: { task_graph: WorkflowDag; - tasks_output: {}; - tasks_status: {}; + tasks_output: Record; + tasks_status: Record; timestamp: number; }; type: "WorkflowStateSnapshot"; }; -export const AnalysisModuleConfig = z.object({ +export const AnalysisModuleConfig: z.ZodType = z.object({ dependencies: z.array(z.string()), model_id: z.string(), name: z.string(), prompt_template: z.string(), provider_id: z.string(), }); -export const AnalysisTemplateSet = z.object({ +export const AnalysisTemplateSet: z.ZodType = z.object({ modules: z.record(AnalysisModuleConfig), name: z.string(), }); -export const AnalysisTemplateSets = +export const AnalysisTemplateSets: z.ZodType = z.record(AnalysisTemplateSet); export const DataSourceProvider = z.enum([ "Tushare", @@ -188,36 +189,50 @@ export const DataSourceProvider = z.enum([ "Alphavantage", "Yfinance", ]); -export const DataSourceConfig = z.object({ +export const DataSourceConfig: z.ZodType = z.object({ api_key: z.union([z.string(), z.null()]).optional(), api_url: z.union([z.string(), z.null()]).optional(), enabled: z.boolean(), provider: DataSourceProvider, }); -export const DataSourcesConfig = +export const DataSourcesConfig: z.ZodType = z.record(DataSourceConfig); +export type TestLlmConfigRequest = { + api_base_url: string; + api_key: string; + model_id: string; +}; export const TestLlmConfigRequest = z.object({ api_base_url: z.string(), api_key: z.string(), model_id: z.string(), }); -export const 
LlmModel = z.object({ +export const LlmModel: z.ZodType = z.object({ is_active: z.boolean(), model_id: z.string(), name: z.union([z.string(), z.null()]).optional(), }); -export const LlmProvider = z.object({ +export const LlmProvider: z.ZodType = z.object({ api_base_url: z.string(), api_key: z.string(), models: z.array(LlmModel), name: z.string(), }); -export const LlmProvidersConfig = z.record(LlmProvider); +export const LlmProvidersConfig: z.ZodType = z.record(LlmProvider); +export type TestConfigRequest = { data: unknown; type: string }; export const TestConfigRequest = z.object({ data: z.unknown(), type: z.string() }); +export type TestConnectionResponse = { + message: string; + success: boolean; +}; export const TestConnectionResponse = z.object({ message: z.string(), success: z.boolean(), }); +export type DiscoverPreviewRequest = { + api_base_url: string; + api_key: string; +}; export const DiscoverPreviewRequest = z.object({ api_base_url: z.string(), api_key: z.string(), @@ -234,7 +249,7 @@ export const ConfigKey = z.enum([ "SandboxMode", "Region", ]); -export const ConfigFieldSchema = z.object({ +export const ConfigFieldSchema: z.ZodType = z.object({ default_value: z.union([z.string(), z.null()]).optional(), description: z.union([z.string(), z.null()]).optional(), field_type: FieldType, @@ -244,7 +259,7 @@ export const ConfigFieldSchema = z.object({ placeholder: z.union([z.string(), z.null()]).optional(), required: z.boolean(), }); -export const ProviderMetadata = z.object({ +export const ProviderMetadata: z.ZodType = z.object({ config_schema: z.array(ConfigFieldSchema), description: z.string(), icon_url: z.union([z.string(), z.null()]).optional(), @@ -253,19 +268,37 @@ export const ProviderMetadata = z.object({ name_en: z.string(), supports_test_connection: z.boolean(), }); +export type SymbolResolveRequest = { + market?: (string | null) | undefined; + symbol: string; +}; export const SymbolResolveRequest = z.object({ market: z.union([z.string(), z.null()]).optional(), symbol: z.string(), }); +export type SymbolResolveResponse = { + market: string; + symbol: string; +}; export const SymbolResolveResponse = z.object({ market: z.string(), symbol: z.string(), }); +export type DataRequest = { + market?: (string | null) | undefined; + symbol: string; + template_id: string; +}; export const DataRequest = z.object({ market: z.union([z.string(), z.null()]).optional(), symbol: z.string(), template_id: z.string(), }); +export type RequestAcceptedResponse = { + market: string; + request_id: string; + symbol: string; +}; export const RequestAcceptedResponse = z.object({ market: z.string(), request_id: z.string().uuid(), @@ -277,7 +310,7 @@ export const ObservabilityTaskStatus = z.enum([ "Completed", "Failed", ]); -export const TaskProgress = z.object({ +export const TaskProgress: z.ZodType = z.object({ details: z.string(), progress_percent: z.number().int().gte(0), request_id: z.string().uuid(), @@ -287,19 +320,19 @@ export const TaskProgress = z.object({ }); export const CanonicalSymbol = z.string(); export const ServiceStatus = z.enum(["Ok", "Degraded", "Unhealthy"]); -export const HealthStatus = z.object({ +export const HealthStatus: z.ZodType = z.object({ details: z.record(z.string()), module_id: z.string(), status: ServiceStatus, version: z.string(), }); -export const StartWorkflowCommand = z.object({ +export const StartWorkflowCommand: z.ZodType = z.object({ market: z.string(), request_id: z.string().uuid(), symbol: CanonicalSymbol, template_id: z.string(), }); -export const 
TaskDependency = z.object({ +export const TaskDependency: z.ZodType = z.object({ from: z.string(), to: z.string(), }); @@ -312,17 +345,18 @@ export const TaskStatus = z.enum([ "Skipped", ]); export const TaskType = z.enum(["DataFetch", "DataProcessing", "Analysis"]); -export const TaskNode = z.object({ +export const TaskNode: z.ZodType = z.object({ + display_name: z.union([z.string(), z.null()]).optional(), id: z.string(), initial_status: TaskStatus, name: z.string(), type: TaskType, }); -export const WorkflowDag = z.object({ +export const WorkflowDag: z.ZodType = z.object({ edges: z.array(TaskDependency), nodes: z.array(TaskNode), }); -export const WorkflowEvent = z.union([ +export const WorkflowEvent: z.ZodType = z.union([ z .object({ payload: z @@ -376,7 +410,7 @@ export const WorkflowEvent = z.union([ payload: z .object({ end_timestamp: z.number().int(), - result_summary: z.unknown().optional(), + result_summary: z.unknown(), }) .passthrough(), type: z.literal("WorkflowCompleted"), @@ -445,7 +479,7 @@ export const schemas = { WorkflowEvent, }; -export const endpoints = makeApi([ +const endpoints = makeApi([ { method: "get", path: "/api/v1/configs/analysis_template_sets", diff --git a/frontend/src/components/RealtimeLogs.tsx b/frontend/src/components/RealtimeLogs.tsx index 58ca3d8..7c665b5 100644 --- a/frontend/src/components/RealtimeLogs.tsx +++ b/frontend/src/components/RealtimeLogs.tsx @@ -53,11 +53,11 @@ export function RealtimeLogs({ logs, className }: RealtimeLogsProps) { {/* Expanded Content */}
-
+
{logs.length === 0 && Waiting for logs...} {logs.map((entry, i) => ( diff --git a/frontend/src/components/ui/progress.tsx b/frontend/src/components/ui/progress.tsx index b8b0c23..105fb65 100644 --- a/frontend/src/components/ui/progress.tsx +++ b/frontend/src/components/ui/progress.tsx @@ -10,13 +10,13 @@ const Progress = React.forwardRef< diff --git a/frontend/src/components/workflow/WorkflowVisualizer.tsx b/frontend/src/components/workflow/WorkflowVisualizer.tsx index 97a486b..b97028a 100644 --- a/frontend/src/components/workflow/WorkflowVisualizer.tsx +++ b/frontend/src/components/workflow/WorkflowVisualizer.tsx @@ -32,7 +32,7 @@ const StatusIcon = ({ status }: { status: TaskStatus }) => { } }; -const WorkflowNode = ({ data, selected }: { data: { label: string, status: TaskStatus, type: string }, selected: boolean }) => { +const WorkflowNode = ({ data, selected }: { data: { label: string, displayName?: string, status: TaskStatus, type: string }, selected: boolean }) => { const statusColors: Record = { [schemas.TaskStatus.enum.Pending]: 'border-muted bg-card', [schemas.TaskStatus.enum.Scheduled]: 'border-yellow-500/50 bg-yellow-50/10', @@ -42,8 +42,8 @@ const WorkflowNode = ({ data, selected }: { data: { label: string, status: TaskS [schemas.TaskStatus.enum.Skipped]: 'border-gray-200 bg-gray-50/5 opacity-60', }; - // Remove 'analysis:' or 'fetch:' prefix for cleaner display - const displayLabel = data.label.replace(/^(analysis:|fetch:)/, '').replace(/_/g, ' '); + // Use display name if available, otherwise format the label (ID/Name) + const displayLabel = data.displayName || data.label.replace(/^(analysis:|fetch:)/, '').replace(/_/g, ' '); return (
{ + const handleScroll = () => { + // Detect if user has scrolled down enough to trigger visual sticky change + // Header (64) + Padding (16) = 80px. + // We add a small threshold to avoid flickering + setIsWorkflowSticky(window.scrollY > 10); + }; + window.addEventListener('scroll', handleScroll); + return () => window.removeEventListener('scroll', handleScroll); + }, []); const { initialize, @@ -75,7 +86,7 @@ export function ReportPage() { const tabNodes = dag?.nodes.filter(n => n.type === schemas.TaskType.enum.Analysis) || []; return ( -
+
{/* Header Area */}
@@ -95,22 +106,39 @@ export function ReportPage() {
{/* Main Content Grid */} -
+
{/* Left Col: Visualizer */}
- - + + setIsSidebarCollapsed(!isSidebarCollapsed)} + > {!isSidebarCollapsed ? ( Workflow Status ) : ( -
+
Workflow Status
)} - @@ -123,8 +151,8 @@ export function ReportPage() {
{/* Right Col: Detail Tabs */} -
- +
+
- {formatNodeName(node.name)} + {node.display_name || formatNodeName(node.name)} ))} @@ -195,8 +223,8 @@ export function ReportPage() {
{/* Content Area */} -
- +
+ - + {tabNodes.map(node => ( - + ))} @@ -219,7 +247,7 @@ export function ReportPage() {
- +
); } @@ -319,7 +347,8 @@ function OverviewTabContent({ status, tasks, totalTasks, completedTasks }: { } function TaskDetailView({ task }: { task?: TaskState }) { - const contentScrollRef = useAutoScroll(task?.content?.length || 0); + // Auto-scroll removed for global scrolling layout + // const contentScrollRef = useAutoScroll(task?.content?.length || 0); if (task?.status === schemas.TaskStatus.enum.Failed && !task.content) { return ( @@ -337,7 +366,7 @@ function TaskDetailView({ task }: { task?: TaskState }) { } return ( -
+
          {task?.content ? (
diff --git a/services/common-contracts/src/messages.rs b/services/common-contracts/src/messages.rs
index 93e25f2..6fd5938 100644
--- a/services/common-contracts/src/messages.rs
+++ b/services/common-contracts/src/messages.rs
@@ -153,6 +153,7 @@ pub struct TaskDependency {
 pub struct TaskNode {
     pub id: String,
     pub name: String,
+    pub display_name: Option<String>,
     pub r#type: TaskType,
     pub initial_status: TaskStatus
 }
diff --git a/services/workflow-orchestrator-service/src/dag_scheduler.rs b/services/workflow-orchestrator-service/src/dag_scheduler.rs
index 8a044f2..c41be5a 100644
--- a/services/workflow-orchestrator-service/src/dag_scheduler.rs
+++ b/services/workflow-orchestrator-service/src/dag_scheduler.rs
@@ -56,6 +56,7 @@ impl DagScheduler {
         let nodes = self.nodes.values().map(|n| common_contracts::messages::TaskNode {
             id: n.id.clone(),
             name: n.id.clone(), // Use ID as name for now, or add name field to DagNode
+            display_name: n.display_name.clone(),
             r#type: n.task_type,
             initial_status: match n.status {
                 TaskStatus::Pending => common_contracts::messages::TaskStatus::Pending,
@@ -78,6 +79,7 @@ impl DagScheduler {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct DagNode {
     pub id: String,
+    pub display_name: Option<String>,
     pub task_type: TaskType, // Kept for UI/Observability, not for logic
     pub status: TaskStatus,
     pub config: serde_json::Value,
@@ -95,9 +97,10 @@ impl DagScheduler {
         }
     }
-    pub fn add_node(&mut self, id: String, task_type: TaskType, routing_key: String, config: serde_json::Value) {
+    pub fn add_node(&mut self, id: String, display_name: Option<String>, task_type: TaskType, routing_key: String, config: serde_json::Value) {
         self.nodes.insert(id.clone(), DagNode {
             id,
+            display_name,
             task_type,
             status: TaskStatus::Pending,
             config,
@@ -277,9 +280,9 @@ mod tests {
         // 1. Setup DAG
         let mut dag = DagScheduler::new(req_id, init_commit.clone());
-        dag.add_node("A".to_string(), TaskType::DataFetch, "key.a".into(), json!({}));
-        dag.add_node("B".to_string(), TaskType::DataFetch, "key.b".into(), json!({}));
-        dag.add_node("C".to_string(), TaskType::Analysis, "key.c".into(), json!({}));
+        dag.add_node("A".to_string(), None, TaskType::DataFetch, "key.a".into(), json!({}));
+        dag.add_node("B".to_string(), None, TaskType::DataFetch, "key.b".into(), json!({}));
+        dag.add_node("C".to_string(), None, TaskType::Analysis, "key.c".into(), json!({}));
         // C depends on A and B
         dag.add_dependency("A", "C");
diff --git a/services/workflow-orchestrator-service/src/workflow.rs b/services/workflow-orchestrator-service/src/workflow.rs
index 998dc92..84ff4a1 100644
--- a/services/workflow-orchestrator-service/src/workflow.rs
+++ b/services/workflow-orchestrator-service/src/workflow.rs
@@ -280,8 +280,10 @@ impl WorkflowEngine {
         for p in &providers {
             let task_id = format!("fetch:{}", p);
             fetch_tasks.push(task_id.clone());
+            let display_name = format!("Data Fetch ({})", p);
             dag.add_node(
                 task_id.clone(),
+                Some(display_name),
                 TaskType::DataFetch,
                 format!("provider.{}", p),
                 json!({
@@ -301,6 +303,7 @@
         // To support "Single Module Execution", we should probably pass the module_id.
             dag.add_node(
                 task_id.clone(),
+                Some(module_config.name.clone()),
                 TaskType::Analysis,
                 "analysis.report".to_string(), // routing_key matches what report-generator consumes
                 json!({
diff --git a/services/workflow-orchestrator-service/tests/logic_scenarios.rs b/services/workflow-orchestrator-service/tests/logic_scenarios.rs
index 977913e..02b34f0 100644
--- a/services/workflow-orchestrator-service/tests/logic_scenarios.rs
+++ b/services/workflow-orchestrator-service/tests/logic_scenarios.rs
@@ -23,8 +23,8 @@ fn test_scenario_a_happy_path() -> Result<()> {
     // 2. Build DAG
     let mut dag = DagScheduler::new(req_id, init_commit.clone());
-    dag.add_node("A".to_string(), TaskType::DataFetch, "key.a".into(), json!({}));
-    dag.add_node("B".to_string(), TaskType::Analysis, "key.b".into(), json!({}));
+    dag.add_node("A".to_string(), None, TaskType::DataFetch, "key.a".into(), json!({}));
+    dag.add_node("B".to_string(), None, TaskType::Analysis, "key.b".into(), json!({}));
     dag.add_dependency("A", "B");
     // 3. Run Task A
@@ -83,9 +83,9 @@ fn test_scenario_c_partial_failure() -> Result<()> {
     // 2. DAG: A, B independent. C depends on BOTH.
     let mut dag = DagScheduler::new(req_id, init_commit.clone());
-    dag.add_node("A".to_string(), TaskType::DataFetch, "key.a".into(), json!({}));
-    dag.add_node("B".to_string(), TaskType::DataFetch, "key.b".into(), json!({}));
-    dag.add_node("C".to_string(), TaskType::Analysis, "key.c".into(), json!({}));
+    dag.add_node("A".to_string(), None, TaskType::DataFetch, "key.a".into(), json!({}));
+    dag.add_node("B".to_string(), None, TaskType::DataFetch, "key.b".into(), json!({}));
+    dag.add_node("C".to_string(), None, TaskType::Analysis, "key.c".into(), json!({}));
     dag.add_dependency("A", "C");
     dag.add_dependency("B", "C");
@@ -107,8 +107,8 @@ fn test_scenario_c_partial_failure() -> Result<()> {
     // Triggering readiness check from B completion
     let ready_from_b = dag.get_ready_downstream_tasks("B");
-    // C is downstream of B, but is_ready("C") should be false
-    assert!(ready_from_b.is_empty());
+    // Updated logic: Failed dependencies DO allow downstream to proceed (perhaps to handle failure or skip)
+    assert_eq!(ready_from_b, vec!["C"]);
     // Triggering readiness check from A completion (Failed)
     // Orchestrator logic for failure usually doesn't trigger downstream positive flow.
@@ -135,10 +135,10 @@ fn test_scenario_e_module_logic_check() -> Result<()> {
     let init_commit = Box::new(tx).commit("Init", "system")?;
     let mut dag = DagScheduler::new(req_id, init_commit.clone());
-    dag.add_node("A".to_string(), TaskType::DataFetch, "key.a".into(), json!({}));
-    dag.add_node("B".to_string(), TaskType::Analysis, "key.b".into(), json!({}));
-    dag.add_node("C".to_string(), TaskType::Analysis, "key.c".into(), json!({}));
-    dag.add_node("D".to_string(), TaskType::Analysis, "key.d".into(), json!({}));
+    dag.add_node("A".to_string(), None, TaskType::DataFetch, "key.a".into(), json!({}));
+    dag.add_node("B".to_string(), None, TaskType::Analysis, "key.b".into(), json!({}));
+    dag.add_node("C".to_string(), None, TaskType::Analysis, "key.c".into(), json!({}));
+    dag.add_node("D".to_string(), None, TaskType::Analysis, "key.d".into(), json!({}));
     dag.add_dependency("A", "B");
     dag.add_dependency("A", "C");
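
To make the intent of the new field concrete: the orchestrator now attaches an optional, human-readable `display_name` to each DAG node (`Data Fetch (<provider>)` for fetch tasks, the analysis module's `name` for analysis tasks), and the frontend prefers it over the technical task id. The sketch below is a minimal, illustrative example only, not code from this PR; the `label_for` helper is a hypothetical stand-in for the fallback logic that actually lives in `WorkflowVisualizer.tsx` and `ReportPage.tsx`.

```rust
// Illustrative sketch (not part of the codebase): how a consumer of the
// TaskNode contract is expected to derive a label once display_name exists.
#[derive(Debug, Clone)]
struct TaskNode {
    id: String,
    name: String,
    display_name: Option<String>, // new optional field carried in the DAG snapshot
}

/// Prefer the orchestrator-provided display name; otherwise derive a readable
/// label from the technical name ("analysis:market_report" -> "market report").
fn label_for(node: &TaskNode) -> String {
    node.display_name.clone().unwrap_or_else(|| {
        node.name
            .trim_start_matches("analysis:")
            .trim_start_matches("fetch:")
            .replace('_', " ")
    })
}

fn main() {
    let fetch = TaskNode {
        id: "fetch:Tushare".into(),
        name: "fetch:Tushare".into(),
        display_name: Some("Data Fetch (Tushare)".into()),
    };
    let analysis = TaskNode {
        id: "analysis:market_report".into(),
        name: "analysis:market_report".into(),
        display_name: None,
    };

    assert_eq!(label_for(&fetch), "Data Fetch (Tushare)");
    assert_eq!(label_for(&analysis), "market report"); // fallback path, unchanged behavior
}
```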