""" AI分析服务 处理Gemini API集成和AI分析功能 """ from typing import Dict, Any, Optional, List import httpx import asyncio import json import logging from datetime import datetime from ..core.exceptions import ( AIAnalysisError, APIError, AuthenticationError, RateLimitError ) from ..schemas.report import AIAnalysisRequest, AIAnalysisResponse logger = logging.getLogger(__name__) class GeminiAnalyzer: """Gemini AI分析器""" def __init__(self, api_key: str, config: Optional[Dict[str, Any]] = None): if not api_key: raise AuthenticationError("gemini", {"message": "Gemini API密钥未配置"}) self.api_key = api_key self.config = config or {} self.base_url = self.config.get("base_url", "https://generativelanguage.googleapis.com/v1beta") self.model = self.config.get("model", "gemini-pro") self.timeout = self.config.get("timeout", 60) self.max_retries = self.config.get("max_retries", 3) self.retry_delay = self.config.get("retry_delay", 2) # 生成配置 self.generation_config = { "temperature": self.config.get("temperature", 0.7), "top_p": self.config.get("top_p", 0.8), "top_k": self.config.get("top_k", 40), "max_output_tokens": self.config.get("max_output_tokens", 8192), } async def analyze_business_info(self, symbol: str, market: str, financial_data: Dict[str, Any]) -> AIAnalysisResponse: """分析公司业务信息""" if not symbol or not market: raise AIAnalysisError("股票代码和市场参数不能为空", self.model) prompt = self._build_business_info_prompt(symbol, market, financial_data) try: result = await self._retry_request(self._call_gemini_api, prompt) # 验证响应内容 if not result or len(result.strip()) < 100: raise AIAnalysisError("AI返回的业务信息分析内容过短或为空", self.model) # 解析响应 parsed_content = self._parse_business_info_response(result) # 验证解析结果的质量 quality = parsed_content.get("content_quality", {}) if quality.get("quality_score", 0) < 0.3: logger.warning(f"业务信息分析质量较低: {symbol} ({market}) - 质量分数: {quality.get('quality_score', 0)}") return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="business_info", content=parsed_content, model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"业务信息分析失败: {str(e)}", self.model) async def analyze_fundamental(self, symbol: str, market: str, financial_data: Dict[str, Any], business_info: Dict[str, Any]) -> AIAnalysisResponse: """基本面分析(景林模型)""" prompt = self._build_fundamental_analysis_prompt(symbol, market, financial_data, business_info) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="fundamental_analysis", content=self._parse_fundamental_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"基本面分析失败: {str(e)}", self.model) async def analyze_bullish_case(self, symbol: str, market: str, context_data: Dict[str, Any]) -> AIAnalysisResponse: """看涨分析(隐藏资产、护城河分析)""" prompt = self._build_bullish_analysis_prompt(symbol, market, context_data) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="bullish_analysis", content=self._parse_bullish_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"看涨分析失败: {str(e)}", self.model) async def analyze_bearish_case(self, symbol: str, market: str, context_data: Dict[str, Any]) -> AIAnalysisResponse: """看跌分析(价值底线、最坏情况分析)""" prompt = self._build_bearish_analysis_prompt(symbol, market, context_data) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="bearish_analysis", content=self._parse_bearish_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"看跌分析失败: {str(e)}", self.model) async def analyze_market_sentiment(self, symbol: str, market: str, context_data: Dict[str, Any]) -> AIAnalysisResponse: """市场情绪分析""" prompt = self._build_market_analysis_prompt(symbol, market, context_data) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="market_analysis", content=self._parse_market_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"市场分析失败: {str(e)}", self.model) async def analyze_news_catalysts(self, symbol: str, market: str, context_data: Dict[str, Any]) -> AIAnalysisResponse: """新闻催化剂分析""" prompt = self._build_news_analysis_prompt(symbol, market, context_data) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="news_analysis", content=self._parse_news_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"新闻分析失败: {str(e)}", self.model) async def analyze_trading_dynamics(self, symbol: str, market: str, context_data: Dict[str, Any]) -> AIAnalysisResponse: """交易动态分析""" prompt = self._build_trading_analysis_prompt(symbol, market, context_data) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="trading_analysis", content=self._parse_trading_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"交易分析失败: {str(e)}", self.model) async def analyze_insider_institutional(self, symbol: str, market: str, context_data: Dict[str, Any]) -> AIAnalysisResponse: """内部人与机构动向分析""" prompt = self._build_insider_analysis_prompt(symbol, market, context_data) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="insider_analysis", content=self._parse_insider_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"内部人分析失败: {str(e)}", self.model) async def generate_final_conclusion(self, symbol: str, market: str, all_analyses: List[Dict[str, Any]]) -> AIAnalysisResponse: """生成最终结论""" prompt = self._build_conclusion_prompt(symbol, market, all_analyses) try: result = await self._retry_request(self._call_gemini_api, prompt) return AIAnalysisResponse( symbol=symbol, market=market, analysis_type="final_conclusion", content=self._parse_conclusion_response(result), model_used=self.model, generated_at=datetime.now() ) except Exception as e: if isinstance(e, (AIAnalysisError, APIError)): raise raise AIAnalysisError(f"最终结论生成失败: {str(e)}", self.model) async def _call_gemini_api(self, prompt: str) -> str: """调用Gemini API""" url = f"{self.base_url}/models/{self.model}:generateContent" headers = { "Content-Type": "application/json", } data = { "contents": [ { "parts": [ { "text": prompt } ] } ], "generationConfig": self.generation_config } params = { "key": self.api_key } try: async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.post(url, json=data, headers=headers, params=params) response.raise_for_status() result = response.json() if "error" in result: error = result["error"] error_code = error.get("code", 0) error_message = error.get("message", "Unknown error") if error_code == 401 or "API key" in error_message: raise AuthenticationError("gemini", {"message": error_message}) elif error_code == 429 or "quota" in error_message.lower(): raise RateLimitError("gemini") else: raise APIError(f"Gemini API错误: {error_message}", error_code, "gemini") candidates = result.get("candidates", []) if not candidates: raise AIAnalysisError("Gemini API返回空结果", self.model) content = candidates[0].get("content", {}) parts = content.get("parts", []) if not parts: raise AIAnalysisError("Gemini API返回内容为空", self.model) return parts[0].get("text", "") except httpx.HTTPStatusError as e: if e.response.status_code == 401: raise AuthenticationError("gemini", {"status_code": e.response.status_code}) elif e.response.status_code == 429: raise RateLimitError("gemini") else: raise APIError(f"HTTP错误: {e.response.status_code}", e.response.status_code, "gemini") except httpx.RequestError as e: raise AIAnalysisError(f"网络请求失败: {str(e)}", self.model) async def _retry_request(self, func, *args, **kwargs): """重试机制""" last_exception = None for attempt in range(self.max_retries): try: return await func(*args, **kwargs) except (httpx.TimeoutException, httpx.ConnectError, AIAnalysisError) as e: last_exception = e if attempt < self.max_retries - 1: await asyncio.sleep(self.retry_delay * (2 ** attempt)) # 指数退避 continue except (AuthenticationError, RateLimitError) as e: # 认证错误和频率限制错误不重试 raise e except Exception as e: # 其他异常不重试 raise e # 所有重试都失败了 raise AIAnalysisError( f"Gemini API请求失败,已重试 {self.max_retries} 次: {str(last_exception)}", self.model ) def _build_business_info_prompt(self, symbol: str, market: str, financial_data: Dict[str, Any]) -> str: """构建业务信息分析提示词""" # 根据市场调整分析重点 market_context = { "中国": "中国A股市场", "香港": "香港联交所", "美国": "美国证券市场", "日本": "日本证券市场" }.get(market, market) return f""" 请对股票代码 {symbol}({market_context})进行全面的业务信息分析。 基于以下财务数据: {json.dumps(financial_data, ensure_ascii=False, indent=2)} 请按照以下结构提供详细分析,每个部分都要有具体的信息和数据支撑: ## 1. 公司概览 请提供公司的基本信息,包括: - 公司全称、成立时间、注册地 - 主要业务领域和所属行业 - 在行业中的市场地位和排名 - 公司规模(员工数量、资产规模等) - 主要竞争对手 ## 2. 主营业务分析 请详细分析公司的核心业务: - 主要产品和服务的详细描述 - 各业务板块的收入占比和盈利贡献 - 业务模式和价值链分析 - 核心竞争优势和差异化特点 - 产品或服务的市场需求和增长前景 ## 3. 发展历程 请梳理公司的重要发展节点: - 公司成立和上市历程 - 重要的业务转型和战略调整 - 关键的并购重组事件 - 重大技术突破或产品创新 - 国际化扩张历程(如适用) ## 4. 核心团队 请分析公司的管理层情况: - 董事长、CEO等核心高管的背景和经验 - 管理团队的稳定性和变动情况 - 创始人或控股股东的影响力 - 管理层的激励机制和持股情况 - 公司治理结构的特点 ## 5. 供应链分析 请分析公司的供应链体系: - 主要原材料或服务的供应商情况 - 供应链的集中度和依赖性风险 - 供应链管理的优势和挑战 - 重要客户的构成和集中度 - 客户关系的稳定性和粘性 ## 6. 销售模式 请分析公司的销售和营销策略: - 主要销售渠道和分销网络 - 直销与经销的比例和策略 - 线上线下销售模式的结合 - 目标客户群体和市场定位 - 品牌建设和市场推广策略 ## 7. 未来展望 请分析公司的发展前景: - 公司的中长期发展战略和规划 - 新业务或新市场的拓展计划 - 技术创新和研发投入方向 - 面临的主要机遇和挑战 - 行业发展趋势对公司的影响 请确保分析内容: 1. 基于公开可获得的信息和财务数据 2. 客观、准确,避免过度乐观或悲观 3. 具体详实,有数据和事实支撑 4. 结构清晰,便于理解和阅读 5. 用中文回答,语言专业但易懂 注意:如果某些信息无法获得或不确定,请明确说明,不要编造信息。 """ def _build_fundamental_analysis_prompt(self, symbol: str, market: str, financial_data: Dict[str, Any], business_info: Dict[str, Any]) -> str: """构建基本面分析提示词(景林模型)""" return f""" 请使用景林投资的基本面分析框架,对股票代码 {symbol}({market}市场)进行深度分析。 财务数据: {json.dumps(financial_data, ensure_ascii=False, indent=2)} 业务信息: {json.dumps(business_info, ensure_ascii=False, indent=2)} 请按照以下景林模型问题集进行分析: 1. 商业模式分析 - 这是一门什么样的生意? - 商业模式的核心竞争力是什么? - 盈利模式是否可持续? 2. 行业地位分析 - 公司在行业中的地位如何? - 市场份额和竞争优势? - 行业发展趋势对公司的影响? 3. 财务质量分析 - 收入增长的质量如何? - 盈利能力和现金流状况? - 资产负债结构是否健康? 4. 管理层评估 - 管理层的能力和诚信度? - 公司治理结构是否完善? - 股东利益是否得到保护? 5. 估值分析 - 当前估值水平是否合理? - 与同行业公司比较如何? - 未来增长预期是否支撑估值? 请用中文提供详细、专业的分析,每个方面都要有具体的数据支撑和逻辑推理。 """ def _build_bullish_analysis_prompt(self, symbol: str, market: str, context_data: Dict[str, Any]) -> str: """构建看涨分析提示词""" return f""" 请对股票代码 {symbol}({market}市场)进行看涨分析,重点关注隐藏资产和护城河竞争优势。 基础数据: {json.dumps(context_data, ensure_ascii=False, indent=2)} 请从以下角度进行看涨分析: 1. 隐藏资产发现 - 账面价值被低估的资产 - 无形资产的真实价值 - 潜在的资产重估机会 2. 护城河分析 - 品牌价值和客户忠诚度 - 技术壁垒和专利保护 - 规模经济和网络效应 - 转换成本和客户粘性 3. 成长潜力 - 新业务和新市场机会 - 产品创新和技术升级 - 市场扩张的可能性 4. 催化剂识别 - 短期可能的积极因素 - 政策支持和行业利好 - 公司内部改革和优化 5. 最佳情况假设 - 如果一切顺利,公司价值可能达到什么水平? - 关键假设和实现路径 请用中文提供乐观但理性的分析,要有具体的逻辑支撑。 """ def _build_bearish_analysis_prompt(self, symbol: str, market: str, context_data: Dict[str, Any]) -> str: """构建看跌分析提示词""" return f""" 请对股票代码 {symbol}({market}市场)进行看跌分析,重点关注价值底线和最坏情况。 基础数据: {json.dumps(context_data, ensure_ascii=False, indent=2)} 请从以下角度进行看跌分析: 1. 价值底线分析 - 清算价值估算 - 资产的最低合理价值 - 下行风险的底线在哪里 2. 主要风险因素 - 行业周期性风险 - 竞争加剧的威胁 - 技术替代的可能性 - 监管政策变化风险 3. 财务脆弱性 - 债务压力和流动性风险 - 现金流恶化的可能性 - 盈利能力下降的风险 4. 管理层风险 - 决策失误的历史 - 治理结构的缺陷 - 利益冲突的可能性 5. 最坏情况假设 - 如果一切都出错,公司价值可能跌到什么水平? - 关键风险因素和触发条件 请用中文提供谨慎但客观的分析,要有具体的风险量化。 """ def _build_market_analysis_prompt(self, symbol: str, market: str, context_data: Dict[str, Any]) -> str: """构建市场分析提示词""" return f""" 请对股票代码 {symbol}({market}市场)进行市场情绪分析,重点关注分歧点与变化驱动。 基础数据: {json.dumps(context_data, ensure_ascii=False, indent=2)} 请从以下角度进行市场分析: 1. 市场情绪评估 - 当前市场对该股票的主流观点 - 机构投资者的持仓变化 - 散户投资者的情绪指标 2. 分歧点识别 - 市场存在哪些主要分歧? - 乐观派和悲观派的核心观点 - 分歧的根本原因是什么? 3. 变化驱动因素 - 什么因素可能改变市场共识? - 关键数据点和时间节点 - 外部环境变化的影响 4. 资金流向分析 - 主力资金的进出情况 - 不同类型投资者的行为模式 - 流动性状况评估 5. 市场预期vs现实 - 市场预期是否过于乐观或悲观? - 预期差的投资机会在哪里? 请用中文提供专业的市场分析,要有数据支撑和逻辑推理。 """ def _build_news_analysis_prompt(self, symbol: str, market: str, context_data: Dict[str, Any]) -> str: """构建新闻分析提示词""" return f""" 请对股票代码 {symbol}({market}市场)进行新闻催化剂分析,重点关注股价拐点预判。 基础数据: {json.dumps(context_data, ensure_ascii=False, indent=2)} 请从以下角度进行新闻分析: 1. 近期重要新闻梳理 - 公司公告和重大事件 - 行业政策和监管变化 - 宏观经济相关新闻 2. 催化剂识别 - 正面催化剂(业绩超预期、政策利好等) - 负面催化剂(风险事件、竞争加剧等) - 中性但重要的信息 3. 拐点预判 - 基本面拐点的可能时间 - 市场情绪拐点的信号 - 技术面拐点的确认 4. 新闻影响评估 - 短期影响vs长期影响 - 市场反应是否充分 - 后续发展的可能路径 5. 关注要点 - 未来需要重点关注的事件 - 可能的风险点和机会点 - 时间窗口和操作建议 请用中文提供前瞻性的分析,要有时间维度和影响程度的判断。 """ def _build_trading_analysis_prompt(self, symbol: str, market: str, context_data: Dict[str, Any]) -> str: """构建交易分析提示词""" return f""" 请对股票代码 {symbol}({market}市场)进行交易分析,重点关注市场体量与增长路径。 基础数据: {json.dumps(context_data, ensure_ascii=False, indent=2)} 请从以下角度进行交易分析: 1. 市场体量分析 - 总市值和流通市值 - 日均成交量和换手率 - 市场容量和流动性评估 2. 增长路径分析 - 历史增长轨迹和驱动因素 - 未来增长的可能路径 - 增长的可持续性评估 3. 交易特征分析 - 股价波动特征和规律 - 主要交易时段和模式 - 大宗交易和异常交易情况 4. 技术面分析 - 关键技术位和支撑阻力 - 趋势线和形态分析 - 技术指标的信号 5. 交易策略建议 - 适合的交易时机和方式 - 风险控制和仓位管理 - 进出场点位的选择 请用中文提供实用的交易分析,要有具体的数据和操作建议。 """ def _build_insider_analysis_prompt(self, symbol: str, market: str, context_data: Dict[str, Any]) -> str: """构建内部人分析提示词""" return f""" 请对股票代码 {symbol}({market}市场)进行内部人与机构动向分析。 基础数据: {json.dumps(context_data, ensure_ascii=False, indent=2)} 请从以下角度进行分析: 1. 内部人交易分析 - 高管和大股东的买卖行为 - 内部人交易的时机和规模 - 内部人交易的信号意义 2. 机构持仓分析 - 主要机构投资者的持仓变化 - 新进和退出的机构情况 - 机构持仓集中度分析 3. 股东结构变化 - 股权结构的演变趋势 - 重要股东的进出情况 - 股权激励和员工持股情况 4. 资金流向追踪 - 大资金的进出时机 - 不同类型资金的偏好 - 资金成本和收益预期 5. 动向信号解读 - 内部人和机构行为的一致性 - 与股价走势的相关性 - 对未来走势的指示意义 请用中文提供专业的分析,要有数据支撑和逻辑推理。 """ def _build_conclusion_prompt(self, symbol: str, market: str, all_analyses: List[Dict[str, Any]]) -> str: """构建最终结论提示词""" analyses_text = "\n\n".join([ f"{analysis.get('analysis_type', '未知分析')}:\n{json.dumps(analysis.get('content', {}), ensure_ascii=False, indent=2)}" for analysis in all_analyses ]) return f""" 基于以下所有分析结果,请对股票代码 {symbol}({market}市场)给出最终投资结论。 所有分析结果: {analyses_text} 请从以下角度进行综合分析: 1. 关键矛盾识别 - 当前最核心的投资矛盾是什么? - 不同分析维度的结论是否一致? - 主要的不确定性因素有哪些? 2. 预期差分析 - 市场预期与实际情况的差异 - 可能被忽视或误解的关键信息 - 预期差带来的投资机会 3. 拐点临近性判断 - 基本面拐点的时间和概率 - 市场情绪拐点的信号 - 催化剂的时效性分析 4. 风险收益评估 - 上行空间和下行风险的量化 - 风险调整后的收益预期 - 投资的风险收益比 5. 最终投资建议 - 明确的投资观点(看多/看空/中性) - 建议的投资时间框架 - 关键的跟踪指标和退出条件 请用中文提供清晰、明确的投资结论,要有逻辑性和可操作性。 """ def _parse_business_info_response(self, response: str) -> Dict[str, Any]: """解析业务信息分析响应""" return { "company_overview": self._extract_section(response, "公司概览"), "main_business": self._extract_section(response, "主营业务分析"), "development_history": self._extract_section(response, "发展历程"), "core_team": self._extract_section(response, "核心团队"), "supply_chain": self._extract_section(response, "供应链分析"), "sales_model": self._extract_section(response, "销售模式"), "future_outlook": self._extract_section(response, "未来展望"), "full_analysis": response, "analysis_timestamp": datetime.now().isoformat(), "content_quality": self._assess_content_quality(response) } def _parse_fundamental_response(self, response: str) -> Dict[str, Any]: """解析基本面分析响应""" return { "business_model": self._extract_section(response, "商业模式"), "industry_position": self._extract_section(response, "行业地位"), "financial_quality": self._extract_section(response, "财务质量"), "management_assessment": self._extract_section(response, "管理层"), "valuation_analysis": self._extract_section(response, "估值分析"), "full_analysis": response } def _parse_bullish_response(self, response: str) -> Dict[str, Any]: """解析看涨分析响应""" return { "hidden_assets": self._extract_section(response, "隐藏资产"), "moat_analysis": self._extract_section(response, "护城河"), "growth_potential": self._extract_section(response, "成长潜力"), "catalysts": self._extract_section(response, "催化剂"), "best_case": self._extract_section(response, "最佳情况"), "full_analysis": response } def _parse_bearish_response(self, response: str) -> Dict[str, Any]: """解析看跌分析响应""" return { "value_floor": self._extract_section(response, "价值底线"), "risk_factors": self._extract_section(response, "风险因素"), "financial_vulnerability": self._extract_section(response, "财务脆弱性"), "management_risks": self._extract_section(response, "管理层风险"), "worst_case": self._extract_section(response, "最坏情况"), "full_analysis": response } def _parse_market_response(self, response: str) -> Dict[str, Any]: """解析市场分析响应""" return { "market_sentiment": self._extract_section(response, "市场情绪"), "disagreement_points": self._extract_section(response, "分歧点"), "change_drivers": self._extract_section(response, "变化驱动"), "capital_flow": self._extract_section(response, "资金流向"), "expectation_vs_reality": self._extract_section(response, "预期vs现实"), "full_analysis": response } def _parse_news_response(self, response: str) -> Dict[str, Any]: """解析新闻分析响应""" return { "recent_news": self._extract_section(response, "重要新闻"), "catalysts": self._extract_section(response, "催化剂"), "inflection_points": self._extract_section(response, "拐点预判"), "news_impact": self._extract_section(response, "影响评估"), "focus_points": self._extract_section(response, "关注要点"), "full_analysis": response } def _parse_trading_response(self, response: str) -> Dict[str, Any]: """解析交易分析响应""" return { "market_size": self._extract_section(response, "市场体量"), "growth_path": self._extract_section(response, "增长路径"), "trading_characteristics": self._extract_section(response, "交易特征"), "technical_analysis": self._extract_section(response, "技术面"), "trading_strategy": self._extract_section(response, "交易策略"), "full_analysis": response } def _parse_insider_response(self, response: str) -> Dict[str, Any]: """解析内部人分析响应""" return { "insider_trading": self._extract_section(response, "内部人交易"), "institutional_holdings": self._extract_section(response, "机构持仓"), "ownership_changes": self._extract_section(response, "股东结构"), "capital_flow": self._extract_section(response, "资金流向"), "signal_interpretation": self._extract_section(response, "信号解读"), "full_analysis": response } def _parse_conclusion_response(self, response: str) -> Dict[str, Any]: """解析最终结论响应""" return { "key_contradictions": self._extract_section(response, "关键矛盾"), "expectation_gap": self._extract_section(response, "预期差"), "inflection_timing": self._extract_section(response, "拐点临近性"), "risk_return": self._extract_section(response, "风险收益"), "investment_recommendation": self._extract_section(response, "投资建议"), "full_analysis": response } def _extract_section(self, text: str, section_name: str) -> str: """从文本中提取特定章节内容""" lines = text.split('\n') section_content = [] in_section = False for line in lines: # 匹配章节标题(支持多种格式) if section_name in line and any(marker in line for marker in ['##', '1.', '2.', '3.', '4.', '5.', '6.', '7.', ':', ':']): in_section = True # 不包含标题行,只要内容 continue if in_section: # 遇到下一个主要章节,停止 if line.strip() and any(marker in line for marker in ['##', '1.', '2.', '3.', '4.', '5.', '6.', '7.']) and section_name not in line: break section_content.append(line) # 清理内容 content = '\n'.join(section_content).strip() # 移除多余的空行 content = '\n'.join(line for line in content.split('\n') if line.strip() or not content.split('\n')[content.split('\n').index(line):content.split('\n').index(line)+2] == ['', '']) return content def _assess_content_quality(self, response: str) -> Dict[str, Any]: """评估内容质量""" word_count = len(response) sections_found = 0 # 检查各个章节是否存在 required_sections = ["公司概览", "主营业务", "发展历程", "核心团队", "供应链", "销售模式", "未来展望"] for section in required_sections: if section in response: sections_found += 1 # 计算完整度 completeness = sections_found / len(required_sections) # 评估详细程度 detail_level = "简要" if word_count < 500 else "详细" if word_count < 1500 else "非常详细" return { "word_count": word_count, "sections_found": sections_found, "total_sections": len(required_sections), "completeness_ratio": completeness, "detail_level": detail_level, "quality_score": min(1.0, (completeness * 0.7 + min(1.0, word_count / 1000) * 0.3)) } class AIAnalyzerFactory: """AI分析器工厂""" @classmethod def create_gemini_analyzer(cls, api_key: str, config: Optional[Dict[str, Any]] = None) -> GeminiAnalyzer: """创建Gemini分析器""" return GeminiAnalyzer(api_key, config) @classmethod def create_analyzer(cls, analyzer_type: str, **kwargs) -> GeminiAnalyzer: """创建分析器(可扩展支持其他AI服务)""" if analyzer_type.lower() == "gemini": return cls.create_gemini_analyzer(kwargs.get("api_key"), kwargs.get("config")) else: raise AIAnalysisError(f"不支持的AI分析器类型: {analyzer_type}")