TradingAgents-CN

数据流架构

概述

TradingAgents 的数据流架构设计用于高效地获取、处理和分发金融数据。系统支持多种数据源,实现了统一的数据接口,并提供了强大的缓存和处理机制。

数据流架构图

graph TB
    subgraph "外部数据源"
        FINNHUB[FinnHub API<br/>实时金融数据]
        YAHOO[Yahoo Finance<br/>历史价格数据]
        REDDIT[Reddit API<br/>社交媒体数据]
        GNEWS[Google News<br/>新闻数据]
        CUSTOM[自定义数据源<br/>扩展接口]
    end
    
    subgraph "数据获取层"
        FUTILS[FinnHub Utils]
        YUTILS[YFinance Utils]
        RUTILS[Reddit Utils]
        NUTILS[News Utils]
        SUTILS[StockStats Utils]
    end
    
    subgraph "数据处理层"
        INTERFACE[Data Interface]
        PROCESSOR[Data Processor]
        VALIDATOR[Data Validator]
        TRANSFORMER[Data Transformer]
    end
    
    subgraph "缓存层"
        CACHE[Data Cache]
        REDIS[Redis Cache]
        LOCAL[Local Cache]
        MEMORY[Memory Cache]
    end
    
    subgraph "数据分发层"
        DISPATCHER[Data Dispatcher]
        ROUTER[Data Router]
        FORMATTER[Data Formatter]
    end
    
    subgraph "智能体消费层"
        ANALYSTS[分析师团队]
        RESEARCHERS[研究员团队]
        TRADER[交易员]
        RISK[风险管理]
    end
    
    FINNHUB --> FUTILS
    YAHOO --> YUTILS
    REDDIT --> RUTILS
    GNEWS --> NUTILS
    CUSTOM --> SUTILS
    
    FUTILS --> INTERFACE
    YUTILS --> INTERFACE
    RUTILS --> INTERFACE
    NUTILS --> INTERFACE
    SUTILS --> INTERFACE
    
    INTERFACE --> PROCESSOR
    PROCESSOR --> VALIDATOR
    VALIDATOR --> TRANSFORMER
    
    TRANSFORMER --> CACHE
    CACHE --> REDIS
    CACHE --> LOCAL
    CACHE --> MEMORY
    
    CACHE --> DISPATCHER
    DISPATCHER --> ROUTER
    ROUTER --> FORMATTER
    
    FORMATTER --> ANALYSTS
    FORMATTER --> RESEARCHERS
    FORMATTER --> TRADER
    FORMATTER --> RISK

数据源详解

1. FinnHub API

class FinnHubUtils:
    """FinnHub 数据获取工具"""
    
    支持的数据类型:
    - 实时股价数据
    - 公司基本信息
    - 财务报表数据
    - 新闻和公告
    - 技术指标
    - 市场情绪指标
    
    API限制:
    - 免费版: 60 calls/minute
    - 付费版: 更高频率限制
    
    数据格式:
    {
        "symbol": "AAPL",
        "price": 150.25,
        "change": 2.15,
        "changePercent": 1.45,
        "timestamp": 1640995200
    }

2. Yahoo Finance

class YFinanceUtils:
    """Yahoo Finance 数据获取工具"""
    
    支持的数据类型:
    - 历史价格数据
    - 股票分割信息
    - 股息数据
    - 期权数据
    - 财务指标
    
    优势:
    - 免费使用
    - 数据覆盖面广
    - 历史数据丰富
    
    数据格式:
    {
        "Date": "2024-01-01",
        "Open": 148.50,
        "High": 152.30,
        "Low": 147.80,
        "Close": 150.25,
        "Volume": 45678900
    }

3. Reddit API

class RedditUtils:
    """Reddit 社交媒体数据获取工具"""
    
    支持的数据类型:
    - 热门帖子
    - 评论情感
    - 用户讨论热度
    - 关键词提及频率
    
    分析维度:
    - 情感极性 (正面/负面/中性)
    - 讨论热度
    - 用户参与度
    - 话题趋势
    
    数据格式:
    {
        "post_id": "abc123",
        "title": "AAPL earnings discussion",
        "score": 1250,
        "comments": 89,
        "sentiment": 0.65,
        "timestamp": 1640995200
    }

4. Google News

class GoogleNewsUtils:
    """Google News 新闻数据获取工具"""
    
    支持的数据类型:
    - 相关新闻文章
    - 新闻情感分析
    - 事件时间线
    - 影响力评估
    
    处理流程:
    1. 关键词搜索
    2. 新闻筛选
    3. 内容提取
    4. 情感分析
    5. 影响力评估
    
    数据格式:
    {
        "title": "Apple reports strong Q4 earnings",
        "source": "Reuters",
        "published": "2024-01-01T10:00:00Z",
        "sentiment": 0.8,
        "relevance": 0.95,
        "impact_score": 0.7
    }

数据处理流程

1. 数据获取阶段

class DataAcquisition:
    """数据获取协调器"""
    
    def fetch_data(self, symbol: str, date: str) -> Dict:
        """获取指定股票和日期的所有数据"""
        
        # 并行获取各类数据
        tasks = [
            self.fetch_price_data(symbol, date),
            self.fetch_fundamental_data(symbol),
            self.fetch_news_data(symbol, date),
            self.fetch_social_data(symbol, date),
            self.fetch_technical_data(symbol, date)
        ]
        
        # 等待所有任务完成
        results = await asyncio.gather(*tasks)
        
        # 整合数据
        return self.integrate_data(results)

2. 数据验证阶段

class DataValidator:
    """数据验证器"""
    
    验证规则:
    - 数据完整性检查
    - 数据类型验证
    - 数值范围检查
    - 时间戳验证
    - 异常值检测
    
    def validate(self, data: Dict) -> Tuple[bool, List[str]]:
        """验证数据质量"""
        errors = []
        
        # 检查必需字段
        if not self.check_required_fields(data):
            errors.append("Missing required fields")
        
        # 检查数据类型
        if not self.check_data_types(data):
            errors.append("Invalid data types")
        
        # 检查数值范围
        if not self.check_value_ranges(data):
            errors.append("Values out of range")
        
        return len(errors) == 0, errors

3. 数据转换阶段

class DataTransformer:
    """数据转换器"""
    
    转换功能:
    - 数据标准化
    - 单位统一
    - 格式转换
    - 特征工程
    - 数据聚合
    
    def transform(self, raw_data: Dict) -> Dict:
        """转换原始数据为标准格式"""
        
        transformed = {}
        
        # 价格数据标准化
        transformed['price_data'] = self.normalize_prices(
            raw_data['price_data']
        )
        
        # 财务数据转换
        transformed['financial_data'] = self.convert_financials(
            raw_data['financial_data']
        )
        
        # 情感数据聚合
        transformed['sentiment_data'] = self.aggregate_sentiment(
            raw_data['news_data'],
            raw_data['social_data']
        )
        
        return transformed

缓存策略

1. 多层缓存架构

class CacheManager:
    """缓存管理器"""
    
    缓存层次:
    1. 内存缓存 (最快访问)
    2. 本地文件缓存 (持久化)
    3. Redis缓存 (分布式)
    4. 数据库缓存 (长期存储)
    
    def get_data(self, key: str) -> Optional[Dict]:
        """按优先级获取缓存数据"""
        
        # 1. 检查内存缓存
        if data := self.memory_cache.get(key):
            return data
        
        # 2. 检查本地缓存
        if data := self.local_cache.get(key):
            self.memory_cache.set(key, data)
            return data
        
        # 3. 检查Redis缓存
        if data := self.redis_cache.get(key):
            self.local_cache.set(key, data)
            self.memory_cache.set(key, data)
            return data
        
        return None

2. 缓存策略

缓存配置:
{
    "price_data": {
        "ttl": 300,        # 5分钟过期
        "refresh": "auto"   # 自动刷新
    },
    "fundamental_data": {
        "ttl": 86400,      # 24小时过期
        "refresh": "manual" # 手动刷新
    },
    "news_data": {
        "ttl": 3600,       # 1小时过期
        "refresh": "auto"   # 自动刷新
    },
    "social_data": {
        "ttl": 1800,       # 30分钟过期
        "refresh": "auto"   # 自动刷新
    }
}

数据分发机制

1. 数据路由

class DataRouter:
    """数据路由器"""
    
    路由规则:
    - 基本面数据  基本面分析师
    - 技术数据  技术分析师
    - 新闻数据  新闻分析师
    - 社交数据  社交媒体分析师
    - 综合数据  所有智能体
    
    def route_data(self, data: Dict, agents: List[str]) -> Dict:
        """根据智能体类型分发相应数据"""
        
        routed_data = {}
        
        for agent in agents:
            if agent == "fundamentals_analyst":
                routed_data[agent] = {
                    "financial_data": data["financial_data"],
                    "company_info": data["company_info"],
                    "industry_data": data["industry_data"]
                }
            elif agent == "technical_analyst":
                routed_data[agent] = {
                    "price_data": data["price_data"],
                    "volume_data": data["volume_data"],
                    "technical_indicators": data["technical_indicators"]
                }
            # ... 其他智能体的路由规则
        
        return routed_data

2. 数据格式化

class DataFormatter:
    """数据格式化器"""
    
    def format_for_agent(self, data: Dict, agent_type: str) -> Dict:
        """为特定智能体格式化数据"""
        
        if agent_type == "fundamentals_analyst":
            return self.format_fundamental_data(data)
        elif agent_type == "technical_analyst":
            return self.format_technical_data(data)
        elif agent_type == "news_analyst":
            return self.format_news_data(data)
        elif agent_type == "social_analyst":
            return self.format_social_data(data)
        
        return data

性能优化

1. 并行处理

2. 智能缓存

3. 数据压缩

4. 错误处理

这种数据流架构确保了系统能够高效、可靠地处理大量金融数据,为智能体提供高质量的数据支持。