Fundamental_Analysis/archive/python/backend/app/services/company_profile_client.py
Lv, Qi a6cca48fed chore(cleanup): remove redundant data-distance-service stub tests
- Covered by data-persistence-service tests (db/api).
- No references or compose entries.
2025-11-16 20:52:26 +08:00

160 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
OpenAI-compatible API Client for company profile generation
"""
import time
from typing import Dict, List, Optional
import openai
class CompanyProfileClient:
def __init__(self, api_key: str, base_url: str, model: str = "gemini-1.5-flash"):
"""Initialize OpenAI client with API key, base_url and model"""
self.client = openai.AsyncOpenAI(api_key=api_key, base_url=base_url)
self.model_name = model
async def generate_profile(
self,
company_name: str,
ts_code: str,
financial_data: Optional[Dict] = None
) -> Dict:
"""
Generate company profile using OpenAI-compatible API (non-streaming)
Args:
company_name: Company name
ts_code: Stock code
financial_data: Optional financial data for context
Returns:
Dict with profile content and metadata
"""
start_time = time.perf_counter_ns()
# Build prompt
prompt = self._build_prompt(company_name, ts_code, financial_data)
# Call OpenAI-compatible API
try:
response = await self.client.chat.completions.create(
model=self.model_name,
messages=[{"role": "user", "content": prompt}],
)
content = response.choices[0].message.content if response.choices else ""
usage = response.usage
elapsed_ms = int((time.perf_counter_ns() - start_time) / 1_000_000)
return {
"content": content,
"model": self.model_name,
"tokens": {
"prompt_tokens": usage.prompt_tokens if usage else 0,
"completion_tokens": usage.completion_tokens if usage else 0,
"total_tokens": usage.total_tokens if usage else 0,
} if usage else {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
"elapsed_ms": elapsed_ms,
"success": True,
}
except Exception as e:
elapsed_ms = int((time.perf_counter_ns() - start_time) / 1_000_000)
return {
"content": "",
"model": self.model_name,
"tokens": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0},
"elapsed_ms": elapsed_ms,
"success": False,
"error": str(e),
}
def generate_profile_stream(
self,
company_name: str,
ts_code: str,
financial_data: Optional[Dict] = None
):
"""
Generate company profile using Gemini API with streaming
Args:
company_name: Company name
ts_code: Stock code
financial_data: Optional financial data for context
Yields:
Chunks of generated content
"""
import logging
logger = logging.getLogger(__name__)
logger.info(f"[CompanyProfile] Starting stream generation for {company_name} ({ts_code})")
# Build prompt
prompt = self._build_prompt(company_name, ts_code, financial_data)
logger.info(f"[CompanyProfile] Prompt built, length: {len(prompt)} chars")
# Call Gemini API with streaming
try:
logger.info("[CompanyProfile] Calling Gemini API with stream=True")
# Generate streaming response (sync call, but yields chunks)
response_stream = self.model.generate_content(prompt, stream=True)
logger.info("[CompanyProfile] Gemini API stream object created")
chunk_count = 0
# Stream chunks
logger.info("[CompanyProfile] Starting to iterate response stream")
for chunk in response_stream:
logger.info(f"[CompanyProfile] Received chunk from Gemini, has text: {hasattr(chunk, 'text')}")
if hasattr(chunk, 'text') and chunk.text:
chunk_count += 1
text_len = len(chunk.text)
logger.info(f"[CompanyProfile] Chunk {chunk_count}: {text_len} chars")
yield chunk.text
else:
logger.warning(f"[CompanyProfile] Chunk has no text attribute or empty, chunk: {chunk}")
logger.info(f"[CompanyProfile] Stream iteration completed. Total chunks: {chunk_count}")
except Exception as e:
logger.error(f"[CompanyProfile] Error during streaming: {type(e).__name__}: {str(e)}", exc_info=True)
yield f"\n\n---\n\n**错误**: {type(e).__name__}: {str(e)}"
def _build_prompt(self, company_name: str, ts_code: str, financial_data: Optional[Dict] = None) -> str:
"""Build prompt for company profile generation"""
prompt = f"""您是一位专业的证券市场分析师。请为公司 {company_name} (股票代码: {ts_code}) 生成一份详细且专业的公司介绍。开头不要自我介绍直接开始正文。正文用MarkDown输出尽量说明信息来源用斜体显示信息来源。在生成内容时请严格遵循以下要求并采用清晰、结构化的格式
1. **公司概览**:
* 简要介绍公司的性质、核心业务领域及其在行业中的定位。
* 提炼并阐述公司的核心价值理念。
2. **主营业务**:
* 详细描述公司主要的**产品或服务**。
* **重要提示**:如果能获取到公司最新的官方**年报**或**财务报告**,请从中提取各主要产品/服务线的**收入金额**和其占公司总收入的**百分比**。请**明确标注数据来源**(例如:"数据来源于XX年年度报告")。
* **严格禁止**编造或估算任何财务数据。若无法找到公开、准确的财务数据,请**不要**在这一点中提及具体金额或比例,仅描述业务内容。
3. **发展历程**:
* 以时间线或关键事件的形式,概述公司自成立以来的主要**里程碑事件**、重大发展阶段、战略转型或重要成就。
4. **核心团队**:
* 介绍公司**主要管理层和核心技术团队成员**。
* 对于每位核心成员,提供其**职务、主要工作履历、教育背景**。
* 如果公开可查,可补充其**出生年份**。
5. **供应链**:
* 描述公司的**主要原材料、部件或服务来源**。
* 如果公开信息中包含,请列出**主要供应商名称**,并**明确其在总采购金额中的大致占比**。若无此数据,则仅描述采购模式。
6. **主要客户及销售模式**:
* 阐明公司的**销售模式**(例如:直销、经销、线上销售、代理等)。
* 列出公司的**主要客户群体**或**代表性大客户**。
* 如果公开信息中包含,请标明**主要客户(或前五大客户)的销售额占公司总销售额的比例**。若无此数据,则仅描述客户类型。
7. **未来展望**:
* 基于公司**公开的官方声明、管理层访谈或战略规划**,总结公司未来的发展方向、战略目标、重点项目或市场预期。请确保此部分内容有可靠的信息来源支持。"""
if financial_data:
prompt += f"\n\n参考财务数据:\n{financial_data}"
return prompt