TradingAgents-CN

研究员团队设计

概述

研究员团队是 TradingAgents 框架中的关键组件,负责对分析师团队的报告进行深度研究和辩论。通过看涨和看跌研究员之间的结构化辩论,系统能够从多个角度评估投资机会,形成更加平衡和全面的投资观点。

研究员架构

基础研究员类

class BaseResearcher:
    """所有研究员的基础类"""
    
    def __init__(self, llm, config, stance="neutral"):
        self.llm = llm
        self.config = config
        self.stance = stance  # "bullish", "bearish", "neutral"
        self.memory = ResearcherMemory()
        self.debate_history = []
        
    def research(self, analyst_reports: Dict, context: Dict = None) -> Dict:
        """执行研究分析"""
        
        # 1. 分析师报告解读
        interpretation = self.interpret_reports(analyst_reports)
        
        # 2. 立场分析
        stance_analysis = self.analyze_from_stance(interpretation)
        
        # 3. 生成研究观点
        research_view = self.generate_research_view(stance_analysis)
        
        # 4. 准备辩论要点
        debate_points = self.prepare_debate_points(research_view)
        
        return {
            "interpretation": interpretation,
            "stance_analysis": stance_analysis,
            "research_view": research_view,
            "debate_points": debate_points,
            "confidence": self.calculate_confidence(research_view)
        }
    
    def debate(self, opponent_view: Dict, round_number: int) -> Dict:
        """参与辩论"""
        
        # 1. 分析对手观点
        opponent_analysis = self.analyze_opponent_view(opponent_view)
        
        # 2. 准备反驳
        counter_arguments = self.prepare_counter_arguments(opponent_analysis)
        
        # 3. 强化自己观点
        reinforced_view = self.reinforce_own_view(counter_arguments)
        
        # 4. 生成辩论回应
        debate_response = self.generate_debate_response(
            counter_arguments, reinforced_view, round_number
        )
        
        # 5. 更新辩论历史
        self.debate_history.append({
            "round": round_number,
            "opponent_view": opponent_view,
            "response": debate_response
        })
        
        return debate_response

1. 看涨研究员 (Bull Researcher)

职责与特点

class BullResearcher(BaseResearcher):
    """看涨研究员 - 从乐观角度评估投资机会"""
    
    def __init__(self, llm, config):
        super().__init__(llm, config, stance="bullish")
        
    专业特点:
    - 积极寻找增长机会
    - 强调正面催化剂
    - 关注上涨潜力
    - 挑战悲观观点
    
    分析重点:
    - 收入增长驱动因素
    - 市场扩张机会
    - 竞争优势分析
    - 估值吸引力
    - 技术创新潜力
    - 管理层执行力

核心研究方法

def analyze_from_stance(self, interpretation: Dict) -> Dict:
    """从看涨角度分析"""
    
    bullish_factors = []
    growth_opportunities = []
    positive_catalysts = []
    
    # 基本面看涨因素
    fundamental_data = interpretation.get("fundamental_analysis", {})
    if fundamental_data.get("revenue_growth", 0) > 0.1:  # 10%以上增长
        bullish_factors.append("强劲的收入增长")
    
    if fundamental_data.get("profit_margin_trend") == "improving":
        bullish_factors.append("利润率改善趋势")
    
    # 技术面看涨信号
    technical_data = interpretation.get("technical_analysis", {})
    if technical_data.get("trend_direction") == "uptrend":
        bullish_factors.append("技术面上升趋势")
    
    if technical_data.get("momentum") == "positive":
        bullish_factors.append("正面动量信号")
    
    # 新闻面积极因素
    news_data = interpretation.get("news_analysis", {})
    if news_data.get("sentiment_score", 0) > 0.6:
        positive_catalysts.append("积极的新闻情绪")
    
    # 识别增长机会
    growth_opportunities = self._identify_growth_opportunities(interpretation)
    
    return {
        "bullish_factors": bullish_factors,
        "growth_opportunities": growth_opportunities,
        "positive_catalysts": positive_catalysts,
        "upside_potential": self._calculate_upside_potential(interpretation),
        "risk_mitigation": self._identify_risk_mitigation_factors(interpretation)
    }

def _identify_growth_opportunities(self, interpretation: Dict) -> List[str]:
    """识别增长机会"""
    
    opportunities = []
    
    # 市场扩张机会
    if interpretation.get("market_size_growth", 0) > 0.05:
        opportunities.append("市场规模持续扩张")
    
    # 新产品/服务机会
    if interpretation.get("new_product_pipeline"):
        opportunities.append("丰富的新产品管线")
    
    # 国际化机会
    if interpretation.get("international_expansion"):
        opportunities.append("国际市场扩张潜力")
    
    # 技术创新机会
    if interpretation.get("innovation_score", 0) > 0.7:
        opportunities.append("技术创新领先优势")
    
    return opportunities

def prepare_debate_points(self, research_view: Dict) -> Dict:
    """准备辩论要点"""
    
    return {
        "main_arguments": [
            "基本面数据显示强劲增长潜力",
            "技术指标支持上涨趋势",
            "市场情绪积极向好",
            "估值仍有上升空间"
        ],
        "supporting_evidence": research_view.get("bullish_factors", []),
        "growth_thesis": research_view.get("growth_opportunities", []),
        "risk_responses": self._prepare_risk_responses(research_view),
        "target_scenarios": self._develop_bull_scenarios(research_view)
    }

辩论策略

def generate_debate_response(self, counter_args: Dict, reinforced_view: Dict, round_num: int) -> Dict:
    """生成辩论回应"""
    
    response_strategy = self._determine_response_strategy(round_num)
    
    if response_strategy == "aggressive":
        return self._aggressive_response(counter_args, reinforced_view)
    elif response_strategy == "defensive":
        return self._defensive_response(counter_args, reinforced_view)
    else:
        return self._balanced_response(counter_args, reinforced_view)

def _aggressive_response(self, counter_args: Dict, reinforced_view: Dict) -> Dict:
    """积极进攻型回应"""
    
    return {
        "response_type": "aggressive",
        "main_points": [
            "对手过度关注短期风险,忽视长期价值",
            "市场恐慌情绪创造了绝佳买入机会",
            "基本面改善趋势不可逆转",
            "当前估值明显低估真实价值"
        ],
        "evidence_reinforcement": reinforced_view.get("strengthened_evidence", []),
        "opponent_weaknesses": self._identify_opponent_weaknesses(counter_args),
        "confidence_boost": "高度确信看涨观点的正确性"
    }

def _defensive_response(self, counter_args: Dict, reinforced_view: Dict) -> Dict:
    """防守型回应"""
    
    return {
        "response_type": "defensive",
        "risk_acknowledgment": "承认存在一定风险,但风险可控",
        "mitigation_strategies": [
            "分散投资降低单一风险",
            "设置合理止损位",
            "关注基本面变化",
            "动态调整仓位"
        ],
        "qualified_optimism": "在风险可控前提下保持乐观",
        "evidence_reaffirmation": reinforced_view.get("core_evidence", [])
    }

2. 看跌研究员 (Bear Researcher)

职责与特点

class BearResearcher(BaseResearcher):
    """看跌研究员 - 从悲观角度评估投资风险"""
    
    def __init__(self, llm, config):
        super().__init__(llm, config, stance="bearish")
        
    专业特点:
    - 专注风险识别
    - 质疑乐观假设
    - 关注下跌风险
    - 挑战看涨观点
    
    分析重点:
    - 潜在风险因素
    - 估值过高风险
    - 竞争威胁分析
    - 宏观经济风险
    - 行业周期性风险
    - 公司治理问题

核心研究方法

def analyze_from_stance(self, interpretation: Dict) -> Dict:
    """从看跌角度分析"""
    
    bearish_factors = []
    risk_factors = []
    negative_catalysts = []
    
    # 基本面风险因素
    fundamental_data = interpretation.get("fundamental_analysis", {})
    if fundamental_data.get("debt_to_equity", 0) > 0.6:
        bearish_factors.append("高负债比率风险")
    
    if fundamental_data.get("profit_margin_trend") == "declining":
        bearish_factors.append("利润率下降趋势")
    
    # 技术面看跌信号
    technical_data = interpretation.get("technical_analysis", {})
    if technical_data.get("trend_direction") == "downtrend":
        bearish_factors.append("技术面下降趋势")
    
    if technical_data.get("volume_trend") == "declining":
        bearish_factors.append("成交量萎缩信号")
    
    # 新闻面负面因素
    news_data = interpretation.get("news_analysis", {})
    if news_data.get("sentiment_score", 0) < 0.4:
        negative_catalysts.append("负面新闻情绪")
    
    # 识别风险因素
    risk_factors = self._identify_risk_factors(interpretation)
    
    return {
        "bearish_factors": bearish_factors,
        "risk_factors": risk_factors,
        "negative_catalysts": negative_catalysts,
        "downside_potential": self._calculate_downside_potential(interpretation),
        "valuation_concerns": self._assess_valuation_risks(interpretation)
    }

def _identify_risk_factors(self, interpretation: Dict) -> List[Dict]:
    """识别风险因素"""
    
    risks = []
    
    # 市场风险
    if interpretation.get("market_volatility", 0) > 0.3:
        risks.append({
            "type": "market_risk",
            "description": "市场波动性过高",
            "severity": "high",
            "probability": 0.7
        })
    
    # 行业风险
    if interpretation.get("industry_headwinds"):
        risks.append({
            "type": "industry_risk",
            "description": "行业面临逆风",
            "severity": "medium",
            "probability": 0.6
        })
    
    # 公司特定风险
    if interpretation.get("company_specific_issues"):
        risks.append({
            "type": "company_risk",
            "description": "公司特定问题",
            "severity": "high",
            "probability": 0.8
        })
    
    return risks

def prepare_debate_points(self, research_view: Dict) -> Dict:
    """准备辩论要点"""
    
    return {
        "main_arguments": [
            "估值过高,缺乏安全边际",
            "基本面恶化趋势明显",
            "技术指标显示下跌风险",
            "宏观环境不利于增长"
        ],
        "risk_evidence": research_view.get("risk_factors", []),
        "valuation_concerns": research_view.get("valuation_concerns", []),
        "scenario_analysis": self._develop_bear_scenarios(research_view),
        "contrarian_points": self._prepare_contrarian_arguments(research_view)
    }

风险评估专长

def _assess_valuation_risks(self, interpretation: Dict) -> Dict:
    """评估估值风险"""
    
    valuation_data = interpretation.get("fundamental_analysis", {})
    
    concerns = []
    
    # P/E 比率分析
    pe_ratio = valuation_data.get("pe_ratio", 0)
    industry_avg_pe = valuation_data.get("industry_avg_pe", 0)
    
    if pe_ratio > industry_avg_pe * 1.5:
        concerns.append("P/E比率显著高于行业平均")
    
    # P/B 比率分析
    pb_ratio = valuation_data.get("pb_ratio", 0)
    if pb_ratio > 3.0:
        concerns.append("P/B比率过高,存在泡沫风险")
    
    # 增长率与估值匹配度
    growth_rate = valuation_data.get("growth_rate", 0)
    peg_ratio = pe_ratio / max(growth_rate * 100, 1)
    
    if peg_ratio > 2.0:
        concerns.append("PEG比率过高,增长不足以支撑估值")
    
    return {
        "concerns": concerns,
        "overvaluation_risk": "high" if len(concerns) >= 2 else "medium",
        "fair_value_estimate": self._calculate_conservative_fair_value(valuation_data),
        "downside_protection": self._assess_downside_protection(valuation_data)
    }

3. 辩论机制设计

辩论管理器

class DebateManager:
    """辩论管理器 - 协调研究员之间的辩论"""
    
    def __init__(self, config):
        self.config = config
        self.max_rounds = config.get("max_debate_rounds", 3)
        self.consensus_threshold = config.get("consensus_threshold", 0.8)
        
    def conduct_debate(self, bull_researcher: BullResearcher, 
                      bear_researcher: BearResearcher,
                      analyst_reports: Dict) -> Dict:
        """主持辩论过程"""
        
        # 初始研究
        bull_initial = bull_researcher.research(analyst_reports)
        bear_initial = bear_researcher.research(analyst_reports)
        
        debate_history = []
        current_round = 1
        
        while current_round <= self.max_rounds:
            print(f"=== 辩论第 {current_round} 轮 ===")
            
            # 看涨方辩论
            bull_response = bull_researcher.debate(bear_initial, current_round)
            
            # 看跌方辩论
            bear_response = bear_researcher.debate(bull_initial, current_round)
            
            # 记录辩论
            round_record = {
                "round": current_round,
                "bull_response": bull_response,
                "bear_response": bear_response,
                "consensus_level": self._calculate_consensus(bull_response, bear_response)
            }
            
            debate_history.append(round_record)
            
            # 检查是否达成共识或需要继续
            if self._should_continue_debate(round_record, current_round):
                current_round += 1
                # 更新观点用于下一轮
                bull_initial = bull_response
                bear_initial = bear_response
            else:
                break
        
        # 生成最终共识
        final_consensus = self._generate_consensus(debate_history, bull_initial, bear_initial)
        
        return {
            "debate_history": debate_history,
            "final_consensus": final_consensus,
            "total_rounds": current_round,
            "consensus_quality": self._assess_consensus_quality(final_consensus)
        }
    
    def _calculate_consensus(self, bull_view: Dict, bear_view: Dict) -> float:
        """计算共识水平"""
        
        # 提取关键观点
        bull_confidence = bull_view.get("confidence", 0.5)
        bear_confidence = bear_view.get("confidence", 0.5)
        
        # 计算观点差异
        confidence_diff = abs(bull_confidence - bear_confidence)
        
        # 共识水平 = 1 - 观点差异
        consensus_level = 1.0 - confidence_diff
        
        return max(0.0, min(1.0, consensus_level))
    
    def _should_continue_debate(self, round_record: Dict, current_round: int) -> bool:
        """判断是否继续辩论"""
        
        # 达到最大轮次
        if current_round >= self.max_rounds:
            return False
        
        # 达成足够共识
        if round_record["consensus_level"] >= self.consensus_threshold:
            return False
        
        # 观点分歧仍然较大,继续辩论
        return True
    
    def _generate_consensus(self, debate_history: List[Dict], 
                          final_bull: Dict, final_bear: Dict) -> Dict:
        """生成最终共识"""
        
        # 综合双方观点
        consensus_points = []
        
        # 提取共同认可的要点
        bull_factors = set(final_bull.get("main_points", []))
        bear_factors = set(final_bear.get("main_points", []))
        
        common_points = bull_factors.intersection(bear_factors)
        consensus_points.extend(list(common_points))
        
        # 平衡风险和机会
        opportunities = final_bull.get("growth_opportunities", [])
        risks = final_bear.get("risk_factors", [])
        
        # 生成平衡的投资建议
        if len(opportunities) > len(risks):
            recommendation = "谨慎乐观"
            confidence = 0.6
        elif len(risks) > len(opportunities):
            recommendation = "谨慎悲观"
            confidence = 0.4
        else:
            recommendation = "中性观望"
            confidence = 0.5
        
        return {
            "consensus_points": consensus_points,
            "balanced_view": {
                "opportunities": opportunities[:3],  # 前3个机会
                "risks": risks[:3],  # 前3个风险
            },
            "recommendation": recommendation,
            "confidence": confidence,
            "key_factors_to_watch": self._identify_key_factors(final_bull, final_bear)
        }

辩论质量评估

class DebateQualityAssessor:
    """辩论质量评估器"""
    
    def assess_debate_quality(self, debate_result: Dict) -> Dict:
        """评估辩论质量"""
        
        quality_metrics = {
            "depth_score": self._assess_depth(debate_result),
            "balance_score": self._assess_balance(debate_result),
            "evidence_quality": self._assess_evidence_quality(debate_result),
            "logical_consistency": self._assess_logical_consistency(debate_result),
            "consensus_quality": self._assess_consensus_quality(debate_result)
        }
        
        overall_quality = sum(quality_metrics.values()) / len(quality_metrics)
        
        return {
            "quality_metrics": quality_metrics,
            "overall_quality": overall_quality,
            "quality_grade": self._assign_quality_grade(overall_quality),
            "improvement_suggestions": self._generate_improvement_suggestions(quality_metrics)
        }

研究员团队通过结构化的辩论机制,确保投资决策考虑了多个角度和潜在风险,提高了决策的质量和可靠性。