import { useState, useRef, useCallback, useEffect } from 'react'; import { WorkflowEvent, WorkflowDag, TaskStatus, StartWorkflowRequest, StartWorkflowResponse, TaskType } from '@/types/workflow'; export type WorkflowConnectionStatus = 'idle' | 'connecting' | 'connected' | 'disconnected' | 'error'; export interface TaskInfo { taskId: string; type?: TaskType; status: TaskStatus; message?: string; lastUpdate: number; } interface UseWorkflowReturn { // State status: WorkflowConnectionStatus; requestId: string | null; dag: WorkflowDag | null; taskStates: Record; taskInfos: Record; // Added for rich metadata taskOutputs: Record; // Accumulates streaming content error: string | null; finalResult: any | null; // Actions // Returns StartWorkflowResponse to allow caller to handle redirects (e.g. symbol normalization) startWorkflow: (params: StartWorkflowRequest) => Promise; connectToWorkflow: (requestId: string) => void; disconnect: () => void; } export function useWorkflow(): UseWorkflowReturn { const [status, setStatus] = useState('idle'); const [requestId, setRequestId] = useState(null); const [dag, setDag] = useState(null); const [taskStates, setTaskStates] = useState>({}); const [taskInfos, setTaskInfos] = useState>({}); const [taskOutputs, setTaskOutputs] = useState>({}); const [error, setError] = useState(null); const [finalResult, setFinalResult] = useState(null); // Ref for EventSource to handle cleanup const eventSourceRef = useRef(null); // Refs for state that updates frequently to avoid closure staleness in event handlers if needed // (Though in this React pattern, simple state updates usually suffice unless high freq) const disconnect = useCallback(() => { if (eventSourceRef.current) { eventSourceRef.current.close(); eventSourceRef.current = null; } setStatus('disconnected'); }, []); const handleEvent = useCallback((eventData: WorkflowEvent) => { console.log('[useWorkflow] Handling event type:', eventData.type); switch (eventData.type) { case 'WorkflowStarted': console.log('[useWorkflow] WorkflowStarted. Nodes:', eventData.payload.task_graph.nodes.length); setDag(eventData.payload.task_graph); // Initialize states based on graph const initialStates: Record = {}; const initialInfos: Record = {}; eventData.payload.task_graph.nodes.forEach(node => { initialStates[node.id] = node.initial_status; initialInfos[node.id] = { taskId: node.id, type: node.type, status: node.initial_status, lastUpdate: eventData.payload.timestamp }; }); setTaskStates(initialStates); setTaskInfos(initialInfos); break; case 'TaskStateChanged': const { task_id, status, message, timestamp, task_type } = eventData.payload; console.log(`[useWorkflow] TaskStateChanged: ${task_id} -> ${status}`); setTaskStates(prev => ({ ...prev, [task_id]: status })); setTaskInfos(prev => ({ ...prev, [task_id]: { taskId: task_id, type: task_type, status: status, message: message || undefined, // normalize null to undefined lastUpdate: timestamp } })); break; case 'TaskStreamUpdate': // console.log(`[useWorkflow] StreamUpdate for ${eventData.payload.task_id}, len: ${eventData.payload.content_delta.length}`); setTaskOutputs(prev => ({ ...prev, [eventData.payload.task_id]: (prev[eventData.payload.task_id] || '') + eventData.payload.content_delta })); break; case 'WorkflowStateSnapshot': console.log('[useWorkflow] Snapshot received. Tasks:', Object.keys(eventData.payload.tasks_status).length); // Restore full state setDag(eventData.payload.task_graph); setTaskStates(eventData.payload.tasks_status); // Reconstruct basic infos from snapshot (Snapshot doesn't carry full history messages sadly, // but we can at least sync status) const syncedInfos: Record = {}; Object.entries(eventData.payload.tasks_status).forEach(([tid, stat]) => { syncedInfos[tid] = { taskId: tid, status: stat, lastUpdate: eventData.payload.timestamp }; }); setTaskInfos(prev => ({...prev, ...syncedInfos})); // Merge to keep existing messages if we have them // Restore outputs if present const outputs: Record = {}; Object.entries(eventData.payload.tasks_output).forEach(([k, v]) => { if (v) outputs[k] = v; }); setTaskOutputs(prev => ({ ...prev, ...outputs })); break; case 'WorkflowCompleted': console.log('[useWorkflow] Workflow Completed'); setFinalResult(eventData.payload.result_summary); disconnect(); // Close connection on completion break; case 'WorkflowFailed': console.error('[useWorkflow] Workflow Failed:', eventData.payload.reason); setError(eventData.payload.reason); // We might want to keep connected or disconnect depending on if retry is possible // For now, treat fatal error as disconnect reason if (eventData.payload.is_fatal) { disconnect(); setStatus('error'); } break; } }, [disconnect]); const connectToWorkflow = useCallback((id: string) => { console.log('[useWorkflow] connectToWorkflow called for ID:', id); if (eventSourceRef.current) { console.log('[useWorkflow] Closing existing EventSource'); eventSourceRef.current.close(); } setRequestId(id); setStatus('connecting'); setError(null); try { const url = `/api/workflow/events/${id}`; console.log('[useWorkflow] Creating new EventSource:', url); // IMPORTANT: Do NOT use Next.js rewrites for SSE. They buffer. // We use a direct API Route Handler (app/api/workflow/events/[requestId]/route.ts) // which explicitly disables buffering via headers. const es = new EventSource(url); eventSourceRef.current = es; es.onopen = (e) => { console.log(`[useWorkflow] SSE onopen triggered. URL: ${url}`, e); setStatus('connected'); }; es.onmessage = (event) => { try { console.log('[useWorkflow] Raw SSE Message:', event.data); const data = JSON.parse(event.data) as WorkflowEvent; console.log('[useWorkflow] Parsed Event:', data.type, data); handleEvent(data); } catch (e) { console.error('[useWorkflow] Failed to parse workflow event:', e, event.data); } }; es.onerror = (e) => { console.error('[useWorkflow] Workflow SSE error:', e); // EventSource automatically retries, but we might want to handle it explicitly // For now, let's assume if readyState is CLOSED, it's a fatal error if (es.readyState === EventSource.CLOSED) { console.log('[useWorkflow] SSE Closed permanently'); setStatus('error'); setError('Connection lost'); es.close(); } }; } catch (e) { console.error('Failed to create EventSource:', e); setStatus('error'); setError(e instanceof Error ? e.message : 'Connection initialization failed'); } }, [handleEvent]); const startWorkflow = useCallback(async (params: StartWorkflowRequest) => { console.log('[useWorkflow] startWorkflow called with params:', params); setStatus('connecting'); setError(null); setDag(null); setTaskStates({}); setTaskInfos({}); setTaskOutputs({}); setFinalResult(null); try { console.log('[useWorkflow] Sending POST /api/workflow/start...'); const res = await fetch('/api/workflow/start', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(params), }); console.log('[useWorkflow] POST response status:', res.status); if (!res.ok) { const errorBody = await res.json().catch(() => ({})); console.error('[useWorkflow] Start failed:', errorBody); throw new Error(errorBody.error || `HTTP ${res.status}`); } const data: StartWorkflowResponse = await res.json(); console.log('[useWorkflow] Workflow started successfully. Response:', data); // Start listening console.log('[useWorkflow] Initiating SSE connection for requestId:', data.request_id); connectToWorkflow(data.request_id); return data; // Return response so UI can handle symbol normalization redirection } catch (e) { console.error('[useWorkflow] Exception in startWorkflow:', e); setStatus('error'); setError(e instanceof Error ? e.message : 'Failed to start workflow'); return undefined; } }, [connectToWorkflow]); // Cleanup on unmount useEffect(() => { return () => { if (eventSourceRef.current) { eventSourceRef.current.close(); } }; }, []); return { status, requestId, dag, taskStates, taskInfos, taskOutputs, error, finalResult, startWorkflow, connectToWorkflow, disconnect }; }