TradingAgents-CN

交易员智能体

概述

交易员智能体是 TradingAgents 框架的核心决策组件,负责综合分析师报告和研究员辩论结果,制定最终的交易决策。交易员智能体具备专业的交易知识和风险意识,能够在复杂的市场环境中做出明智的投资决策。

交易员架构

基础交易员类

class Trader:
    """交易员智能体 - 负责最终交易决策"""
    
    def __init__(self, llm, config):
        self.llm = llm
        self.config = config
        self.trading_style = config.get("trading_style", "balanced")
        self.risk_tolerance = config.get("risk_tolerance", "medium")
        self.memory = TradingMemory()
        self.position_manager = PositionManager()
        
    def make_decision(self, analysis_data: Dict) -> Dict:
        """制定交易决策"""
        
        # 1. 综合分析所有输入
        comprehensive_analysis = self.synthesize_analysis(analysis_data)
        
        # 2. 评估市场条件
        market_assessment = self.assess_market_conditions(analysis_data)
        
        # 3. 制定交易策略
        trading_strategy = self.develop_trading_strategy(
            comprehensive_analysis, market_assessment
        )
        
        # 4. 确定仓位大小
        position_size = self.calculate_position_size(trading_strategy)
        
        # 5. 设置风险管理参数
        risk_parameters = self.set_risk_parameters(trading_strategy)
        
        # 6. 生成最终决策
        final_decision = self.generate_final_decision(
            trading_strategy, position_size, risk_parameters
        )
        
        # 7. 更新交易记忆
        self.memory.update_decision(final_decision)
        
        return final_decision

核心功能模块

1. 分析综合模块

def synthesize_analysis(self, analysis_data: Dict) -> Dict:
    """综合分析所有输入数据"""
    
    # 提取各类分析结果
    analyst_reports = analysis_data.get("analyst_reports", {})
    research_consensus = analysis_data.get("research_consensus", {})
    market_data = analysis_data.get("market_data", {})
    
    # 分析师报告权重
    analyst_weights = self.config.get("analyst_weights", {
        "fundamentals": 0.3,
        "technical": 0.3,
        "news": 0.2,
        "social": 0.2
    })
    
    # 计算加权分析评分
    weighted_scores = {}
    total_score = 0
    
    for analyst_type, weight in analyst_weights.items():
        if analyst_type in analyst_reports:
            score = analyst_reports[analyst_type].get("overall_score", 0.5)
            weighted_scores[analyst_type] = score * weight
            total_score += weighted_scores[analyst_type]
    
    # 研究员共识影响
    consensus_impact = self._assess_consensus_impact(research_consensus)
    
    # 综合评估
    synthesis = {
        "analyst_scores": weighted_scores,
        "total_analyst_score": total_score,
        "consensus_impact": consensus_impact,
        "adjusted_score": self._adjust_score_with_consensus(total_score, consensus_impact),
        "confidence_level": self._calculate_overall_confidence(analyst_reports, research_consensus),
        "key_factors": self._extract_key_factors(analyst_reports, research_consensus)
    }
    
    return synthesis

def _assess_consensus_impact(self, research_consensus: Dict) -> Dict:
    """评估研究员共识的影响"""
    
    consensus_strength = research_consensus.get("consensus_level", 0.5)
    recommendation = research_consensus.get("recommendation", "neutral")
    
    # 共识强度影响权重
    if consensus_strength > 0.8:
        impact_weight = 0.3  # 高共识,高影响
    elif consensus_strength > 0.6:
        impact_weight = 0.2  # 中等共识,中等影响
    else:
        impact_weight = 0.1  # 低共识,低影响
    
    # 推荐方向影响
    direction_impact = {
        "谨慎乐观": 0.15,
        "谨慎悲观": -0.15,
        "中性观望": 0.0
    }.get(recommendation, 0.0)
    
    return {
        "strength": consensus_strength,
        "weight": impact_weight,
        "direction": direction_impact,
        "net_impact": direction_impact * impact_weight
    }

2. 市场条件评估

def assess_market_conditions(self, analysis_data: Dict) -> Dict:
    """评估当前市场条件"""
    
    market_indicators = {
        "volatility": self._assess_volatility(analysis_data),
        "liquidity": self._assess_liquidity(analysis_data),
        "sentiment": self._assess_market_sentiment(analysis_data),
        "trend": self._assess_market_trend(analysis_data),
        "correlation": self._assess_market_correlation(analysis_data)
    }
    
    # 市场环境分类
    market_regime = self._classify_market_regime(market_indicators)
    
    # 交易适宜性评估
    trading_suitability = self._assess_trading_suitability(market_indicators)
    
    return {
        "indicators": market_indicators,
        "regime": market_regime,
        "suitability": trading_suitability,
        "risk_level": self._calculate_market_risk_level(market_indicators)
    }

def _classify_market_regime(self, indicators: Dict) -> str:
    """分类市场环境"""
    
    volatility = indicators["volatility"]["level"]
    trend = indicators["trend"]["direction"]
    sentiment = indicators["sentiment"]["score"]
    
    if volatility == "low" and trend == "uptrend" and sentiment > 0.6:
        return "bull_market"
    elif volatility == "high" and trend == "downtrend" and sentiment < 0.4:
        return "bear_market"
    elif volatility == "high":
        return "volatile_market"
    else:
        return "sideways_market"

def _assess_trading_suitability(self, indicators: Dict) -> Dict:
    """评估交易适宜性"""
    
    suitability_score = 0.5
    factors = []
    
    # 波动性影响
    if indicators["volatility"]["level"] == "low":
        suitability_score += 0.1
        factors.append("低波动性有利于交易")
    elif indicators["volatility"]["level"] == "high":
        suitability_score -= 0.1
        factors.append("高波动性增加交易风险")
    
    # 流动性影响
    if indicators["liquidity"]["level"] == "high":
        suitability_score += 0.1
        factors.append("高流动性便于进出")
    
    # 趋势明确性影响
    if indicators["trend"]["strength"] > 0.7:
        suitability_score += 0.1
        factors.append("趋势明确有利于交易")
    
    return {
        "score": max(0.0, min(1.0, suitability_score)),
        "factors": factors,
        "recommendation": "适宜" if suitability_score > 0.6 else "谨慎" if suitability_score > 0.4 else "不适宜"
    }

3. 交易策略制定

def develop_trading_strategy(self, analysis: Dict, market_conditions: Dict) -> Dict:
    """制定交易策略"""
    
    # 基于分析结果确定基本方向
    base_signal = self._determine_base_signal(analysis)
    
    # 根据市场条件调整策略
    adjusted_strategy = self._adjust_for_market_conditions(base_signal, market_conditions)
    
    # 选择交易类型
    trade_type = self._select_trade_type(adjusted_strategy, market_conditions)
    
    # 设定时间框架
    time_horizon = self._determine_time_horizon(adjusted_strategy, market_conditions)
    
    # 制定进出场策略
    entry_exit_strategy = self._develop_entry_exit_strategy(adjusted_strategy)
    
    return {
        "base_signal": base_signal,
        "adjusted_signal": adjusted_strategy,
        "trade_type": trade_type,
        "time_horizon": time_horizon,
        "entry_strategy": entry_exit_strategy["entry"],
        "exit_strategy": entry_exit_strategy["exit"],
        "confidence": self._calculate_strategy_confidence(analysis, market_conditions)
    }

def _determine_base_signal(self, analysis: Dict) -> Dict:
    """确定基本交易信号"""
    
    adjusted_score = analysis.get("adjusted_score", 0.5)
    confidence = analysis.get("confidence_level", 0.5)
    
    # 信号强度阈值
    strong_buy_threshold = 0.7
    buy_threshold = 0.6
    sell_threshold = 0.4
    strong_sell_threshold = 0.3
    
    if adjusted_score >= strong_buy_threshold and confidence > 0.7:
        signal = "strong_buy"
    elif adjusted_score >= buy_threshold:
        signal = "buy"
    elif adjusted_score <= strong_sell_threshold and confidence > 0.7:
        signal = "strong_sell"
    elif adjusted_score <= sell_threshold:
        signal = "sell"
    else:
        signal = "hold"
    
    return {
        "action": signal,
        "strength": abs(adjusted_score - 0.5) * 2,  # 0-1 scale
        "confidence": confidence,
        "score": adjusted_score
    }

def _select_trade_type(self, strategy: Dict, market_conditions: Dict) -> str:
    """选择交易类型"""
    
    signal_strength = strategy.get("strength", 0.5)
    market_regime = market_conditions.get("regime", "sideways_market")
    volatility = market_conditions["indicators"]["volatility"]["level"]
    
    # 根据信号强度和市场条件选择交易类型
    if signal_strength > 0.8 and market_regime in ["bull_market", "bear_market"]:
        return "momentum_trade"  # 动量交易
    elif signal_strength > 0.6 and volatility == "low":
        return "swing_trade"     # 波段交易
    elif market_regime == "sideways_market":
        return "range_trade"     # 区间交易
    else:
        return "position_trade"  # 仓位交易

4. 仓位管理

def calculate_position_size(self, strategy: Dict) -> Dict:
    """计算仓位大小"""
    
    # 基础仓位大小 (基于信号强度)
    signal_strength = strategy.get("strength", 0.5)
    base_position = signal_strength * self.config.get("max_position_size", 0.1)
    
    # 风险调整
    risk_adjustment = self._calculate_risk_adjustment(strategy)
    
    # 市场条件调整
    market_adjustment = self._calculate_market_adjustment(strategy)
    
    # 最终仓位大小
    final_position = base_position * risk_adjustment * market_adjustment
    
    # 确保在限制范围内
    max_position = self.config.get("max_position_size", 0.1)
    min_position = self.config.get("min_position_size", 0.01)
    
    final_position = max(min_position, min(max_position, final_position))
    
    return {
        "base_size": base_position,
        "risk_adjustment": risk_adjustment,
        "market_adjustment": market_adjustment,
        "final_size": final_position,
        "size_rationale": self._explain_position_sizing(
            base_position, risk_adjustment, market_adjustment
        )
    }

def _calculate_risk_adjustment(self, strategy: Dict) -> float:
    """计算风险调整系数"""
    
    confidence = strategy.get("confidence", 0.5)
    trade_type = strategy.get("trade_type", "position_trade")
    
    # 基于置信度的调整
    confidence_adjustment = 0.5 + (confidence - 0.5)
    
    # 基于交易类型的调整
    type_adjustments = {
        "momentum_trade": 1.2,    # 动量交易可以稍大仓位
        "swing_trade": 1.0,       # 波段交易标准仓位
        "range_trade": 0.8,       # 区间交易较小仓位
        "position_trade": 0.9     # 仓位交易中等仓位
    }
    
    type_adjustment = type_adjustments.get(trade_type, 1.0)
    
    return confidence_adjustment * type_adjustment

5. 风险管理参数

def set_risk_parameters(self, strategy: Dict) -> Dict:
    """设置风险管理参数"""
    
    trade_type = strategy.get("trade_type", "position_trade")
    signal_strength = strategy.get("strength", 0.5)
    time_horizon = strategy.get("time_horizon", "medium")
    
    # 止损设置
    stop_loss = self._calculate_stop_loss(trade_type, signal_strength)
    
    # 止盈设置
    take_profit = self._calculate_take_profit(trade_type, signal_strength)
    
    # 风险收益比
    risk_reward_ratio = take_profit / stop_loss if stop_loss > 0 else 0
    
    # 最大持有时间
    max_holding_period = self._determine_max_holding_period(time_horizon)
    
    return {
        "stop_loss": stop_loss,
        "take_profit": take_profit,
        "risk_reward_ratio": risk_reward_ratio,
        "max_holding_period": max_holding_period,
        "position_monitoring": self._setup_position_monitoring(strategy),
        "exit_conditions": self._define_exit_conditions(strategy)
    }

def _calculate_stop_loss(self, trade_type: str, signal_strength: float) -> float:
    """计算止损水平"""
    
    base_stop_loss = {
        "momentum_trade": 0.03,   # 3% 止损
        "swing_trade": 0.05,      # 5% 止损
        "range_trade": 0.02,      # 2% 止损
        "position_trade": 0.08    # 8% 止损
    }.get(trade_type, 0.05)
    
    # 根据信号强度调整
    # 信号越强,可以容忍更大的回撤
    strength_adjustment = 1.0 + (signal_strength - 0.5) * 0.5
    
    return base_stop_loss * strength_adjustment

def _calculate_take_profit(self, trade_type: str, signal_strength: float) -> float:
    """计算止盈水平"""
    
    base_take_profit = {
        "momentum_trade": 0.08,   # 8% 止盈
        "swing_trade": 0.12,      # 12% 止盈
        "range_trade": 0.04,      # 4% 止盈
        "position_trade": 0.20    # 20% 止盈
    }.get(trade_type, 0.10)
    
    # 根据信号强度调整
    strength_adjustment = 1.0 + signal_strength * 0.5
    
    return base_take_profit * strength_adjustment

6. 最终决策生成

def generate_final_decision(self, strategy: Dict, position: Dict, risk_params: Dict) -> Dict:
    """生成最终交易决策"""
    
    action = strategy["adjusted_signal"]["action"]
    
    # 构建决策结构
    decision = {
        "action": action,
        "quantity": position["final_size"],
        "confidence": strategy["confidence"],
        "strategy_type": strategy["trade_type"],
        "time_horizon": strategy["time_horizon"],
        
        # 风险管理
        "stop_loss": risk_params["stop_loss"],
        "take_profit": risk_params["take_profit"],
        "risk_reward_ratio": risk_params["risk_reward_ratio"],
        
        # 执行细节
        "entry_strategy": strategy["entry_strategy"],
        "exit_strategy": strategy["exit_strategy"],
        "max_holding_period": risk_params["max_holding_period"],
        
        # 决策依据
        "reasoning": self._generate_decision_reasoning(strategy, position, risk_params),
        "key_factors": strategy.get("key_factors", []),
        
        # 监控要求
        "monitoring_requirements": risk_params["position_monitoring"],
        "exit_conditions": risk_params["exit_conditions"],
        
        # 元数据
        "decision_timestamp": datetime.now().isoformat(),
        "decision_id": self._generate_decision_id(),
        "trader_style": self.trading_style,
        "risk_tolerance": self.risk_tolerance
    }
    
    return decision

def _generate_decision_reasoning(self, strategy: Dict, position: Dict, risk_params: Dict) -> str:
    """生成决策推理说明"""
    
    action = strategy["adjusted_signal"]["action"]
    confidence = strategy["confidence"]
    trade_type = strategy["trade_type"]
    
    reasoning_parts = []
    
    # 基本决策逻辑
    if action in ["buy", "strong_buy"]:
        reasoning_parts.append(f"基于综合分析,建议{action}该股票")
    elif action in ["sell", "strong_sell"]:
        reasoning_parts.append(f"基于综合分析,建议{action}该股票")
    else:
        reasoning_parts.append("当前分析结果建议持有观望")
    
    # 置信度说明
    if confidence > 0.8:
        reasoning_parts.append(f"决策置信度很高({confidence:.1%})")
    elif confidence > 0.6:
        reasoning_parts.append(f"决策置信度较高({confidence:.1%})")
    else:
        reasoning_parts.append(f"决策置信度中等({confidence:.1%}),建议谨慎操作")
    
    # 交易类型说明
    reasoning_parts.append(f"采用{trade_type}策略")
    
    # 风险管理说明
    reasoning_parts.append(
        f"设置{risk_params['stop_loss']:.1%}止损和{risk_params['take_profit']:.1%}止盈"
    )
    
    # 仓位说明
    reasoning_parts.append(f"建议仓位大小为{position['final_size']:.1%}")
    
    return "。".join(reasoning_parts) + "。"

7. 交易记忆管理

class TradingMemory:
    """交易记忆管理"""
    
    def __init__(self):
        self.decision_history = []
        self.performance_metrics = {}
        self.learning_insights = []
    
    def update_decision(self, decision: Dict):
        """更新决策记录"""
        self.decision_history.append(decision)
        
        # 保持最近100个决策
        if len(self.decision_history) > 100:
            self.decision_history = self.decision_history[-100:]
    
    def learn_from_outcomes(self, decision_id: str, outcome: Dict):
        """从交易结果中学习"""
        
        # 找到对应的决策
        decision = self._find_decision(decision_id)
        if not decision:
            return
        
        # 分析决策质量
        decision_quality = self._analyze_decision_quality(decision, outcome)
        
        # 提取学习要点
        insights = self._extract_learning_insights(decision, outcome, decision_quality)
        
        # 更新学习记录
        self.learning_insights.extend(insights)
        
        # 更新性能指标
        self._update_performance_metrics(decision, outcome)
    
    def get_relevant_experience(self, current_situation: Dict) -> List[Dict]:
        """获取相关的历史经验"""
        
        relevant_decisions = []
        
        for decision in self.decision_history:
            similarity = self._calculate_situation_similarity(
                current_situation, decision
            )
            
            if similarity > 0.7:  # 相似度阈值
                relevant_decisions.append({
                    "decision": decision,
                    "similarity": similarity
                })
        
        # 按相似度排序
        relevant_decisions.sort(key=lambda x: x["similarity"], reverse=True)
        
        return relevant_decisions[:5]  # 返回最相似的5个决策

交易员智能体通过综合分析、策略制定、风险管理和持续学习,确保在复杂的市场环境中做出明智的投资决策。