diff --git a/crates/workflow-context/src/vgcs.rs b/crates/workflow-context/src/vgcs.rs index 4a1d813..a6127ed 100644 --- a/crates/workflow-context/src/vgcs.rs +++ b/crates/workflow-context/src/vgcs.rs @@ -1,6 +1,6 @@ use std::path::{Path, PathBuf}; use std::fs::{self, File}; -use std::io::{Cursor, Read, Write, BufRead, BufReader}; +use std::io::{Cursor, Read, Write}; use anyhow::{Context, Result, anyhow}; use git2::{Repository, Oid, ObjectType, Signature, Index, IndexEntry, IndexTime}; use sha2::{Sha256, Digest}; diff --git a/docs/tasks/pending/20251129_refactor_remove_analysis_results.md b/docs/tasks/completed/20251129_refactor_remove_analysis_results.md similarity index 100% rename from docs/tasks/pending/20251129_refactor_remove_analysis_results.md rename to docs/tasks/completed/20251129_refactor_remove_analysis_results.md diff --git a/frontend/src/api/schema.gen.ts b/frontend/src/api/schema.gen.ts index b6d080a..2174866 100644 --- a/frontend/src/api/schema.gen.ts +++ b/frontend/src/api/schema.gen.ts @@ -33,17 +33,6 @@ export type LlmConfig = Partial<{ model_id: string | null; temperature: number | null; }>; -export type AnalysisResultDto = { - content: string; - created_at: string; - id: string; - meta_data: Value; - module_id: string; - request_id: string; - symbol: string; - template_id: string; -}; -export type Value = unknown; export type AnalysisTemplateSet = { modules: {}; name: string; @@ -212,18 +201,19 @@ export type TaskMetadata = Partial<{ execution_log_path: string | null; output_path: string | null; }>; +export type WorkflowHistoryDto = { + created_at: string; + end_time?: (string | null) | undefined; + market: string; + request_id: string; + snapshot_data: Value; + start_time: string; + status: string; + symbol: string; + template_id?: (string | null) | undefined; +}; +export type Value = unknown; -export const Value = z.unknown(); -export const AnalysisResultDto = z.object({ - content: z.string(), - created_at: z.string().datetime({ offset: true }), - id: z.string().uuid(), - meta_data: Value, - module_id: z.string(), - request_id: z.string().uuid(), - symbol: z.string(), - template_id: z.string(), -}); export const LlmConfig = z .object({ max_tokens: z.union([z.number(), z.null()]), @@ -310,6 +300,27 @@ export const DiscoverPreviewRequest = z.object({ api_base_url: z.string(), api_key: z.string(), }); +export const WorkflowHistorySummaryDto = z.object({ + end_time: z.union([z.string(), z.null()]).optional(), + market: z.string(), + request_id: z.string().uuid(), + start_time: z.string().datetime({ offset: true }), + status: z.string(), + symbol: z.string(), + template_id: z.union([z.string(), z.null()]).optional(), +}); +export const Value = z.unknown(); +export const WorkflowHistoryDto = z.object({ + created_at: z.string().datetime({ offset: true }), + end_time: z.union([z.string(), z.null()]).optional(), + market: z.string(), + request_id: z.string().uuid(), + snapshot_data: Value, + start_time: z.string().datetime({ offset: true }), + status: z.string(), + symbol: z.string(), + template_id: z.union([z.string(), z.null()]).optional(), +}); export const FieldType = z.enum(["Text", "Password", "Url", "Boolean", "Select"]); export const ConfigKey = z.enum([ "ApiKey", @@ -508,8 +519,6 @@ export const WorkflowEvent = z.union([ ]); export const schemas = { - Value, - AnalysisResultDto, LlmConfig, SelectionMode, ContextSelectorConfig, @@ -526,6 +535,9 @@ export const schemas = { TestConfigRequest, TestConnectionResponse, DiscoverPreviewRequest, + WorkflowHistorySummaryDto, + 
Value, + WorkflowHistoryDto, FieldType, ConfigKey, ConfigFieldSchema, @@ -550,41 +562,6 @@ export const schemas = { }; export const endpoints = makeApi([ - { - method: "get", - path: "/api/v1/analysis-results", - alias: "get_analysis_results_by_symbol", - requestFormat: "json", - parameters: [ - { - name: "symbol", - type: "Query", - schema: z.string().optional(), - }, - ], - response: z.array(AnalysisResultDto), - }, - { - method: "get", - path: "/api/v1/analysis-results/:id", - alias: "get_analysis_result_by_id", - requestFormat: "json", - parameters: [ - { - name: "id", - type: "Path", - schema: z.string().uuid(), - }, - ], - response: AnalysisResultDto, - errors: [ - { - status: 404, - description: `Not found`, - schema: z.void(), - }, - ], - }, { method: "get", path: "/api/v1/configs/analysis_template_sets", @@ -723,6 +700,46 @@ export const endpoints = makeApi([ }, ], }, + { + method: "get", + path: "/api/v1/history", + alias: "get_workflow_histories", + requestFormat: "json", + parameters: [ + { + name: "symbol", + type: "Query", + schema: z.string().optional(), + }, + { + name: "limit", + type: "Query", + schema: z.number().int().optional(), + }, + ], + response: z.array(WorkflowHistorySummaryDto), + }, + { + method: "get", + path: "/api/v1/history/:request_id", + alias: "get_workflow_history_by_id", + requestFormat: "json", + parameters: [ + { + name: "request_id", + type: "Path", + schema: z.string().uuid(), + }, + ], + response: WorkflowHistoryDto, + errors: [ + { + status: 404, + description: `History not found`, + schema: z.void(), + }, + ], + }, { method: "get", path: "/api/v1/registry/providers", diff --git a/frontend/src/components/RecentReportsDropdown.tsx b/frontend/src/components/RecentReportsDropdown.tsx index 3549e38..5ea17eb 100644 --- a/frontend/src/components/RecentReportsDropdown.tsx +++ b/frontend/src/components/RecentReportsDropdown.tsx @@ -10,32 +10,26 @@ import { DropdownMenuTrigger, } from '@/components/ui/dropdown-menu'; import { History, Loader2 } from 'lucide-react'; +import { WorkflowHistorySummaryDto } from '@/api/schema.gen'; +import { z } from 'zod'; +import { client } from '@/api/client'; -interface AnalysisResultSummary { - id: string; - request_id: string; - symbol: string; - template_id: string; - created_at: string; -} +type WorkflowHistorySummary = z.infer; export function RecentReportsDropdown() { - const [reports, setReports] = useState([]); + const [reports, setReports] = useState([]); const [loading, setLoading] = useState(false); const navigate = useNavigate(); const loadReports = async () => { setLoading(true); try { - // TEMPORARY: /api/v1/analysis-results removed for refactor - // const response = await fetch('/api/v1/analysis-results?limit=10'); - // if (response.ok) { - // const data = await response.json(); - // setReports(data); - // } - setReports([]); + // Using generated client to fetch history + const data = await client.get_workflow_histories({ queries: { limit: 10 } }); + setReports(data); } catch (e) { console.error("Failed to load reports", e); + setReports([]); } finally { setLoading(false); } @@ -62,16 +56,16 @@ export function RecentReportsDropdown() { reports.map((report) => ( navigate(`/report/${report.request_id}`)} + onClick={() => navigate(`/history/${report.request_id}`)} className="flex flex-col items-start gap-1 cursor-pointer py-3" >
-                  {report.symbol}
+                  {report.symbol} ({report.market})
                   {new Date(report.start_time).toLocaleDateString()}
                   {report.template_id || 'Default'}
-                  {report.status}
+                  {report.status}
            ))
diff --git a/frontend/src/components/RecentWorkflowsList.tsx b/frontend/src/components/RecentWorkflowsList.tsx
new file mode 100644
index 0000000..5f1c986
--- /dev/null
+++ b/frontend/src/components/RecentWorkflowsList.tsx
@@ -0,0 +1,129 @@
+import { useEffect, useState } from 'react';
+import { useNavigate } from 'react-router-dom';
+import { Card, CardContent, CardHeader, CardTitle, CardDescription } from "@/components/ui/card";
+import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from "@/components/ui/table";
+import { Badge } from "@/components/ui/badge";
+import { Button } from "@/components/ui/button";
+import { Loader2, ArrowRight, History, RefreshCw } from "lucide-react";
+import { WorkflowHistorySummaryDto } from '@/api/schema.gen';
+import { z } from 'zod';
+import { client } from '@/api/client';
+
+type WorkflowHistorySummary = z.infer<typeof WorkflowHistorySummaryDto>;
+
+export function RecentWorkflowsList() {
+  const [history, setHistory] = useState<WorkflowHistorySummary[]>([]);
+  const [loading, setLoading] = useState(false);
+  const navigate = useNavigate();
+
+  const fetchHistory = async () => {
+    setLoading(true);
+    try {
+      // Using generated client to fetch history
+      const data = await client.get_workflow_histories({ queries: { limit: 5 } });
+      setHistory(data);
+    } catch (err) {
+      console.error("Failed to fetch history:", err);
+    } finally {
+      setLoading(false);
+    }
+  };
+
+  useEffect(() => {
+    fetchHistory();
+  }, []);
+
+  if (!loading && history.length === 0) {
+    return null;
+  }
+
+  return (
+    <Card>
+      <CardHeader>
+        <CardTitle>
+          <History />
+          Recent Analysis Reports
+        </CardTitle>
+        <CardDescription>
+          Your recently generated fundamental analysis reports.
+        </CardDescription>
+        <Button onClick={fetchHistory} disabled={loading}>
+          <RefreshCw />
+        </Button>
+      </CardHeader>
+      <CardContent>
+        <Table>
+          <TableHeader>
+            <TableRow>
+              <TableHead>Symbol</TableHead>
+              <TableHead>Market</TableHead>
+              <TableHead>Template</TableHead>
+              <TableHead>Status</TableHead>
+              <TableHead>Date</TableHead>
+              <TableHead>Action</TableHead>
+            </TableRow>
+          </TableHeader>
+          <TableBody>
+            {loading && history.length === 0 ? (
+              <TableRow>
+                <TableCell colSpan={6}>
+                  <Loader2 />
+                </TableCell>
+              </TableRow>
+            ) : (
+              history.map((item) => (
+                <TableRow key={item.request_id} onClick={() => navigate(`/history/${item.request_id}`)}>
+                  <TableCell>{item.symbol}</TableCell>
+                  <TableCell>{item.market}</TableCell>
+                  <TableCell>{item.template_id || 'Default'}</TableCell>
+                  <TableCell>
+                    <StatusBadge status={item.status} />
+                  </TableCell>
+                  <TableCell>{new Date(item.start_time).toLocaleString()}</TableCell>
+                  <TableCell>
+                    <ArrowRight />
+                  </TableCell>
+                </TableRow>
+              ))
+            )}
+          </TableBody>
+        </Table>
+      </CardContent>
+    </Card>
+  );
+}
+
+function StatusBadge({ status }: { status: string }) {
+  let variant: "default" | "destructive" | "outline" | "secondary" = "outline";
+  let className = "";
+
+  switch (status.toLowerCase()) {
+    case 'completed':
+      variant = "default";
+      className = "bg-green-600 hover:bg-green-700";
+      break;
+    case 'failed':
+      variant = "destructive";
+      break;
+    case 'running':
+    case 'pending':
+      variant = "secondary";
+      className = "text-blue-600 bg-blue-100";
+      break;
+    default:
+      variant = "outline";
+  }
+
+  return (
+    <Badge variant={variant} className={className}>
+      {status}
+    </Badge>
+  );
+}
diff --git a/frontend/src/pages/Dashboard.tsx b/frontend/src/pages/Dashboard.tsx
index 963385c..467ae30 100644
--- a/frontend/src/pages/Dashboard.tsx
+++ b/frontend/src/pages/Dashboard.tsx
@@ -12,6 +12,7 @@ import { client } from '@/api/client';
 import { DataRequest } from '@/api/schema.gen';
 import { z } from 'zod';
 import { useToast } from "@/hooks/use-toast"
+import { RecentWorkflowsList } from '@/components/RecentWorkflowsList';
 
 type DataRequestDTO = z.infer<typeof DataRequest>;
 
@@ -198,6 +199,8 @@ export function Dashboard() {
+      <RecentWorkflowsList />
+
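Both new components follow the same pattern for talking to the history API: infer the row type from the generated Zod schema, then call the typed Zodios client so every response is validated at runtime. A minimal sketch of that pattern, assuming `client` is the Zodios instance exported from `@/api/client` and that the path-parameter shape follows standard Zodios conventions:

```ts
import { z } from 'zod';
import { WorkflowHistorySummaryDto, WorkflowHistoryDto } from '@/api/schema.gen';
import { client } from '@/api/client';

type Summary = z.infer<typeof WorkflowHistorySummaryDto>;

// GET /api/v1/history?limit=10 — the generated client validates the
// response against z.array(WorkflowHistorySummaryDto) before returning.
async function loadRecent(limit = 10): Promise<Summary[]> {
  return client.get_workflow_histories({ queries: { limit } });
}

// GET /api/v1/history/:request_id — a 404 surfaces as a thrown error,
// matching the `History not found` error declared in the endpoint spec.
async function loadDetail(requestId: string): Promise<z.infer<typeof WorkflowHistoryDto>> {
  return client.get_workflow_history_by_id({ params: { request_id: requestId } });
}
```
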
diff --git a/frontend/src/pages/HistoricalReportPage.tsx b/frontend/src/pages/HistoricalReportPage.tsx index 1c98bf3..618c1e9 100644 --- a/frontend/src/pages/HistoricalReportPage.tsx +++ b/frontend/src/pages/HistoricalReportPage.tsx @@ -60,19 +60,29 @@ export function HistoricalReportPage() { const res = await fetch(`/api/v1/workflow/snapshot/${id}`); if (res.ok) { const snapshot = await res.json(); - loadFromSnapshot(snapshot.data_payload); + + // Handle tagged enum wrapper (type/payload) if present + // The Backend serializes WorkflowEvent as { type: "...", payload: { ... } } + // But session data payload might be the raw JSONValue of that. + let rawPayload = snapshot.data_payload; + if (rawPayload && typeof rawPayload === 'object' && 'payload' in rawPayload && 'type' in rawPayload) { + rawPayload = rawPayload.payload; + } + + loadFromSnapshot(rawPayload); // Rehydrate content for completed analysis tasks - const payload = snapshot.data_payload; + const payload = rawPayload; if (payload.task_graph?.nodes) { payload.task_graph.nodes.forEach(async (node: TaskNode) => { const status = payload.tasks_status?.[node.id]; const outputCommit = payload.tasks_output?.[node.id]; // We need the output path to know what file to fetch. - // It should be injected into the config by the Orchestrator. + // It is stored in tasks_metadata by the Orchestrator. // @ts-ignore - const outputPath = node.config?.output_path; + const metadata = payload.tasks_metadata?.[node.id]; + const outputPath = metadata?.output_path; if (status === schemas.TaskStatus.enum.Completed && outputCommit && outputPath) { try { diff --git a/frontend/src/pages/ReportPage.tsx b/frontend/src/pages/ReportPage.tsx index cfe7d09..c85db7d 100644 --- a/frontend/src/pages/ReportPage.tsx +++ b/frontend/src/pages/ReportPage.tsx @@ -63,7 +63,14 @@ export function ReportPage() { const res = await fetch(`/api/v1/workflow/snapshot/${id}`); if (res.ok) { const snapshot = await res.json(); - loadFromSnapshot(snapshot.data_payload); + + // Handle tagged enum wrapper (type/payload) if present + let rawPayload = snapshot.data_payload; + if (rawPayload && typeof rawPayload === 'object' && 'payload' in rawPayload && 'type' in rawPayload) { + rawPayload = rawPayload.payload; + } + + loadFromSnapshot(rawPayload); } } catch (e) { console.warn("Snapshot load failed (normal for new tasks):", e); diff --git a/openapi.json b/openapi.json index d66b041..95d7995 100644 --- a/openapi.json +++ b/openapi.json @@ -9,77 +9,6 @@ "version": "0.1.0" }, "paths": { - "/api/v1/analysis-results": { - "get": { - "tags": [ - "api" - ], - "summary": "[GET /v1/analysis-results?symbol=...]", - "operationId": "get_analysis_results_by_symbol", - "parameters": [ - { - "name": "symbol", - "in": "query", - "description": "Optional symbol to filter results", - "required": false, - "schema": { - "type": "string" - } - } - ], - "responses": { - "200": { - "description": "List of analysis results", - "content": { - "application/json": { - "schema": { - "type": "array", - "items": { - "$ref": "#/components/schemas/AnalysisResultDto" - } - } - } - } - } - } - } - }, - "/api/v1/analysis-results/{id}": { - "get": { - "tags": [ - "api" - ], - "summary": "[GET /api/v1/analysis-results/:id]", - "operationId": "get_analysis_result_by_id", - "parameters": [ - { - "name": "id", - "in": "path", - "description": "Analysis result ID", - "required": true, - "schema": { - "type": "string", - "format": "uuid" - } - } - ], - "responses": { - "200": { - "description": "Analysis result", - "content": { - 
"application/json": { - "schema": { - "$ref": "#/components/schemas/AnalysisResultDto" - } - } - } - }, - "404": { - "description": "Not found" - } - } - } - }, "/api/v1/configs/analysis_template_sets": { "get": { "tags": [ @@ -343,6 +272,87 @@ } } }, + "/api/v1/history": { + "get": { + "tags": [ + "api" + ], + "summary": "[GET /v1/history]", + "operationId": "get_workflow_histories", + "parameters": [ + { + "name": "symbol", + "in": "query", + "description": "Filter by symbol", + "required": false, + "schema": { + "type": "string" + } + }, + { + "name": "limit", + "in": "query", + "description": "Limit number of results", + "required": false, + "schema": { + "type": "integer", + "format": "int64" + } + } + ], + "responses": { + "200": { + "description": "Workflow history summaries", + "content": { + "application/json": { + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/WorkflowHistorySummaryDto" + } + } + } + } + } + } + } + }, + "/api/v1/history/{request_id}": { + "get": { + "tags": [ + "api" + ], + "summary": "[GET /v1/history/:request_id]", + "operationId": "get_workflow_history_by_id", + "parameters": [ + { + "name": "request_id", + "in": "path", + "description": "Workflow Request ID", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + } + ], + "responses": { + "200": { + "description": "Workflow history details", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/WorkflowHistoryDto" + } + } + } + }, + "404": { + "description": "History not found" + } + } + } + }, "/api/v1/registry/providers": { "get": { "tags": [ @@ -531,50 +541,6 @@ }, "additionalProperties": false }, - "AnalysisResultDto": { - "type": "object", - "description": "Represents a persisted analysis result read from the database.", - "required": [ - "id", - "request_id", - "symbol", - "template_id", - "module_id", - "content", - "meta_data", - "created_at" - ], - "properties": { - "content": { - "type": "string" - }, - "created_at": { - "type": "string", - "format": "date-time" - }, - "id": { - "type": "string", - "format": "uuid" - }, - "meta_data": { - "$ref": "#/components/schemas/Value" - }, - "module_id": { - "type": "string" - }, - "request_id": { - "type": "string", - "format": "uuid" - }, - "symbol": { - "type": "string" - }, - "template_id": { - "type": "string" - } - }, - "additionalProperties": false - }, "AnalysisTemplateSet": { "type": "object", "description": "A single, self-contained set of analysis modules representing a complete workflow.\ne.g., \"Standard Fundamental Analysis\"", @@ -1617,6 +1583,101 @@ } ], "description": "Unified event stream for frontend consumption." 
+ }, + "WorkflowHistoryDto": { + "type": "object", + "required": [ + "request_id", + "symbol", + "market", + "status", + "start_time", + "snapshot_data", + "created_at" + ], + "properties": { + "created_at": { + "type": "string", + "format": "date-time" + }, + "end_time": { + "type": [ + "string", + "null" + ], + "format": "date-time" + }, + "market": { + "type": "string" + }, + "request_id": { + "type": "string", + "format": "uuid" + }, + "snapshot_data": { + "$ref": "#/components/schemas/Value" + }, + "start_time": { + "type": "string", + "format": "date-time" + }, + "status": { + "type": "string" + }, + "symbol": { + "type": "string" + }, + "template_id": { + "type": [ + "string", + "null" + ] + } + }, + "additionalProperties": false + }, + "WorkflowHistorySummaryDto": { + "type": "object", + "required": [ + "request_id", + "symbol", + "market", + "status", + "start_time" + ], + "properties": { + "end_time": { + "type": [ + "string", + "null" + ], + "format": "date-time" + }, + "market": { + "type": "string" + }, + "request_id": { + "type": "string", + "format": "uuid" + }, + "start_time": { + "type": "string", + "format": "date-time" + }, + "status": { + "type": "string" + }, + "symbol": { + "type": "string" + }, + "template_id": { + "type": [ + "string", + "null" + ] + } + }, + "additionalProperties": false } } }, diff --git a/ref/service_kit_mirror/service_kit/service_kit/service-kit-macros/src/lib.rs b/ref/service_kit_mirror/service_kit/service_kit/service-kit-macros/src/lib.rs index e63024e..3732e19 100644 --- a/ref/service_kit_mirror/service_kit/service_kit/service-kit-macros/src/lib.rs +++ b/ref/service_kit_mirror/service_kit/service_kit/service-kit-macros/src/lib.rs @@ -167,7 +167,7 @@ pub fn api(args: TokenStream, input: TokenStream) -> TokenStream { // Extract each element let mut extracted_vars = Vec::new(); - for (idx, (elem_pat, elem_ty)) in tuple_pat.elems.iter().zip(inner_tuple_types.iter()).enumerate() { + for (_idx, (elem_pat, elem_ty)) in tuple_pat.elems.iter().zip(inner_tuple_types.iter()).enumerate() { if let Pat::Ident(pat_ident) = elem_pat { let param_name = pat_ident.ident.to_string(); let type_name = type_to_string(elem_ty); diff --git a/services/alphavantage-provider-service/Cargo.toml b/services/alphavantage-provider-service/Cargo.toml index 60bf274..76f88b8 100644 --- a/services/alphavantage-provider-service/Cargo.toml +++ b/services/alphavantage-provider-service/Cargo.toml @@ -1,6 +1,7 @@ [package] name = "alphavantage-provider-service" version = "0.1.0" +default-run = "alphavantage-provider-service" edition = "2021" [dependencies] diff --git a/services/alphavantage-provider-service/src/av_client.rs b/services/alphavantage-provider-service/src/av_client.rs index 415f861..f07f65c 100644 --- a/services/alphavantage-provider-service/src/av_client.rs +++ b/services/alphavantage-provider-service/src/av_client.rs @@ -32,18 +32,6 @@ impl AvClient { Ok(Self { service: running }) } - pub async fn connect_with_bearer(mcp_endpoint_url: &str, bearer_token: &str) -> Result { - let config = StreamableHttpClientTransportConfig::with_uri(mcp_endpoint_url.to_string()) - .auth_header(bearer_token.to_string()); - let transport = StreamableHttpClientTransport::with_client(CustomHttpClient::new(), config); - let running = DummyClientHandler - ::default() - .serve(transport) - .await - .map_err(|e| AppError::Configuration(format!("Fail to init MCP service: {e:?}")))?; - Ok(Self { service: running }) - } - pub async fn query(&self, function: &str, params: &[(&str, &str)]) -> Result 
{ let mut args = Map::new(); for (k, v) in params { diff --git a/services/alphavantage-provider-service/src/config.rs b/services/alphavantage-provider-service/src/config.rs index 8437510..8a7aa5c 100644 --- a/services/alphavantage-provider-service/src/config.rs +++ b/services/alphavantage-provider-service/src/config.rs @@ -1,6 +1,7 @@ use serde::Deserialize; #[derive(Debug, Deserialize, Clone)] +#[allow(dead_code)] pub struct AppConfig { pub server_port: u16, pub nats_addr: String, @@ -13,6 +14,7 @@ pub struct AppConfig { } impl AppConfig { + #[allow(dead_code)] pub fn load() -> Result { let config = config::Config::builder() .add_source(config::Environment::default().separator("__")) diff --git a/services/alphavantage-provider-service/src/error.rs b/services/alphavantage-provider-service/src/error.rs index bd4a2d0..63ce4d9 100644 --- a/services/alphavantage-provider-service/src/error.rs +++ b/services/alphavantage-provider-service/src/error.rs @@ -10,9 +10,11 @@ pub enum AppError { DataParsing(#[from] anyhow::Error), #[error("Internal error: {0}")] + #[allow(dead_code)] Internal(String), #[error("Provider not available: {0}")] + #[allow(dead_code)] ProviderNotAvailable(String), #[error(transparent)] diff --git a/services/alphavantage-provider-service/src/worker.rs b/services/alphavantage-provider-service/src/worker.rs index 1f87262..7d7b9e8 100644 --- a/services/alphavantage-provider-service/src/worker.rs +++ b/services/alphavantage-provider-service/src/worker.rs @@ -1,9 +1,8 @@ use crate::error::{Result, AppError}; use crate::mapping::{CombinedFinancials, parse_company_profile, parse_financials, parse_realtime_quote}; use common_contracts::persistence_client::PersistenceClient; -use common_contracts::dtos::{SessionDataDto, ProviderCacheDto, TimeSeriesFinancialDto, CompanyProfileDto}; +use common_contracts::dtos::{ProviderCacheDto, SessionDataDto}; use crate::state::{AppState, TaskStore}; -use anyhow::Context; use chrono::{Utc, Datelike, Duration}; use common_contracts::messages::{FetchCompanyDataCommand, FinancialsPersistedEvent, DataFetchFailedEvent}; use common_contracts::observability::{TaskProgress, ObservabilityTaskStatus}; diff --git a/services/api-gateway/src/api.rs b/services/api-gateway/src/api.rs index 1f59abb..d7532f6 100644 --- a/services/api-gateway/src/api.rs +++ b/services/api-gateway/src/api.rs @@ -439,11 +439,22 @@ async fn get_workflow_snapshot( State(state): State, Path(request_id): Path, ) -> Result { - let snapshots = state.persistence_client.get_session_data(request_id, None, None).await?; + // Note: The persistence service currently returns ALL session data for a request_id + // and ignores the query params. We must filter manually here until persistence service is updated. 
+ let snapshots = state.persistence_client.get_session_data(request_id, Some("orchestrator"), Some("workflow_snapshot")).await?; - if let Some(snapshot) = snapshots.into_iter().next() { + info!("get_workflow_snapshot: retrieved {} records for {}", snapshots.len(), request_id); + + // Filter for the correct snapshot + let snapshot = snapshots.into_iter().find(|s| { + info!("Checking record: provider={}, data_type={}", s.provider, s.data_type); + s.provider == "orchestrator" && s.data_type == "workflow_snapshot" + }); + + if let Some(snapshot) = snapshot { Ok((StatusCode::OK, Json(snapshot)).into_response()) } else { + warn!("Snapshot not found for {}", request_id); Ok(( StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "Snapshot not found"})), diff --git a/services/common-contracts/src/workflow_runner.rs b/services/common-contracts/src/workflow_runner.rs index b24c197..04bc4f8 100644 --- a/services/common-contracts/src/workflow_runner.rs +++ b/services/common-contracts/src/workflow_runner.rs @@ -57,6 +57,7 @@ impl WorkflowNodeRunner { let exec_result_artifacts = exec_result.artifacts; let report_md_clone = report_md.clone(); let symbol = cmd.config.get("symbol").and_then(|s| s.as_str()).unwrap_or("unknown").to_string(); + let symbol_for_blocking = symbol.clone(); // We also want to generate an execution log (basic one for now) // In future, we might want to capture logs during execute() @@ -66,7 +67,7 @@ impl WorkflowNodeRunner { let mut ctx = WorkerContext::new(&root_path_clone, &req_id_clone, &base_commit_clone); // Define output directory convention - let base_dir = format!("raw/{}/{}", node_clone.node_type(), symbol); + let base_dir = format!("raw/{}/{}", node_clone.node_type(), symbol_for_blocking); // Write Artifacts for (filename, content) in exec_result_artifacts { @@ -105,6 +106,15 @@ impl WorkflowNodeRunner { }; self.publish_common(&cmd.request_id, stream_event).await?; + // 5.1 Update Meta Summary with Paths + let mut summary = exec_result.meta_summary.clone().unwrap_or(serde_json::json!({})); + if let Some(obj) = summary.as_object_mut() { + // Reconstruct paths used in VGCS block (must match) + let base_dir = format!("raw/{}/{}", node.node_type(), symbol); + obj.insert("output_path".to_string(), serde_json::Value::String(format!("{}/report.md", base_dir))); + obj.insert("execution_log_path".to_string(), serde_json::Value::String(format!("{}/_execution.md", base_dir))); + } + // 6. 
Publish Completion Event let event = WorkflowTaskEvent { request_id: cmd.request_id, @@ -113,7 +123,7 @@ impl WorkflowNodeRunner { result: Some(TaskResult { new_commit: Some(new_commit), error: None, - summary: exec_result.meta_summary, + summary: Some(summary), }), }; self.publish_event(event).await?; diff --git a/services/data-persistence-service/src/api/provider_cache.rs b/services/data-persistence-service/src/api/provider_cache.rs index 6c17b12..461e257 100644 --- a/services/data-persistence-service/src/api/provider_cache.rs +++ b/services/data-persistence-service/src/api/provider_cache.rs @@ -9,7 +9,6 @@ use axum::{ }; use common_contracts::dtos::ProviderCacheDto; use service_kit::api; -use tracing::info; use anyhow::Error as AnyhowError; use serde::Deserialize; diff --git a/services/data-persistence-service/src/db/provider_cache.rs b/services/data-persistence-service/src/db/provider_cache.rs index 9a7b165..735464c 100644 --- a/services/data-persistence-service/src/db/provider_cache.rs +++ b/services/data-persistence-service/src/db/provider_cache.rs @@ -1,6 +1,6 @@ use common_contracts::dtos::ProviderCacheDto; use sqlx::PgPool; -use chrono::{Utc, DateTime}; +use chrono::Utc; pub async fn get_cache( pool: &PgPool, diff --git a/services/finnhub-provider-service/src/finnhub.rs b/services/finnhub-provider-service/src/finnhub.rs index 0435d0d..852a68a 100644 --- a/services/finnhub-provider-service/src/finnhub.rs +++ b/services/finnhub-provider-service/src/finnhub.rs @@ -11,6 +11,7 @@ use tokio; #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] +#[allow(dead_code)] pub struct FinnhubProfile { pub country: Option, pub currency: Option, @@ -28,6 +29,7 @@ pub struct FinnhubProfile { #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] +#[allow(dead_code)] pub struct FinnhubFinancialsReported { pub data: Vec, pub symbol: String, @@ -35,6 +37,7 @@ pub struct FinnhubFinancialsReported { #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] +#[allow(dead_code)] pub struct AnnualReport { pub year: u16, pub start_date: String, diff --git a/services/finnhub-provider-service/src/message_consumer.rs b/services/finnhub-provider-service/src/message_consumer.rs index f9c55a4..f556e86 100644 --- a/services/finnhub-provider-service/src/message_consumer.rs +++ b/services/finnhub-provider-service/src/message_consumer.rs @@ -1,10 +1,8 @@ use crate::error::Result; use crate::state::{AppState, ServiceOperationalStatus}; use common_contracts::messages::FetchCompanyDataCommand; -use common_contracts::observability::ObservabilityTaskStatus; use common_contracts::subjects::NatsSubject; use futures_util::StreamExt; -use std::sync::Arc; use std::time::Duration; use tracing::{error, info, warn}; @@ -89,49 +87,3 @@ async fn subscribe_and_process(state: AppState, client: async_nats::Client) -> R Ok(()) } -pub async fn subscribe_to_data_commands(app_state: Arc, nats_client: async_nats::Client) -> Result<()> { - let mut subscriber = nats_client.subscribe(NatsSubject::DataFetchCommands.to_string()).await?; - - while let Some(message) = subscriber.next().await { - let command: FetchCompanyDataCommand = match serde_json::from_slice(&message.payload) { - Ok(c) => c, - Err(e) => { - error!("Failed to deserialize message: {}", e); - continue; - } - }; - let task_id = command.request_id; - - if command.market.to_uppercase() == "CN" { - info!( - "Skipping command for symbol '{}' as its market ('{}') is 'CN'.", - command.symbol, command.market - ); - continue; - } - - 
app_state.tasks.insert(task_id, common_contracts::observability::TaskProgress { - request_id: task_id, - task_name: format!("finnhub:{}", command.symbol), - status: ObservabilityTaskStatus::Queued, - progress_percent: 0, - details: "Command received".to_string(), - started_at: chrono::Utc::now(), - }); - - // Spawn the workflow in a separate task - let workflow_state = app_state.clone(); - let publisher_clone = nats_client.clone(); - tokio::spawn(async move { - let state_owned = (*workflow_state).clone(); - let result = crate::worker::handle_fetch_command(state_owned, command, publisher_clone).await; - if let Err(e) = result { - error!( - "Error executing Finnhub workflow for task {}: {:?}", - task_id, e - ); - } - }); - } - Ok(()) -} diff --git a/services/mock-provider-service/src/config.rs b/services/mock-provider-service/src/config.rs index f47625f..01ca013 100644 --- a/services/mock-provider-service/src/config.rs +++ b/services/mock-provider-service/src/config.rs @@ -1,6 +1,5 @@ use serde::Deserialize; use config::Config; -use std::env; #[derive(Debug, Deserialize, Clone)] pub struct AppConfig { diff --git a/services/mock-provider-service/src/error.rs b/services/mock-provider-service/src/error.rs index 76be3f0..498cac2 100644 --- a/services/mock-provider-service/src/error.rs +++ b/services/mock-provider-service/src/error.rs @@ -10,8 +10,8 @@ pub enum AppError { Nats(#[from] async_nats::Error), #[error("Data parsing error: {0}")] DataParsing(#[from] anyhow::Error), - #[error("Unknown error: {0}")] - Unknown(String), + // #[error("Unknown error: {0}")] + // Unknown(String), } pub type Result = std::result::Result; diff --git a/services/mock-provider-service/src/worker.rs b/services/mock-provider-service/src/worker.rs index 2922dcd..6aa6812 100644 --- a/services/mock-provider-service/src/worker.rs +++ b/services/mock-provider-service/src/worker.rs @@ -1,7 +1,7 @@ use anyhow::Result; use tracing::{info, error}; use common_contracts::workflow_types::WorkflowTaskCommand; -use common_contracts::subjects::{NatsSubject, SubjectMessage}; +use common_contracts::subjects::NatsSubject; use crate::state::AppState; use futures_util::StreamExt; use std::sync::Arc; diff --git a/services/mock-provider-service/src/workflow_adapter.rs b/services/mock-provider-service/src/workflow_adapter.rs index 526d253..7c99ca1 100644 --- a/services/mock-provider-service/src/workflow_adapter.rs +++ b/services/mock-provider-service/src/workflow_adapter.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; + use async_trait::async_trait; use anyhow::{Result, anyhow}; use serde_json::{json, Value}; @@ -11,6 +11,7 @@ use common_contracts::dtos::{CompanyProfileDto, TimeSeriesFinancialDto}; use crate::state::AppState; pub struct MockNode { + #[allow(dead_code)] state: AppState, } diff --git a/services/report-generator-service/src/config.rs b/services/report-generator-service/src/config.rs index dc423a3..2dc09f3 100644 --- a/services/report-generator-service/src/config.rs +++ b/services/report-generator-service/src/config.rs @@ -8,6 +8,7 @@ pub struct AppConfig { pub workflow_data_path: String, } +#[allow(dead_code)] fn default_workflow_data_path() -> String { "/app/data".to_string() } diff --git a/services/report-generator-service/src/error.rs b/services/report-generator-service/src/error.rs index 2596c3a..701cd5f 100644 --- a/services/report-generator-service/src/error.rs +++ b/services/report-generator-service/src/error.rs @@ -16,9 +16,9 @@ pub enum ProviderError { #[error("Configuration error: {0}")] Configuration(String), - 
#[error("Persistence client error: {0}")] - Persistence(String), + // #[error("Persistence client error: {0}")] + // Persistence(String), - #[error("Internal error: {0}")] + #[error(transparent)] Internal(#[from] anyhow::Error), } diff --git a/services/report-generator-service/src/persistence.rs b/services/report-generator-service/src/persistence.rs index c45e89b..311eb65 100644 --- a/services/report-generator-service/src/persistence.rs +++ b/services/report-generator-service/src/persistence.rs @@ -5,13 +5,7 @@ //! use crate::error::Result; -use common_contracts::{ - config_models::{AnalysisTemplateSets, LlmProvidersConfig}, - dtos::{ - CompanyProfileDto, RealtimeQuoteDto, SessionDataDto, - TimeSeriesFinancialBatchDto, TimeSeriesFinancialDto, - }, -}; +use common_contracts::config_models::LlmProvidersConfig; use tracing::info; #[derive(Clone)] @@ -28,63 +22,6 @@ impl PersistenceClient { } } - pub async fn get_company_profile(&self, symbol: &str) -> Result { - let url = format!("{}/companies/{}", self.base_url, symbol); - info!("Fetching company profile for {} from {}", symbol, url); - let dto = self - .client - .get(&url) - .send() - .await? - .error_for_status()? - .json::() - .await?; - Ok(dto) - } - - pub async fn get_financial_statements( - &self, - symbol: &str, - ) -> Result> { - let url = format!("{}/market-data/financial-statements/{}", self.base_url, symbol); - info!("Fetching financials for {} from {}", symbol, url); - let dtos = self - .client - .get(&url) - .send() - .await? - .error_for_status()? - .json::>() - .await?; - Ok(dtos) - } - - pub async fn get_session_data(&self, request_id: uuid::Uuid) -> Result> { - let url = format!("{}/session-data/{}", self.base_url, request_id); - info!("Fetching session data for {} from {}", request_id, url); - let dtos = self - .client - .get(&url) - .send() - .await? - .error_for_status()? - .json::>() - .await?; - Ok(dtos) - } - - pub async fn insert_session_data(&self, data: &SessionDataDto) -> Result<()> { - let url = format!("{}/session-data", self.base_url); - info!("Inserting session data for {} to {}", data.request_id, url); - self.client - .post(&url) - .json(data) - .send() - .await? - .error_for_status()?; - Ok(()) - } - // --- Config Fetching & Updating Methods --- pub async fn get_llm_providers_config(&self) -> Result { @@ -101,92 +38,4 @@ impl PersistenceClient { Ok(config) } - pub async fn update_llm_providers_config(&self, config: &LlmProvidersConfig) -> Result<()> { - let url = format!("{}/configs/llm_providers", self.base_url); - info!("Updating LLM providers config to {}", url); - self.client - .put(&url) - .json(config) - .send() - .await? - .error_for_status()?; - Ok(()) - } - - pub async fn get_analysis_template_sets(&self) -> Result { - let url = format!("{}/configs/analysis_template_sets", self.base_url); - info!("Fetching analysis template sets from {}", url); - let config = self - .client - .get(&url) - .send() - .await? - .error_for_status()? - .json::() - .await?; - Ok(config) - } - - pub async fn update_analysis_template_sets(&self, templates: &AnalysisTemplateSets) -> Result<()> { - let url = format!("{}/configs/analysis_template_sets", self.base_url); - info!("Updating analysis template sets to {}", url); - self.client - .put(&url) - .json(templates) - .send() - .await? 
- .error_for_status()?; - Ok(()) - } - - // --- Data Writing Methods --- - - pub async fn upsert_company_profile(&self, profile: CompanyProfileDto) -> Result<()> { - let url = format!("{}/companies", self.base_url); - info!("Upserting company profile for {} to {}", profile.symbol, url); - self.client - .put(&url) - .json(&profile) - .send() - .await? - .error_for_status()?; - Ok(()) - } - - pub async fn upsert_realtime_quote(&self, quote: RealtimeQuoteDto) -> Result<()> { - let url = format!("{}/market-data/quotes", self.base_url); - info!("Upserting realtime quote for {} to {}", quote.symbol, url); - self.client - .post(&url) - .json("e) - .send() - .await? - .error_for_status()?; - Ok(()) - } - - pub async fn batch_insert_financials(&self, dtos: Vec) -> Result<()> { - if dtos.is_empty() { - return Ok(()); - } - let url = format!("{}/market-data/financials/batch", self.base_url); - let symbol = dtos[0].symbol.clone(); - info!( - "Batch inserting {} financial statements for {} to {}", - dtos.len(), - symbol, - url - ); - - let batch = TimeSeriesFinancialBatchDto { records: dtos }; - - self.client - .post(&url) - .json(&batch) - .send() - .await? - .error_for_status()?; - - Ok(()) - } } diff --git a/services/report-generator-service/src/state.rs b/services/report-generator-service/src/state.rs index 54dd653..8a42efe 100644 --- a/services/report-generator-service/src/state.rs +++ b/services/report-generator-service/src/state.rs @@ -14,6 +14,7 @@ pub struct AppState { pub tasks: Arc>, pub streams: Arc, pub config: Arc, + #[allow(dead_code)] pub tera: Arc, pub nats: async_nats::Client, } @@ -43,6 +44,7 @@ impl StreamManager { }).value().clone() } + #[allow(dead_code)] pub fn remove_channel(&self, request_id: &Uuid) { self.channels.remove(request_id); } diff --git a/services/report-generator-service/src/templates.rs b/services/report-generator-service/src/templates.rs index 51d717b..a2e3b05 100644 --- a/services/report-generator-service/src/templates.rs +++ b/services/report-generator-service/src/templates.rs @@ -10,6 +10,7 @@ pub fn load_tera() -> Tera { tera } +#[allow(dead_code)] pub fn render_prompt( tera: &Tera, template_name: &str, diff --git a/services/tushare-provider-service/src/error.rs b/services/tushare-provider-service/src/error.rs index 6b6a848..fdeef5d 100644 --- a/services/tushare-provider-service/src/error.rs +++ b/services/tushare-provider-service/src/error.rs @@ -15,8 +15,8 @@ pub enum AppError { #[error("Internal error: {0}")] Internal(String), - #[error("Provider not available: {0}")] - ProviderNotAvailable(String), + // #[error("Provider not available: {0}")] + // ProviderNotAvailable(String), #[error(transparent)] Reqwest(#[from] ReqwestError), diff --git a/services/tushare-provider-service/src/mapping.rs b/services/tushare-provider-service/src/mapping.rs index d0baac3..9676279 100644 --- a/services/tushare-provider-service/src/mapping.rs +++ b/services/tushare-provider-service/src/mapping.rs @@ -19,6 +19,7 @@ pub struct TushareFinancials { pub income: Vec, pub cashflow: Vec, pub fina_indicator: Vec, + #[allow(dead_code)] pub repurchase: Vec, pub dividend: Vec, pub stk_holdernumber: Vec, diff --git a/services/tushare-provider-service/src/tushare.rs b/services/tushare-provider-service/src/tushare.rs index 137083c..0f62abd 100644 --- a/services/tushare-provider-service/src/tushare.rs +++ b/services/tushare-provider-service/src/tushare.rs @@ -194,6 +194,7 @@ impl TushareDataProvider { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct StockBasic { pub 
ts_code: String, pub symbol: Option, @@ -206,6 +207,7 @@ pub struct StockBasic { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct StockCompany { pub ts_code: String, pub exchange: Option, @@ -226,6 +228,7 @@ pub struct StockCompany { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct BalanceSheet { pub ts_code: String, pub ann_date: Option, @@ -252,6 +255,7 @@ pub struct BalanceSheet { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct Income { pub ts_code: String, pub ann_date: Option, @@ -274,6 +278,7 @@ pub struct Income { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct Cashflow { pub ts_code: String, pub ann_date: Option, @@ -286,6 +291,7 @@ pub struct Cashflow { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct FinaIndicator { pub ts_code: String, pub end_date: Option, @@ -305,6 +311,7 @@ pub struct FinaIndicator { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct Repurchase { pub ts_code: String, pub ann_date: Option, @@ -316,6 +323,7 @@ pub struct Repurchase { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct Dividend { pub ts_code: String, pub end_date: Option, @@ -325,6 +333,7 @@ pub struct Dividend { } #[derive(Deserialize, Debug, Clone)] +#[allow(dead_code)] pub struct StkHolderNumber { pub ts_code: String, pub ann_date: Option, diff --git a/services/tushare-provider-service/src/workflow_adapter.rs b/services/tushare-provider-service/src/workflow_adapter.rs index 6ef4881..b17026b 100644 --- a/services/tushare-provider-service/src/workflow_adapter.rs +++ b/services/tushare-provider-service/src/workflow_adapter.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use async_trait::async_trait; use anyhow::{Result, anyhow, Context}; use serde_json::{json, Value}; diff --git a/services/workflow-orchestrator-service/src/llm_client.rs b/services/workflow-orchestrator-service/src/llm_client.rs index 162d9a7..de37a97 100644 --- a/services/workflow-orchestrator-service/src/llm_client.rs +++ b/services/workflow-orchestrator-service/src/llm_client.rs @@ -1,7 +1,7 @@ use anyhow::{Result, anyhow}; use serde_json::{json, Value}; use std::time::Duration; -use tracing::{debug, error, info}; +use tracing::debug; pub struct LlmClient { http_client: reqwest::Client, diff --git a/services/workflow-orchestrator-service/src/workflow.rs b/services/workflow-orchestrator-service/src/workflow.rs index 3465032..eacb71e 100644 --- a/services/workflow-orchestrator-service/src/workflow.rs +++ b/services/workflow-orchestrator-service/src/workflow.rs @@ -288,7 +288,7 @@ impl WorkflowEngine { }; // 2. Save New Workflow History - let start_time = dag.start_time; // We need to track start time in DAG or pass it + let _start_time = dag.start_time; // We need to track start time in DAG or pass it // For now, let's approximate or fetch if available. // Actually, DAG doesn't track start time yet. We should probably add it. // As a workaround, use now - X, or just now if we don't care about precision. diff --git a/services/yfinance-provider-service/src/workflow_adapter.rs b/services/yfinance-provider-service/src/workflow_adapter.rs index 50671d4..cbf559f 100644 --- a/services/yfinance-provider-service/src/workflow_adapter.rs +++ b/services/yfinance-provider-service/src/workflow_adapter.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use async_trait::async_trait; use anyhow::{Result, anyhow, Context}; use serde_json::{json, Value};
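
The snapshot-unwrapping logic added to `ReportPage` and `HistoricalReportPage` is identical, so a third call site could pull it into a shared helper. A sketch of that hypothetical helper (not part of this diff), preserving the exact `type`/`payload` check used by both pages:

```ts
// Hypothetical shared helper. The backend may emit the snapshot either as a
// tagged enum ({ type, payload }) or as the raw payload object; both shapes
// are accepted, and anything else is passed through unchanged.
export function unwrapSnapshotPayload(dataPayload: unknown): unknown {
  if (
    dataPayload !== null &&
    typeof dataPayload === 'object' &&
    'payload' in dataPayload &&
    'type' in dataPayload
  ) {
    return (dataPayload as { payload: unknown }).payload;
  }
  return dataPayload;
}

// Usage in either page:
//   const snapshot = await res.json();
//   loadFromSnapshot(unwrapSnapshotPayload(snapshot.data_payload));
```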