- Covered by data-persistence-service tests (db/api).
- No references or compose entries.
203 lines · 7.0 KiB · Python

"""
|
|
Generic Analysis Client for various analysis types using an OpenAI-compatible API
|
|
"""
|
|
import time
|
|
import json
|
|
import os
|
|
from typing import Dict, Optional
|
|
import openai
|
|
import string
|
|
|
|
|
|
class AnalysisClient:
    """Generic client for generating various types of analysis using an OpenAI-compatible API."""

    def __init__(self, api_key: str, base_url: str, model: str):
        """Initialize the OpenAI client with the API key, base URL, and model."""
        # Increase the client timeout to allow long-running analysis (5 minutes).
        self.client = openai.AsyncOpenAI(api_key=api_key, base_url=base_url, timeout=300.0)
        self.model_name = model

    async def generate_analysis(
        self,
        analysis_type: str,
        company_name: str,
        ts_code: str,
        prompt_template: str,
        financial_data: Optional[Dict] = None,
        context: Optional[Dict] = None
    ) -> Dict:
        """
        Generate analysis using an OpenAI-compatible API (non-streaming).

        Args:
            analysis_type: Type of analysis (e.g., "fundamental_analysis")
            company_name: Company name
            ts_code: Stock code
            prompt_template: Prompt template with placeholders
            financial_data: Optional financial data for context
            context: Optional dictionary with results from previous analyses

        Returns:
            Dict with the analysis content and metadata
        """
        start_time = time.perf_counter_ns()

        # Build the prompt from the template.
        prompt = self._build_prompt(
            prompt_template,
            company_name,
            ts_code,
            financial_data,
            context,
        )

        # Call the OpenAI-compatible API.
        try:
            response = await self.client.chat.completions.create(
                model=self.model_name,
                messages=[{"role": "user", "content": prompt}],
                timeout=300.0,
            )

            content = response.choices[0].message.content if response.choices else ""
            usage = response.usage

            elapsed_ms = int((time.perf_counter_ns() - start_time) / 1_000_000)

            return {
                "content": content,
                "model": self.model_name,
                "tokens": {
                    "prompt_tokens": usage.prompt_tokens if usage else 0,
                    "completion_tokens": usage.completion_tokens if usage else 0,
                    "total_tokens": usage.total_tokens if usage else 0,
                } if usage else {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
                "elapsed_ms": elapsed_ms,
                "success": True,
                "analysis_type": analysis_type,
            }
        except Exception as e:
            elapsed_ms = int((time.perf_counter_ns() - start_time) / 1_000_000)
            return {
                "content": "",
                "model": self.model_name,
                "tokens": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
                "elapsed_ms": elapsed_ms,
                "success": False,
                "error": str(e),
                "analysis_type": analysis_type,
            }

    def _build_prompt(
        self,
        prompt_template: str,
        company_name: str,
        ts_code: str,
        financial_data: Optional[Dict] = None,
        context: Optional[Dict] = None
    ) -> str:
        """Build the prompt from the template by replacing placeholders."""

        # Start with the base placeholders.
        placeholders = {
            "company_name": company_name,
            "ts_code": ts_code,
        }

        # Add financial data if provided.
        financial_data_str = ""
        if financial_data:
            try:
                financial_data_str = json.dumps(financial_data, ensure_ascii=False, indent=2)
            except Exception:
                financial_data_str = str(financial_data)
        placeholders["financial_data"] = financial_data_str

        # Add context from previous analysis steps.
        if context:
            placeholders.update(context)

        # Replace placeholders in the template, using a custom formatter
        # to handle missing keys gracefully.
        class SafeFormatter(string.Formatter):
            def get_value(self, key, args, kwargs):
                if isinstance(key, str):
                    return kwargs.get(key, f"{{{key}}}")
                return super().get_value(key, args, kwargs)

        formatter = SafeFormatter()
        prompt = formatter.format(prompt_template, **placeholders)

        return prompt

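    # A minimal sketch of the placeholder behavior (illustrative, not from the original
    # source): SafeFormatter leaves unknown placeholders intact instead of raising
    # KeyError, so a template may reference context keys that are not always supplied.
    # The "technical_analysis" key below is hypothetical and used only for illustration.
    #
    #   template = "Analyze {company_name} ({ts_code}) given {technical_analysis}"
    #   client._build_prompt(template, "ACME", "000001.SZ")
    #   -> "Analyze ACME (000001.SZ) given {technical_analysis}"
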
async def generate_analysis_stream(
|
|
self,
|
|
analysis_type: str,
|
|
company_name: str,
|
|
ts_code: str,
|
|
prompt_template: str,
|
|
financial_data: Optional[Dict] = None,
|
|
context: Optional[Dict] = None
|
|
):
|
|
"""Yield analysis content chunks using OpenAI-compatible streaming API.
|
|
|
|
Yields plain text chunks as they arrive.
|
|
"""
|
|
# Build prompt
|
|
prompt = self._build_prompt(
|
|
prompt_template,
|
|
company_name,
|
|
ts_code,
|
|
financial_data,
|
|
context,
|
|
)
|
|
|
|
try:
|
|
stream = await self.client.chat.completions.create(
|
|
model=self.model_name,
|
|
messages=[{"role": "user", "content": prompt}],
|
|
stream=True,
|
|
timeout=300.0,
|
|
)
|
|
|
|
# The SDK yields events with incremental deltas
|
|
async for event in stream:
|
|
try:
|
|
choice = event.choices[0] if getattr(event, "choices", None) else None
|
|
delta = getattr(choice, "delta", None) if choice is not None else None
|
|
content = getattr(delta, "content", None) if delta is not None else None
|
|
if content:
|
|
yield content
|
|
except Exception:
|
|
# Best-effort: ignore malformed chunks
|
|
continue
|
|
except Exception as e:
|
|
# Emit error message to the stream so the client can surface it
|
|
yield f"\n\n[错误] {type(e).__name__}: {str(e)}\n"
|
|
|
|
|
|
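# A minimal consumption sketch (illustrative, not part of the original module): a caller
# iterates the async generator and forwards chunks, e.g. to an SSE or WebSocket response.
# The argument values below are hypothetical.
#
#   client = AnalysisClient(api_key, base_url, model)
#   async for chunk in client.generate_analysis_stream(
#       "fundamental_analysis", "ACME", "000001.SZ", template
#   ):
#       print(chunk, end="", flush=True)

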
def load_analysis_config() -> Dict:
    """Load the analysis configuration from the JSON file."""
    # Get the project root: backend/app/services -> project_root/config/analysis-config.json
    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
    config_path = os.path.join(project_root, "config", "analysis-config.json")

    if not os.path.exists(config_path):
        return {}

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return {}

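# Illustrative (assumed) shape of config/analysis-config.json. Only the top-level
# "analysis_modules" key is implied by the code; the inner field names are hypothetical.
#
#   {
#     "analysis_modules": {
#       "fundamental_analysis": {
#         "name": "Fundamental Analysis",
#         "prompt_template": "Analyze {company_name} ({ts_code}) ..."
#       }
#     }
#   }

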
def get_analysis_config(analysis_type: str) -> Optional[Dict]:
    """Get the configuration for a specific analysis type."""
    config = load_analysis_config()
    modules = config.get("analysis_modules", {})
    return modules.get(analysis_type)

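# A minimal usage sketch (illustrative; the environment variable names, company, stock
# code, and template below are assumptions, not part of the original module).
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        client = AnalysisClient(
            api_key=os.environ.get("OPENAI_API_KEY", ""),          # assumed env var
            base_url=os.environ.get("OPENAI_BASE_URL", ""),        # assumed env var
            model=os.environ.get("OPENAI_MODEL", "gpt-4o-mini"),   # assumed env var / default
        )
        template = (
            "Provide a fundamental analysis of {company_name} ({ts_code}).\n"
            "Financial data:\n{financial_data}"
        )
        result = await client.generate_analysis(
            analysis_type="fundamental_analysis",
            company_name="ACME",        # hypothetical company
            ts_code="000001.SZ",        # hypothetical stock code
            prompt_template=template,
            financial_data={"revenue": 123.4},
        )
        print(result["success"], result["elapsed_ms"], result["tokens"])
        print(result["content"][:500])

    asyncio.run(_demo())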